Line data Source code
1 : /*
2 : * This Source Code Form is subject to the terms of the Mozilla Public
3 : * License, v. 2.0. If a copy of the MPL was not distributed with this
4 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
5 : *
6 : * Copyright 1997 - July 2008 CWI, August 2008 - 2021 MonetDB B.V.
7 : */
8 :
9 : /*
10 : * (c) Martin Kersten, Lefteris Sidirourgos
11 : * Implement a parallel sort-merge MAL program generator
12 : */
13 : #include "monetdb_config.h"
14 : #include "orderidx.h"
15 : #include "gdk.h"
16 : #include "mal_exception.h"
17 : #include "mal_function.h"
18 :
/*
 * Smallest slice size (in rows) the parallel order-index builder will
 * create; a BAT needs at least twice this many rows before it is split.
 * TODO use realistic size in production
 */
#define MIN_PIECE ((BUN) 1000)
20 :
21 : str
22 67 : OIDXdropImplementation(Client cntxt, BAT *b)
23 : {
24 : (void) cntxt;
25 67 : OIDXdestroy(b);
26 67 : return MAL_SUCCEED;
27 : }
28 :
29 : str
30 79 : OIDXcreateImplementation(Client cntxt, int tpe, BAT *b, int pieces)
31 : {
32 : int i, loopvar, arg;
33 : BUN cnt, step=0,o;
34 : MalBlkPtr smb;
35 : MalStkPtr newstk;
36 : Symbol snew = NULL;
37 : InstrPtr q, pack;
38 : char name[IDLENGTH];
39 : str msg= MAL_SUCCEED;
40 :
41 79 : if (BATcount(b) <= 1)
42 : return MAL_SUCCEED;
43 :
44 : /* check if b is sorted, then index does not make sense */
45 73 : if (b->tsorted || b->trevsorted)
46 : return MAL_SUCCEED;
47 :
48 : /* check if b already has index */
49 73 : if (BATcheckorderidx(b))
50 : return MAL_SUCCEED;
51 :
52 130 : switch (ATOMbasetype(b->ttype)) {
53 : case TYPE_void:
54 : /* trivially supported */
55 : return MAL_SUCCEED;
56 52 : case TYPE_bte:
57 : case TYPE_sht:
58 : case TYPE_int:
59 : case TYPE_lng:
60 : #ifdef HAVE_HGE
61 : case TYPE_hge:
62 : #endif
63 : case TYPE_flt:
64 : case TYPE_dbl:
65 52 : if (GDKnr_threads > 1 && BATcount(b) >= 2 * MIN_PIECE && (GDKdebug & FORCEMITOMASK) == 0)
66 : break;
67 : /* fall through */
68 : default:
69 72 : if (BATorderidx(b, true) != GDK_SUCCEED)
70 0 : throw(MAL, "bat.orderidx", TYPE_NOT_SUPPORTED);
71 : return MAL_SUCCEED;
72 : }
73 :
74 0 : if( pieces <= 0 ){
75 : if (GDKnr_threads <= 1) {
76 : pieces = 1;
77 : } else if (GDKdebug & FORCEMITOMASK) {
78 : /* we want many pieces, even tiny ones */
79 : if (BATcount(b) < 4)
80 : pieces = 1;
81 : else if (BATcount(b) / 2 < (BUN) GDKnr_threads)
82 : pieces = (int) (BATcount(b) / 2);
83 : else
84 : pieces = GDKnr_threads;
85 : } else {
86 : if (BATcount(b) < 2 * MIN_PIECE)
87 : pieces = 1;
88 0 : else if (BATcount(b) / MIN_PIECE < (BUN) GDKnr_threads)
89 0 : pieces = (int) (BATcount(b) / MIN_PIECE);
90 : else
91 : pieces = GDKnr_threads;
92 : }
93 0 : } else if (BATcount(b) < (BUN) pieces || BATcount(b) < MIN_PIECE) {
94 : pieces = 1;
95 : }
96 :
97 : /* create a temporary MAL function to sort the BAT in parallel */
98 0 : snprintf(name, IDLENGTH, "sort%d", rand()%1000);
99 0 : snew = newFunction(putName("user"), putName(name),
100 : FUNCTIONsymbol);
101 0 : if(snew == NULL) {
102 0 : msg = createException(MAL, "bat.orderidx", SQLSTATE(HY013) MAL_MALLOC_FAIL);
103 0 : goto bailout;
104 : }
105 0 : smb = snew->def;
106 0 : q = getInstrPtr(smb, 0);
107 0 : if ((arg = newTmpVariable(smb, tpe)) < 0) {
108 0 : msg = createException(MAL, "bat.orderidx", SQLSTATE(HY013) MAL_MALLOC_FAIL);
109 0 : goto bailout;
110 : }
111 0 : q= addArgument(smb, q, arg);
112 0 : if (q == NULL || (getArg(q,0) = newTmpVariable(smb, TYPE_void)) < 0) {
113 0 : msg = createException(MAL, "bat.orderidx", SQLSTATE(HY013) MAL_MALLOC_FAIL);
114 0 : goto bailout;
115 : }
116 :
117 0 : if( resizeMalBlk(smb, 2*pieces+10) < 0)
118 0 : goto bailout; // large enough
119 : /* create the pack instruction first, as it will hold
120 : * intermediate variables */
121 0 : pack = newInstruction(0, putName("bat"), putName("orderidx"));
122 0 : if (pack == NULL || (pack->argv[0] = newTmpVariable(smb, TYPE_void)) < 0) {
123 0 : msg = createException(MAL, "bat.orderidx", SQLSTATE(HY013) MAL_MALLOC_FAIL);
124 0 : goto bailout;
125 : }
126 0 : pack = addArgument(smb, pack, arg);
127 0 : if (pack == NULL) {
128 0 : msg = createException(MAL, "bat.orderidx", SQLSTATE(HY013) MAL_MALLOC_FAIL);
129 0 : goto bailout;
130 : }
131 0 : setVarFixed(smb, getArg(pack, 0));
132 :
133 : /* the costly part executed as a parallel block */
134 0 : if ((loopvar = newTmpVariable(smb, TYPE_bit)) < 0) {
135 0 : msg = createException(MAL, "bat.orderidx", SQLSTATE(HY013) MAL_MALLOC_FAIL);
136 0 : goto bailout;
137 : }
138 0 : q = newStmt(smb, putName("language"), putName("dataflow"));
139 0 : if (q == NULL) {
140 0 : freeInstruction(pack);
141 0 : msg = createException(MAL, "bat.orderidx", SQLSTATE(HY013) MAL_MALLOC_FAIL);
142 0 : goto bailout;
143 : }
144 0 : q->barrier = BARRIERsymbol;
145 0 : q->argv[0] = loopvar;
146 :
147 0 : cnt = BATcount(b);
148 0 : step = cnt/pieces;
149 : o = 0;
150 0 : for (i = 0; i < pieces; i++) {
151 : /* add slice instruction */
152 0 : q = newInstruction(smb, putName("algebra"),putName("slice"));
153 0 : if (q == NULL || (setDestVar(q, newTmpVariable(smb, TYPE_any))) < 0) {
154 0 : freeInstruction(q);
155 0 : freeInstruction(pack);
156 0 : msg = createException(MAL, "bat.orderidx", SQLSTATE(HY013) MAL_MALLOC_FAIL);
157 0 : goto bailout;
158 : }
159 0 : setVarType(smb, getArg(q,0), tpe);
160 0 : setVarFixed(smb, getArg(q,0));
161 0 : q = addArgument(smb, q, arg);
162 0 : if (q == NULL) {
163 0 : freeInstruction(pack);
164 0 : msg = createException(MAL, "bat.orderidx", SQLSTATE(HY013) MAL_MALLOC_FAIL);
165 0 : goto bailout;
166 : }
167 0 : pack = addArgument(smb, pack, getArg(q,0));
168 0 : q = pushOid(smb, q, o);
169 0 : if (i == pieces-1) {
170 : o = cnt;
171 : } else {
172 0 : o += step;
173 : }
174 0 : q = pushOid(smb, q, o - 1);
175 0 : if (q == NULL) {
176 0 : freeInstruction(pack);
177 0 : msg = createException(MAL, "bat.orderidx", SQLSTATE(HY013) MAL_MALLOC_FAIL);
178 0 : goto bailout;
179 : }
180 0 : pushInstruction(smb, q);
181 : }
182 0 : for (i = 0; i < pieces; i++) {
183 : /* add sort instruction */
184 0 : q = newInstruction(smb, putName("algebra"), putName("orderidx"));
185 0 : if (q == NULL || (setDestVar(q, newTmpVariable(smb, TYPE_any))) < 0) {
186 0 : freeInstruction(q);
187 0 : freeInstruction(pack);
188 0 : msg = createException(MAL, "bat.orderidx", SQLSTATE(HY013) MAL_MALLOC_FAIL);
189 0 : goto bailout;
190 : }
191 0 : setVarType(smb, getArg(q, 0), tpe);
192 0 : setVarFixed(smb, getArg(q, 0));
193 0 : q = addArgument(smb, q, pack->argv[2+i]);
194 0 : q = pushBit(smb, q, 1);
195 0 : if (q == NULL) {
196 0 : freeInstruction(pack);
197 0 : msg = createException(MAL, "bat.orderidx", SQLSTATE(HY013) MAL_MALLOC_FAIL);
198 0 : goto bailout;
199 : }
200 0 : pack->argv[2+i] = getArg(q, 0);
201 0 : pushInstruction(smb, q);
202 : }
203 : /* finalize OID packing, check, and evaluate */
204 0 : pushInstruction(smb,pack);
205 0 : q = newAssignment(smb);
206 0 : if(q == NULL) {
207 0 : msg = createException(MAL, "bat.orderidx", SQLSTATE(HY013) MAL_MALLOC_FAIL);
208 0 : goto bailout;
209 : }
210 0 : q->barrier = EXITsymbol;
211 0 : q->argv[0] = loopvar;
212 0 : pushEndInstruction(smb);
213 0 : msg = chkProgram(cntxt->usermodule, smb);
214 0 : if( msg )
215 0 : goto bailout;
216 : /* evaluate MAL block and keep the ordered OID bat */
217 0 : newstk = prepareMALstack(smb, smb->vsize);
218 0 : if (newstk == NULL) {
219 0 : msg = createException(MAL, "bat.orderidx", SQLSTATE(HY013) MAL_MALLOC_FAIL);
220 0 : goto bailout;
221 : }
222 0 : newstk->up = 0;
223 0 : newstk->stk[arg].vtype= TYPE_bat;
224 0 : newstk->stk[arg].val.bval= b->batCacheid;
225 0 : BBPretain(newstk->stk[arg].val.bval);
226 0 : msg = runMALsequence(cntxt, smb, 1, 0, newstk, 0, 0);
227 0 : freeStack(newstk);
228 : /* get rid of temporary MAL block */
229 0 : bailout:
230 0 : freeSymbol(snew);
231 0 : return msg;
232 : }
233 :
234 : static str
235 6 : OIDXcreate(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
236 : {
237 : BAT *b;
238 : str msg= MAL_SUCCEED;
239 : int pieces = -1;
240 :
241 6 : if (pci->argc == 3) {
242 6 : pieces = stk->stk[pci->argv[2]].val.ival;
243 6 : if (pieces < 0)
244 0 : throw(MAL,"bat.orderidx","Positive number expected");
245 : }
246 :
247 6 : b = BATdescriptor( *getArgReference_bat(stk, pci, 1));
248 6 : if (b == NULL)
249 0 : throw(MAL, "bat.orderidx", SQLSTATE(HY002) RUNTIME_OBJECT_MISSING);
250 6 : msg = OIDXcreateImplementation(cntxt, getArgType(mb,pci,1), b, pieces);
251 6 : BBPunfix(b->batCacheid);
252 6 : return msg;
253 : }
254 :
255 : static str
256 0 : OIDXhasorderidx(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
257 : {
258 : BAT *b;
259 0 : bit *ret = getArgReference_bit(stk,pci,0);
260 0 : bat bid = *getArgReference_bat(stk, pci, 1);
261 :
262 : (void) cntxt;
263 : (void) mb;
264 :
265 0 : b = BATdescriptor(bid);
266 0 : if (b == NULL)
267 0 : throw(MAL, "bat.hasorderidx", SQLSTATE(HY002) RUNTIME_OBJECT_MISSING);
268 :
269 0 : *ret = b->torderidx != NULL;
270 :
271 0 : BBPunfix(b->batCacheid);
272 0 : return MAL_SUCCEED;
273 : }
274 :
275 : static str
276 6 : OIDXgetorderidx(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
277 : {
278 : BAT *b;
279 : BAT *bn;
280 6 : bat *ret = getArgReference_bat(stk,pci,0);
281 6 : bat bid = *getArgReference_bat(stk, pci, 1);
282 :
283 : (void) cntxt;
284 : (void) mb;
285 :
286 6 : b = BATdescriptor(bid);
287 6 : if (b == NULL)
288 0 : throw(MAL, "bat.getorderidx", SQLSTATE(HY002) RUNTIME_OBJECT_MISSING);
289 :
290 6 : if (!BATcheckorderidx(b)) {
291 0 : BBPunfix(b->batCacheid);
292 0 : throw(MAL, "bat.getorderidx", SQLSTATE(HY002) RUNTIME_OBJECT_MISSING);
293 : }
294 :
295 6 : if ((bn = COLnew(0, TYPE_oid, BATcount(b), TRANSIENT)) == NULL) {
296 0 : BBPunfix(b->batCacheid);
297 0 : throw(MAL, "bat.getorderidx", SQLSTATE(HY013) MAL_MALLOC_FAIL);
298 : }
299 6 : memcpy(Tloc(bn, 0), (const oid *) b->torderidx->base + ORDERIDXOFF,
300 6 : BATcount(b) * SIZEOF_OID);
301 6 : BATsetcount(bn, BATcount(b));
302 6 : bn->tkey = true;
303 6 : bn->tsorted = bn->trevsorted = BATcount(b) <= 1;
304 6 : bn->tnil = false;
305 6 : bn->tnonil = true;
306 6 : *ret = bn->batCacheid;
307 6 : BBPkeepref(*ret);
308 6 : BBPunfix(b->batCacheid);
309 6 : return MAL_SUCCEED;
310 : }
311 :
312 : static str
313 0 : OIDXorderidx(bat *ret, const bat *bid, const bit *stable)
314 : {
315 : BAT *b;
316 : gdk_return r;
317 :
318 : (void) ret;
319 0 : b = BATdescriptor(*bid);
320 0 : if (b == NULL)
321 0 : throw(MAL, "algebra.orderidx", SQLSTATE(HY002) RUNTIME_OBJECT_MISSING);
322 :
323 0 : r = BATorderidx(b, *stable);
324 0 : if (r != GDK_SUCCEED) {
325 0 : BBPunfix(*bid);
326 0 : throw(MAL, "algebra.orderidx", SQLSTATE(HY013) MAL_MALLOC_FAIL);
327 : }
328 0 : *ret = *bid;
329 0 : BBPkeepref(*ret);
330 0 : return MAL_SUCCEED;
331 : }
332 :
333 : /*
334 : * Merge the collection of sorted OID BATs into one
335 : */
336 :
337 : static str
338 0 : OIDXmerge(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
339 : {
340 : bat bid;
341 : BAT *b;
342 : BAT **a;
343 : int i, j, n_ar;
344 : BUN m_sz;
345 : gdk_return rc;
346 :
347 : (void) cntxt;
348 : (void) mb;
349 :
350 0 : if( pci->retc != 1 )
351 0 : throw(MAL, "bat.orderidx", SQLSTATE(HY002) "INTERNAL ERROR, retc != 1 ");
352 0 : if( pci->argc < 2 )
353 0 : throw(MAL, "bat.orderidx", SQLSTATE(HY002) "INTERNAL ERROR, argc != 2");
354 :
355 0 : n_ar = pci->argc - 2;
356 :
357 0 : bid = *getArgReference_bat(stk, pci, 1);
358 0 : b = BATdescriptor(bid);
359 0 : if (b == NULL)
360 0 : throw(MAL, "bat.orderidx", SQLSTATE(HY002) RUNTIME_OBJECT_MISSING);
361 :
362 0 : if (b->torderidx) {
363 0 : BBPunfix(bid);
364 0 : throw(MAL, "bat.orderidx", SQLSTATE(HY002) "INTERNAL ERROR, torderidx already set");
365 : }
366 :
367 0 : switch (ATOMbasetype(b->ttype)) {
368 : case TYPE_bte:
369 : case TYPE_sht:
370 : case TYPE_int:
371 : case TYPE_lng:
372 : #ifdef HAVE_HGE
373 : case TYPE_hge:
374 : #endif
375 : case TYPE_flt:
376 : case TYPE_dbl:
377 : break;
378 0 : case TYPE_str:
379 : /* TODO: support strings etc. */
380 : case TYPE_void:
381 : case TYPE_ptr:
382 : default:
383 0 : BBPunfix(bid);
384 0 : throw(MAL, "bat.orderidx", TYPE_NOT_SUPPORTED);
385 : }
386 :
387 0 : if ((a = (BAT **) GDKmalloc(n_ar*sizeof(BAT *))) == NULL) {
388 0 : BBPunfix(bid);
389 0 : throw(MAL, "bat.orderidx", SQLSTATE(HY013) MAL_MALLOC_FAIL);
390 : }
391 : m_sz = 0;
392 0 : for (i = 0; i < n_ar; i++) {
393 0 : a[i] = BATdescriptor(*getArgReference_bat(stk, pci, i+2));
394 0 : if (a[i] == NULL) {
395 0 : for (j = i-1; j >= 0; j--) {
396 0 : BBPunfix(a[j]->batCacheid);
397 : }
398 0 : GDKfree(a);
399 0 : BBPunfix(bid);
400 0 : throw(MAL, "bat.orderidx", SQLSTATE(HY002) RUNTIME_OBJECT_MISSING);
401 : }
402 0 : m_sz += BATcount(a[i]);
403 0 : if (BATcount(a[i]) == 0) {
404 0 : BBPunfix(a[i]->batCacheid);
405 0 : a[i] = NULL;
406 : }
407 : }
408 0 : assert(m_sz == BATcount(b));
409 0 : for (i = 0; i < n_ar; i++) {
410 0 : if (a[i] == NULL) {
411 0 : n_ar--;
412 0 : if (i < n_ar)
413 0 : a[i] = a[n_ar];
414 0 : i--;
415 : }
416 : }
417 : if (m_sz != BATcount(b)) {
418 : BBPunfix(bid);
419 : for (i = 0; i < n_ar; i++)
420 : BBPunfix(a[i]->batCacheid);
421 : GDKfree(a);
422 : throw(MAL, "bat.orderidx", "count mismatch");
423 : }
424 :
425 0 : rc = GDKmergeidx(b, a, n_ar);
426 :
427 0 : for (i = 0; i < n_ar; i++)
428 0 : BBPunfix(a[i]->batCacheid);
429 0 : GDKfree(a);
430 0 : BBPunfix(bid);
431 :
432 0 : if (rc != GDK_SUCCEED)
433 0 : throw(MAL, "bat.orderidx", SQLSTATE(HY013) MAL_MALLOC_FAIL);
434 :
435 : return MAL_SUCCEED;
436 : }
437 :
438 : #include "mel.h"
439 : mel_func orderidx_init_funcs[] = {
440 : pattern("bat", "orderidx", OIDXcreate, false, "Introduces the OID index arrangement of ordered values", args(1,2, arg("",void),batargany("bv",1))),
441 : pattern("bat", "orderidx", OIDXcreate, false, "Introduces the OID index arrangement of ordered values", args(1,3, arg("",void),batargany("bv",1),arg("pieces",int))),
442 : pattern("bat", "orderidx", OIDXmerge, false, "Consolidates the OID index arrangement", args(1,3, arg("",void),batargany("bv",1),batvarargany("l",1))),
443 : pattern("bat", "hasorderidx", OIDXhasorderidx, false, "Return true if order index exists", args(1,2, arg("",bit),batargany("bv",1))),
444 : pattern("bat", "getorderidx", OIDXgetorderidx, false, "Return the order index if it exists", args(1,2, batarg("",oid),batargany("bv",1))),
445 : command("algebra", "orderidx", OIDXorderidx, false, "Create an order index", args(1,3, batargany("",1),batargany("bv",1),arg("stable",bit))),
446 : { .imp=NULL }
447 : };
448 : #include "mal_import.h"
449 : #ifdef _MSC_VER
450 : #undef read
451 : #pragma section(".CRT$XCU",read)
452 : #endif
453 257 : LIB_STARTUP_FUNC(init_orderidx_mal)
454 257 : { mal_module("orderidx", NULL, orderidx_init_funcs); }
|