LCOV - code coverage report
Current view: top level - monetdb5/modules/mal - orderidx.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 40 220 18.2 %
Date: 2021-10-27 03:06:47 Functions: 5 8 62.5 %

          Line data    Source code
       1             : /*
       2             :  * This Source Code Form is subject to the terms of the Mozilla Public
       3             :  * License, v. 2.0.  If a copy of the MPL was not distributed with this
       4             :  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
       5             :  *
       6             :  * Copyright 1997 - July 2008 CWI, August 2008 - 2021 MonetDB B.V.
       7             :  */
       8             : 
       9             : /*
      10             :  * (c) Martin Kersten, Lefteris Sidirourgos
      11             :  * Implement a parallel sort-merge MAL program generator
      12             :  */
      13             : #include "monetdb_config.h"
      14             : #include "orderidx.h"
      15             : #include "gdk.h"
      16             : #include "mal_exception.h"
      17             : #include "mal_function.h"
      18             : 
      /*
                        * Piece-size threshold, in tuples, for the parallel order-index build.
                        * OIDXcreateImplementation only considers the parallel path for BATs
                        * with at least 2 * MIN_PIECE tuples and, when it picks the number of
                        * pieces itself, uses BATcount / MIN_PIECE as a candidate piece count.
                        */
      19             : #define MIN_PIECE       ((BUN) 1000)    /* TODO use realistic size in production */
      20             : 
      /*
                        * Hand BAT b to OIDXdestroy(), which presumably removes its order
                        * index (its definition is not part of this listing).
                        *
                        * cntxt  accepted but unused
                        * b      BAT passed on to OIDXdestroy()
                        *
                        * Always returns MAL_SUCCEED.
                        */
      21             : str
      22          67 : OIDXdropImplementation(Client cntxt, BAT *b)
      23             : {
      24             :         (void) cntxt;
      25          67 :         OIDXdestroy(b);
      26          67 :         return MAL_SUCCEED;
      27             : }
      28             : 
      /*
                        * Build an order index for BAT b, in parallel when that looks
                        * worthwhile.
                        *
                        * Returns MAL_SUCCEED without doing any work when b has at most one
                        * tuple, is flagged tsorted or trevsorted, already passes
                        * BATcheckorderidx(), or has base type void.  For the fixed-width
                        * numeric base types the parallel path is taken only when
                        * GDKnr_threads > 1, b has at least 2 * MIN_PIECE tuples and
                        * FORCEMITOMASK is not set in GDKdebug; every other case is handed to
                        * BATorderidx(b, true).
                        *
                        * The parallel path generates a temporary MAL function user.sort<N>
                        * holding a language.dataflow barrier block with `pieces`
                        * algebra.slice calls, `pieces` algebra.orderidx calls and a single
                        * bat.orderidx call that receives b plus all sorted pieces.  The
                        * function is checked with chkProgram(), executed with
                        * runMALsequence() and then freed again.
                        *
                        * cntxt   client whose usermodule is given to chkProgram() and which is
                        *         passed to runMALsequence()
                        * tpe     MAL type assigned to the BAT argument variable and to every
                        *         slice/sort result variable
                        * b       BAT to index
                        * pieces  requested number of pieces; <= 0 lets this function choose
                        *
                        * Returns MAL_SUCCEED or an exception string.
                        */
      29             : str
      30          79 : OIDXcreateImplementation(Client cntxt, int tpe, BAT *b, int pieces)
      31             : {
      32             :         int i, loopvar, arg;
      33             :         BUN cnt, step=0,o;
      34             :         MalBlkPtr smb;
      35             :         MalStkPtr newstk;
      36             :         Symbol snew = NULL;
      37             :         InstrPtr q, pack;
      38             :         char name[IDLENGTH];
      39             :         str msg= MAL_SUCCEED;
      40             : 
                               /* zero or one tuple: nothing to order */
      41          79 :         if (BATcount(b) <= 1)
      42             :                 return MAL_SUCCEED;
      43             : 
      44             :         /* check if b is sorted, then index does not make sense */
      45          73 :         if (b->tsorted || b->trevsorted)
      46             :                 return MAL_SUCCEED;
      47             : 
      48             :         /* check if b already has index */
      49          73 :         if (BATcheckorderidx(b))
      50             :                 return MAL_SUCCEED;
      51             : 
                               /* only the fixed-width numeric base types can leave this switch
                                * through the break and reach the parallel code below; every
                                * other path returns from inside the switch */
      52         130 :         switch (ATOMbasetype(b->ttype)) {
      53             :         case TYPE_void:
      54             :                 /* trivially supported */
      55             :                 return MAL_SUCCEED;
      56          52 :         case TYPE_bte:
      57             :         case TYPE_sht:
      58             :         case TYPE_int:
      59             :         case TYPE_lng:
      60             : #ifdef HAVE_HGE
      61             :         case TYPE_hge:
      62             : #endif
      63             :         case TYPE_flt:
      64             :         case TYPE_dbl:
      65          52 :                 if (GDKnr_threads > 1 && BATcount(b) >= 2 * MIN_PIECE && (GDKdebug & FORCEMITOMASK) == 0)
      66             :                         break;
      67             :                 /* fall through */
      68             :         default:
                                       /* NOTE(review): every BATorderidx() failure is reported
                                        * as TYPE_NOT_SUPPORTED, whatever its actual cause */
      69          72 :                 if (BATorderidx(b, true) != GDK_SUCCEED)
      70           0 :                         throw(MAL, "bat.orderidx", TYPE_NOT_SUPPORTED);
      71             :                 return MAL_SUCCEED;
      72             :         }
      73             : 
                               /* Choose the number of pieces when the caller did not.
                                * NOTE(review): execution only gets here through the break
                                * above, which already requires GDKnr_threads > 1, at least
                                * 2 * MIN_PIECE tuples and FORCEMITOMASK unset.  Unless those
                                * globals can change in between, the GDKnr_threads <= 1, the
                                * FORCEMITOMASK and the "< 2 * MIN_PIECE" branches below look
                                * unreachable -- confirm. */
      74           0 :         if( pieces <= 0 ){
      75             :                 if (GDKnr_threads <= 1) {
      76             :                         pieces = 1;
      77             :                 } else if (GDKdebug & FORCEMITOMASK) {
      78             :                         /* we want many pieces, even tiny ones */
      79             :                         if (BATcount(b) < 4)
      80             :                                 pieces = 1;
      81             :                         else if (BATcount(b) / 2 < (BUN) GDKnr_threads)
      82             :                                 pieces = (int) (BATcount(b) / 2);
      83             :                         else
      84             :                                 pieces = GDKnr_threads;
      85             :                 } else {
      86             :                         if (BATcount(b) < 2 * MIN_PIECE)
      87             :                                 pieces = 1;
      88           0 :                         else if (BATcount(b) / MIN_PIECE < (BUN) GDKnr_threads)
      89           0 :                                 pieces = (int) (BATcount(b) / MIN_PIECE);
      90             :                         else
      91             :                                 pieces = GDKnr_threads;
      92             :                 }
                               /* a caller-supplied piece count is reduced to 1 when b has
                                * fewer tuples than pieces or fewer than MIN_PIECE tuples */
      93           0 :         } else if (BATcount(b) < (BUN) pieces || BATcount(b) < MIN_PIECE) {
      94             :                 pieces = 1;
      95             :         }
      96             : 
      97             :         /* create a temporary MAL function to sort the BAT in parallel */
                               /* NOTE(review): the name has only 1000 possible values
                                * (rand() % 1000); whether concurrent calls can clash on it is
                                * not visible from this function */
      98           0 :         snprintf(name, IDLENGTH, "sort%d", rand()%1000);
      99           0 :         snew = newFunction(putName("user"), putName(name),
     100             :                                            FUNCTIONsymbol);
     101           0 :         if(snew == NULL) {
     102           0 :                 msg = createException(MAL, "bat.orderidx", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     103           0 :                 goto bailout;
     104             :         }
     105           0 :         smb = snew->def;
                               /* instruction 0 of the new block (presumably the function
                                * signature) gets the BAT argument `arg` and a void result */
     106           0 :         q = getInstrPtr(smb, 0);
     107           0 :         if ((arg = newTmpVariable(smb, tpe)) < 0) {
     108           0 :                 msg = createException(MAL, "bat.orderidx", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     109           0 :                 goto bailout;
     110             :         }
     111           0 :         q= addArgument(smb, q, arg);
     112           0 :         if (q == NULL || (getArg(q,0) = newTmpVariable(smb, TYPE_void)) < 0) {
     113           0 :                 msg = createException(MAL, "bat.orderidx", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     114           0 :                 goto bailout;
     115             :         }
     116             : 
                               /* NOTE(review): msg is still MAL_SUCCEED at this point, so a
                                * resizeMalBlk() failure is returned to the caller as success */
     117           0 :         if( resizeMalBlk(smb, 2*pieces+10) < 0)
     118           0 :                 goto bailout; // large enough
     119             :         /* create the pack instruction first, as it will hold
     120             :          * intermediate variables */
                               /* NOTE(review): this is the only newInstruction() call in the
                                * function that passes 0 instead of smb -- confirm intended.
                                * pack is kept outside the block until it is pushed at the
                                * very end. */
     121           0 :         pack = newInstruction(0, putName("bat"), putName("orderidx"));
     122           0 :         if (pack == NULL || (pack->argv[0] = newTmpVariable(smb, TYPE_void)) < 0) {
     123           0 :                 msg = createException(MAL, "bat.orderidx", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     124           0 :                 goto bailout;
     125             :         }
     126           0 :         pack = addArgument(smb, pack, arg);
     127           0 :         if (pack == NULL) {
     128           0 :                 msg = createException(MAL, "bat.orderidx", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     129           0 :                 goto bailout;
     130             :         }
     131           0 :         setVarFixed(smb, getArg(pack, 0));
     132             : 
     133             :         /* the costly part executed as a parallel block */
                               /* NOTE(review): unlike the later failure paths, this one (and
                                * the newTmpVariable() failure right after pack was created)
                                * goes to bailout without freeInstruction(pack) */
     134           0 :         if ((loopvar = newTmpVariable(smb, TYPE_bit)) < 0) {
     135           0 :                 msg = createException(MAL, "bat.orderidx", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     136           0 :                 goto bailout;
     137             :         }
                               /* open the barrier block: language.dataflow with loopvar as its
                                * result; the matching EXIT on loopvar is emitted further down */
     138           0 :         q = newStmt(smb, putName("language"), putName("dataflow"));
     139           0 :         if (q == NULL) {
     140           0 :                 freeInstruction(pack);
     141           0 :                 msg = createException(MAL, "bat.orderidx", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     142           0 :                 goto bailout;
     143             :         }
     144           0 :         q->barrier = BARRIERsymbol;
     145           0 :         q->argv[0] = loopvar;
     146             : 
                               /* each piece covers `step` tuples; the last piece runs up to
                                * cnt and so also takes the remainder of the division */
     147           0 :         cnt = BATcount(b);
     148           0 :         step = cnt/pieces;
     149             :         o = 0;
     150           0 :         for (i = 0; i < pieces; i++) {
     151             :                 /* add slice instruction */
     152           0 :                 q = newInstruction(smb, putName("algebra"),putName("slice"));
                                       /* NOTE(review): when q is NULL, freeInstruction(q) is
                                        * called with NULL -- confirm that it tolerates this */
     153           0 :                 if (q == NULL || (setDestVar(q, newTmpVariable(smb, TYPE_any))) < 0) {
     154           0 :                         freeInstruction(q);
     155           0 :                         freeInstruction(pack);
     156           0 :                         msg = createException(MAL, "bat.orderidx", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     157           0 :                         goto bailout;
     158             :                 }
     159           0 :                 setVarType(smb, getArg(q,0), tpe);
     160           0 :                 setVarFixed(smb, getArg(q,0));
     161           0 :                 q = addArgument(smb, q, arg);
     162           0 :                 if (q == NULL) {
     163           0 :                         freeInstruction(pack);
     164           0 :                         msg = createException(MAL, "bat.orderidx", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     165           0 :                         goto bailout;
     166             :                 }
                                       /* remember the slice result as an argument of pack.
                                        * NOTE(review): the result of this addArgument() is not
                                        * checked for NULL, yet pack->argv is read in the sort
                                        * loop below */
     167           0 :                 pack = addArgument(smb, pack, getArg(q,0));
                                       /* the slice bounds pushed are o and (next o) - 1 */
     168           0 :                 q = pushOid(smb, q, o);
     169           0 :                 if (i == pieces-1) {
     170             :                         o = cnt;
     171             :                 } else {
     172           0 :                         o += step;
     173             :                 }
     174           0 :                 q = pushOid(smb, q, o - 1);
     175           0 :                 if (q == NULL) {
     176           0 :                         freeInstruction(pack);
     177           0 :                         msg = createException(MAL, "bat.orderidx", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     178           0 :                         goto bailout;
     179             :                 }
     180           0 :                 pushInstruction(smb, q);
     181             :         }
     182           0 :         for (i = 0; i < pieces; i++) {
     183             :                 /* add sort instruction */
     184           0 :                 q = newInstruction(smb, putName("algebra"), putName("orderidx"));
     185           0 :                 if (q == NULL || (setDestVar(q, newTmpVariable(smb, TYPE_any))) < 0) {
     186           0 :                         freeInstruction(q);
     187           0 :                         freeInstruction(pack);
     188           0 :                         msg = createException(MAL, "bat.orderidx", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     189           0 :                         goto bailout;
     190             :                 }
     191           0 :                 setVarType(smb, getArg(q, 0), tpe);
     192           0 :                 setVarFixed(smb, getArg(q, 0));
                                       /* the input is the i-th slice, which this code expects
                                        * at pack->argv[2 + i]; the second argument is the bit
                                        * constant 1 */
     193           0 :                 q = addArgument(smb, q, pack->argv[2+i]);
     194           0 :                 q = pushBit(smb, q, 1);
     195           0 :                 if (q == NULL) {
     196           0 :                         freeInstruction(pack);
     197           0 :                         msg = createException(MAL, "bat.orderidx", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     198           0 :                         goto bailout;
     199             :                 }
                                       /* pack now refers to the sort result instead of the
                                        * unsorted slice */
     200           0 :                 pack->argv[2+i] = getArg(q, 0);
     201           0 :                 pushInstruction(smb, q);
     202             :         }
     203             :         /* finalize OID packing, check, and evaluate */
     204           0 :         pushInstruction(smb,pack);
                               /* close the barrier block with an EXIT on loopvar */
     205           0 :         q = newAssignment(smb);
     206           0 :         if(q == NULL) {
     207           0 :                 msg = createException(MAL, "bat.orderidx", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     208           0 :                 goto bailout;
     209             :         }
     210           0 :         q->barrier = EXITsymbol;
     211           0 :         q->argv[0] = loopvar;
     212           0 :         pushEndInstruction(smb);
     213           0 :         msg = chkProgram(cntxt->usermodule, smb);
     214           0 :         if( msg )
     215           0 :                 goto bailout;
     216             :         /* evaluate MAL block and keep the ordered OID bat */
     217           0 :         newstk = prepareMALstack(smb, smb->vsize);
     218           0 :         if (newstk == NULL) {
     219           0 :                 msg = createException(MAL, "bat.orderidx", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     220           0 :                 goto bailout;
     221             :         }
                               /* pass b to the generated function through the stack slot of
                                * `arg`; the BBPretain() is presumably balanced when the stack
                                * is freed -- confirm */
     222           0 :         newstk->up = 0;
     223           0 :         newstk->stk[arg].vtype= TYPE_bat;
     224           0 :         newstk->stk[arg].val.bval= b->batCacheid;
     225           0 :         BBPretain(newstk->stk[arg].val.bval);
     226           0 :         msg = runMALsequence(cntxt, smb, 1, 0, newstk, 0, 0);
     227           0 :         freeStack(newstk);
     228             :         /* get rid of temporary MAL block */
                               /* reached on success and on every failure.  NOTE(review): snew
                                * is still NULL when newFunction() failed, so freeSymbol() has
                                * to tolerate NULL -- confirm */
     229           0 :   bailout:
     230           0 :         freeSymbol(snew);
     231           0 :         return msg;
     232             : }
     233             : 
     /*
                        * MAL entry point registered below as bat.orderidx(bv) and
                        * bat.orderidx(bv, pieces).
                        *
                        * Reads the optional `pieces` argument (argument 2, present when
                        * argc == 3), looks up the BAT given as argument 1 and delegates to
                        * OIDXcreateImplementation() with the MAL type of that argument.
                        * Without a `pieces` argument, -1 is passed on.
                        *
                        * Returns the message of OIDXcreateImplementation(), or an exception
                        * when `pieces` is negative or the BAT cannot be obtained.
                        */
     234             : static str
     235           6 : OIDXcreate(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
     236             : {
     237             :         BAT *b;
     238             :         str msg= MAL_SUCCEED;
     239             :         int pieces = -1;
     240             : 
     241           6 :         if (pci->argc == 3) {
     242           6 :                 pieces = stk->stk[pci->argv[2]].val.ival;
                                       /* NOTE(review): the message asks for a positive number,
                                        * but 0 passes this test; OIDXcreateImplementation()
                                        * treats pieces <= 0 as "choose automatically" */
     243           6 :                 if (pieces < 0)
     244           0 :                         throw(MAL,"bat.orderidx","Positive number expected");
     245             :         }
     246             : 
     247           6 :         b = BATdescriptor( *getArgReference_bat(stk, pci, 1));
     248           6 :         if (b == NULL)
     249           0 :                 throw(MAL, "bat.orderidx", SQLSTATE(HY002) RUNTIME_OBJECT_MISSING);
     250           6 :         msg = OIDXcreateImplementation(cntxt, getArgType(mb,pci,1), b, pieces);
                               /* release the reference taken by BATdescriptor() */
     251           6 :         BBPunfix(b->batCacheid);
     252           6 :         return msg;
     253             : }
     254             : 
     /*
                        * MAL entry point registered below as bat.hasorderidx(bv).
                        *
                        * Stores in the bit result (argument 0) whether the torderidx field
                        * of the BAT given as argument 1 is non-NULL.  cntxt and mb are
                        * unused.
                        *
                        * Returns MAL_SUCCEED, or an exception when the BAT cannot be
                        * obtained.
                        */
     255             : static str
     256           0 : OIDXhasorderidx(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
     257             : {
     258             :         BAT *b;
     259           0 :         bit *ret = getArgReference_bit(stk,pci,0);
     260           0 :         bat bid = *getArgReference_bat(stk, pci, 1);
     261             : 
     262             :         (void) cntxt;
     263             :         (void) mb;
     264             : 
     265           0 :         b = BATdescriptor(bid);
     266           0 :         if (b == NULL)
     267           0 :                 throw(MAL, "bat.hasorderidx", SQLSTATE(HY002) RUNTIME_OBJECT_MISSING);
     268             : 
                               /* NOTE(review): this tests the field directly, whereas
                                * OIDXgetorderidx() and OIDXcreateImplementation() go through
                                * BATcheckorderidx(); confirm the two cannot disagree */
     269           0 :         *ret = b->torderidx != NULL;
     270             : 
     271           0 :         BBPunfix(b->batCacheid);
     272           0 :         return MAL_SUCCEED;
     273             : }
     274             : 
     /*
                        * MAL entry point registered below as bat.getorderidx(bv).
                        *
                        * Copies the order index of the BAT given as argument 1 into a new
                        * TRANSIENT oid BAT and returns that BAT as argument 0.  cntxt and mb
                        * are unused.
                        *
                        * Returns MAL_SUCCEED, or an exception when the BAT cannot be
                        * obtained, BATcheckorderidx() reports no order index, or the result
                        * BAT cannot be allocated.
                        */
     275             : static str
     276           6 : OIDXgetorderidx(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
     277             : {
     278             :         BAT *b;
     279             :         BAT *bn;
     280           6 :         bat *ret = getArgReference_bat(stk,pci,0);
     281           6 :         bat bid = *getArgReference_bat(stk, pci, 1);
     282             : 
     283             :         (void) cntxt;
     284             :         (void) mb;
     285             : 
     286           6 :         b = BATdescriptor(bid);
     287           6 :         if (b == NULL)
     288           0 :                 throw(MAL, "bat.getorderidx", SQLSTATE(HY002) RUNTIME_OBJECT_MISSING);
     289             : 
     290           6 :         if (!BATcheckorderidx(b)) {
     291           0 :                 BBPunfix(b->batCacheid);
     292           0 :                 throw(MAL, "bat.getorderidx", SQLSTATE(HY002) RUNTIME_OBJECT_MISSING);
     293             :         }
     294             : 
     295           6 :         if ((bn = COLnew(0, TYPE_oid, BATcount(b), TRANSIENT)) == NULL) {
     296           0 :                 BBPunfix(b->batCacheid);
     297           0 :                 throw(MAL, "bat.getorderidx", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     298             :         }
                               /* copy BATcount(b) oids, starting ORDERIDXOFF oid-sized
                                * elements into the order-index heap */
     299           6 :         memcpy(Tloc(bn, 0), (const oid *) b->torderidx->base + ORDERIDXOFF,
     300           6 :                    BATcount(b) * SIZEOF_OID);
     301           6 :         BATsetcount(bn, BATcount(b));
                               /* result properties: marked key and free of nils; marked
                                * sorted and reverse-sorted only when it holds at most one
                                * value */
     302           6 :         bn->tkey = true;
     303           6 :         bn->tsorted = bn->trevsorted = BATcount(b) <= 1;
     304           6 :         bn->tnil = false;
     305           6 :         bn->tnonil = true;
     306           6 :         *ret = bn->batCacheid;
     307           6 :         BBPkeepref(*ret);
     308           6 :         BBPunfix(b->batCacheid);
     309           6 :         return MAL_SUCCEED;
     310             : }
     311             : 
     /*
                        * MAL command registered below as algebra.orderidx(bv, stable).
                        *
                        * Calls BATorderidx() on the BAT identified by *bid with the given
                        * `stable` flag and, on success, returns that same BAT id in *ret.
                        *
                        * ret     receives *bid on success
                        * bid     id of the BAT to index
                        * stable  passed through to BATorderidx()
                        *
                        * Returns MAL_SUCCEED, or an exception when the BAT cannot be
                        * obtained or BATorderidx() fails.
                        */
     312             : static str
     313           0 : OIDXorderidx(bat *ret, const bat *bid, const bit *stable)
     314             : {
     315             :         BAT *b;
     316             :         gdk_return r;
     317             : 
                               /* NOTE(review): ret is written further down, so this cast
                                * looks like a leftover */
     318             :         (void) ret;
     319           0 :         b = BATdescriptor(*bid);
     320           0 :         if (b == NULL)
     321           0 :                 throw(MAL, "algebra.orderidx", SQLSTATE(HY002) RUNTIME_OBJECT_MISSING);
     322             : 
     323           0 :         r = BATorderidx(b, *stable);
                               /* NOTE(review): every BATorderidx() failure is reported as a
                                * malloc failure */
     324           0 :         if (r != GDK_SUCCEED) {
     325           0 :                 BBPunfix(*bid);
     326           0 :                 throw(MAL, "algebra.orderidx", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     327             :         }
                               /* the input BAT itself is the result; there is no BBPunfix()
                                * on this path, so the reference taken by BATdescriptor() is
                                * presumably handed over by BBPkeepref() -- confirm */
     328           0 :         *ret = *bid;
     329           0 :         BBPkeepref(*ret);
     330           0 :         return MAL_SUCCEED;
     331             : }
     332             : 
     333             : /*
     334             :  * Merge the collection of sorted OID BATs into one
     335             :  */
     336             : 
     /*
                        * MAL entry point registered below as bat.orderidx(bv, l...).
                        *
                        * Argument 1 is the BAT b that is to receive the order index; all
                        * further arguments are the piece BATs.  Empty pieces are dropped,
                        * the remaining ones are passed to GDKmergeidx() together with b.
                        * cntxt and mb are unused.
                        *
                        * Returns MAL_SUCCEED, or an exception when
                        *  - retc != 1 or argc < 2,
                        *  - b or one of the pieces cannot be obtained,
                        *  - b already has torderidx set,
                        *  - the base type of b is not one of bte, sht, int, lng, hge (when
                        *    HAVE_HGE is defined), flt or dbl,
                        *  - the pointer array cannot be allocated,
                        *  - the piece counts do not add up to BATcount(b), or
                        *  - GDKmergeidx() fails.
                        */
     337             : static str
     338           0 : OIDXmerge(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
     339             : {
     340             :         bat bid;
     341             :         BAT *b;
     342             :         BAT **a;
     343             :         int i, j, n_ar;
     344             :         BUN m_sz;
     345             :         gdk_return rc;
     346             : 
     347             :         (void) cntxt;
     348             :         (void) mb;
     349             : 
     350           0 :         if( pci->retc != 1 )
     351           0 :                 throw(MAL, "bat.orderidx", SQLSTATE(HY002) "INTERNAL ERROR, retc != 1 ");
                               /* NOTE(review): the message says "argc != 2" but the test is
                                * argc < 2 */
     352           0 :         if( pci->argc < 2 )
     353           0 :                 throw(MAL, "bat.orderidx", SQLSTATE(HY002) "INTERNAL ERROR, argc != 2");
     354             : 
                               /* number of piece BATs: everything after the result and b */
     355           0 :         n_ar = pci->argc - 2;
     356             : 
     357           0 :         bid = *getArgReference_bat(stk, pci, 1);
     358           0 :         b = BATdescriptor(bid);
     359           0 :         if (b == NULL)
     360           0 :                 throw(MAL, "bat.orderidx", SQLSTATE(HY002) RUNTIME_OBJECT_MISSING);
     361             : 
     362           0 :         if (b->torderidx) {
     363           0 :                 BBPunfix(bid);
     364           0 :                 throw(MAL, "bat.orderidx", SQLSTATE(HY002) "INTERNAL ERROR, torderidx already set");
     365             :         }
     366             : 
     367           0 :         switch (ATOMbasetype(b->ttype)) {
     368             :         case TYPE_bte:
     369             :         case TYPE_sht:
     370             :         case TYPE_int:
     371             :         case TYPE_lng:
     372             : #ifdef HAVE_HGE
     373             :         case TYPE_hge:
     374             : #endif
     375             :         case TYPE_flt:
     376             :         case TYPE_dbl:
     377             :                 break;
     378           0 :         case TYPE_str:
     379             :                 /* TODO: support strings etc. */
     380             :         case TYPE_void:
     381             :         case TYPE_ptr:
     382             :         default:
     383           0 :                 BBPunfix(bid);
     384           0 :                 throw(MAL, "bat.orderidx", TYPE_NOT_SUPPORTED);
     385             :         }
     386             : 
                               /* NOTE(review): n_ar is 0 when argc == 2, which makes this a
                                * zero-byte allocation request -- confirm how GDKmalloc()
                                * handles that */
     387           0 :         if ((a = (BAT **) GDKmalloc(n_ar*sizeof(BAT *))) == NULL) {
     388           0 :                 BBPunfix(bid);
     389           0 :                 throw(MAL, "bat.orderidx", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     390             :         }
                               /* fetch every piece and add up the tuple counts in m_sz */
     391             :         m_sz = 0;
     392           0 :         for (i = 0; i < n_ar; i++) {
     393           0 :                 a[i] = BATdescriptor(*getArgReference_bat(stk, pci, i+2));
     394           0 :                 if (a[i] == NULL) {
                                               /* NOTE(review): entries of empty pieces were
                                                * set to NULL further down in an earlier
                                                * iteration, so a[j]->batCacheid dereferences
                                                * NULL if any earlier piece was empty */
     395           0 :                         for (j = i-1; j >= 0; j--) {
     396           0 :                                 BBPunfix(a[j]->batCacheid);
     397             :                         }
     398           0 :                         GDKfree(a);
     399           0 :                         BBPunfix(bid);
     400           0 :                         throw(MAL, "bat.orderidx", SQLSTATE(HY002) RUNTIME_OBJECT_MISSING);
     401             :                 }
     402           0 :                 m_sz += BATcount(a[i]);
                                       /* empty pieces are released right away and marked with
                                        * NULL for the compaction loop below */
     403           0 :                 if (BATcount(a[i]) == 0) {
     404           0 :                         BBPunfix(a[i]->batCacheid);
     405           0 :                         a[i] = NULL;
     406             :                 }
     407             :         }
     408           0 :         assert(m_sz == BATcount(b));
                               /* compact the array: each NULL slot is overwritten with the
                                * current last entry and n_ar shrinks by one; i-- makes the
                                * loop look at the same slot again, since the entry moved in
                                * may be NULL as well.  This changes the order of the
                                * remaining pieces; presumably GDKmergeidx() does not depend
                                * on it -- confirm. */
     409           0 :         for (i = 0; i < n_ar; i++) {
     410           0 :                 if (a[i] == NULL) {
     411           0 :                         n_ar--;
     412           0 :                         if (i < n_ar)
     413           0 :                                 a[i] = a[n_ar];
     414           0 :                         i--;
     415             :                 }
     416             :         }
                               /* same condition as the assert above, enforced at run time */
     417             :         if (m_sz != BATcount(b)) {
     418             :                 BBPunfix(bid);
     419             :                 for (i = 0; i < n_ar; i++)
     420             :                         BBPunfix(a[i]->batCacheid);
     421             :                 GDKfree(a);
     422             :                 throw(MAL, "bat.orderidx", "count mismatch");
     423             :         }
     424             : 
     425           0 :         rc = GDKmergeidx(b, a, n_ar);
     426             : 
                               /* release all references before looking at the result */
     427           0 :         for (i = 0; i < n_ar; i++)
     428           0 :                 BBPunfix(a[i]->batCacheid);
     429           0 :         GDKfree(a);
     430           0 :         BBPunfix(bid);
     431             : 
                               /* NOTE(review): every GDKmergeidx() failure is reported as a
                                * malloc failure */
     432           0 :         if (rc != GDK_SUCCEED)
     433           0 :                 throw(MAL, "bat.orderidx", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     434             : 
     435             :         return MAL_SUCCEED;
     436             : }
     437             : 
     438             : #include "mel.h"
     /*
                        * Table of the MAL signatures implemented in this file; it is handed
                        * to mal_module() by the startup function below.
                        *
                        *   bat.orderidx(bv)            -> OIDXcreate
                        *   bat.orderidx(bv, pieces)    -> OIDXcreate
                        *   bat.orderidx(bv, l...)      -> OIDXmerge
                        *   bat.hasorderidx(bv)         -> OIDXhasorderidx
                        *   bat.getorderidx(bv)         -> OIDXgetorderidx
                        *   algebra.orderidx(bv,stable) -> OIDXorderidx
                        *
                        * The entry with .imp = NULL ends the table.
                        * NOTE(review): the second and third entries share module, name and
                        * args(1,3,...); presumably they are told apart by the type of the
                        * last argument (int versus BAT varargs) -- confirm.
                        */
     439             : mel_func orderidx_init_funcs[] = {
     440             :  pattern("bat", "orderidx", OIDXcreate, false, "Introduces the OID index arrangement of ordered values", args(1,2, arg("",void),batargany("bv",1))),
     441             :  pattern("bat", "orderidx", OIDXcreate, false, "Introduces the OID index arrangement of ordered values", args(1,3, arg("",void),batargany("bv",1),arg("pieces",int))),
     442             :  pattern("bat", "orderidx", OIDXmerge, false, "Consolidates the OID index arrangement", args(1,3, arg("",void),batargany("bv",1),batvarargany("l",1))),
     443             :  pattern("bat", "hasorderidx", OIDXhasorderidx, false, "Return true if order index exists", args(1,2, arg("",bit),batargany("bv",1))),
     444             :  pattern("bat", "getorderidx", OIDXgetorderidx, false, "Return the order index if it exists", args(1,2, batarg("",oid),batargany("bv",1))),
     445             :  command("algebra", "orderidx", OIDXorderidx, false, "Create an order index", args(1,3, batargany("",1),batargany("bv",1),arg("stable",bit))),
     446             :  { .imp=NULL }
     447             : };
     448             : #include "mal_import.h"
     449             : #ifdef _MSC_VER
     450             : #undef read
     451             : #pragma section(".CRT$XCU",read)
     452             : #endif
     /*
                        * Startup hook: passes the name "orderidx" and the
                        * orderidx_init_funcs table to mal_module().  Judging by the macro
                        * name and the .CRT$XCU section pragma above, it presumably runs when
                        * the library is loaded -- confirm against the LIB_STARTUP_FUNC
                        * definition.
                        */
     453         257 : LIB_STARTUP_FUNC(init_orderidx_mal)
     454         257 : { mal_module("orderidx", NULL, orderidx_init_funcs); }

Generated by: LCOV version 1.14