LCOV - code coverage report
Current view: top level - monetdb5/optimizer - opt_mitosis.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 129 147 87.8 %
Date: 2021-10-13 02:24:04 Functions: 1 1 100.0 %

          Line data    Source code
       1             : /*
       2             :  * This Source Code Form is subject to the terms of the Mozilla Public
       3             :  * License, v. 2.0.  If a copy of the MPL was not distributed with this
       4             :  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
       5             :  *
       6             :  * Copyright 1997 - July 2008 CWI, August 2008 - 2021 MonetDB B.V.
       7             :  */
       8             : 
       9             : #include "monetdb_config.h"
      10             : #include "opt_mitosis.h"
      11             : #include "mal_interpreter.h"
      12             : #include "gdk_utils.h"
      13             : 
      14             : 
      15             : str
      16      255604 : OPTmitosisImplementation(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
      17             : {       /*
                     :          * Mitosis optimizer pass over the MAL block mb.
                     :          *
                     :          * Pass 1 walks the plan (from statement 1 on), bails out on a number of
                     :          * constructs, and otherwise selects as target the sqlRef
                     :          * bindRef/bindidxRef/tidRef instruction with the largest row count.
                     :          * Then the number of pieces is derived from the memory available, the
                     :          * number of threads and the row count.  Pass 2 rebuilds the plan:
                     :          * binds on the target's schema/table whose row count is not smaller
                     :          * than the target's are replaced by `pieces` copies (each with the
                     :          * piece index and the piece count appended as int arguments), combined
                     :          * again by a matRef/newRef instruction.
                     :          *
                     :          * On every path that reaches the bailout label the number of pieces
                     :          * (0 when nothing was split) is appended to pci as an int argument and
                     :          * msg is returned; msg starts out as MAL_SUCCEED and is only
                     :          * overwritten by the chkTypes/chkFlow/chkDeclarations calls.  The two
                     :          * throw() paths (allocation failures) do not pass through bailout.
                     :          * stk is unused.
                     :          */
      18             :         int i, j, limit, slimit, estimate = 0, pieces = 1, mito_parts = 0, mito_size = 0, row_size = 0, mt = -1;
      19             :         str schema = 0, table = 0;
      20             :         BUN r = 0, rowcnt = 0;  /* table should be sizeable to consider parallel execution*/
      21             :         InstrPtr p, q, *old, target = 0;
      22             :         size_t argsize = 6 * sizeof(lng), m = 0, memclaim;
      23             :         /* estimated size per operator: 4 args + 2 results, sizeof(lng) each */
      24      255604 :         int threads = GDKnr_threads ? GDKnr_threads : 1;
      25             :         str msg = MAL_SUCCEED;
      26             : 
      27             :         /* If the user has set a limit on the number of worker threads, respect it
      28             :          * when deriving the number of partitions.  Beware: this may lead to larger
      29             :          * pieces, as it only limits the CPU power. */
      30      255604 :         if( cntxt->workerlimit)
      31             :                 threads= cntxt->workerlimit;
      32             :         (void) cntxt; /* NOTE(review): redundant, cntxt is already used above */
      33             :         (void) stk;
      34             : 
      35      255604 :         old = mb->stmt;
      36    12761652 :         for (i = 1; i < mb->stop; i++) { /* pass 1: check bailout conditions and pick the split target */
      37    12508701 :                 InstrPtr p = old[i]; /* shadows the function-level p */
      38             : 
      39    12508701 :                 if (getModuleId(p) == sqlRef && getFunctionId(p) == assertRef &&
      40        9154 :                         p->argc > 2 && getArgType(mb, p, 2) == TYPE_str &&
      41        9154 :                         isVarConstant(mb, getArg(p, 2)) &&
      42        9154 :                         getVarConstant(mb, getArg(p, 2)).val.sval != NULL &&
      43        9154 :                         (strstr(getVarConstant(mb, getArg(p, 2)).val.sval, "PRIMARY KEY constraint") ||
      44        8017 :                          strstr(getVarConstant(mb, getArg(p, 2)).val.sval, "UNIQUE constraint"))){
      45             :                         pieces = 0; /* plan asserts a PRIMARY KEY / UNIQUE constraint message: do not split */
      46        1203 :                         goto bailout;
      47             :                 }
      48             : 
      49             :                 /* mitosis/mergetable bailout conditions */
      50             : 
      51    12507498 :                 if (p->argc > 2 && getModuleId(p) == aggrRef &&
      52        7685 :                                 getFunctionId(p) != subcountRef &&
      53        3241 :                                 getFunctionId(p) != subminRef &&
      54        3041 :                                 getFunctionId(p) != submaxRef &&
      55        2709 :                                 getFunctionId(p) != subavgRef &&
      56        2453 :                                 getFunctionId(p) != subsumRef &&
      57         525 :                                 getFunctionId(p) != subprodRef &&
      58             : 
      59         518 :                                 getFunctionId(p) != countRef &&
      60         432 :                                 getFunctionId(p) != minRef &&
      61         432 :                                 getFunctionId(p) != maxRef &&
      62         432 :                                 getFunctionId(p) != avgRef &&
      63         380 :                                 getFunctionId(p) != sumRef &&
      64         380 :                                 getFunctionId(p) != prodRef){
      65             :                                 pieces = 0; /* any aggrRef call other than the twelve listed above: do not split */
      66         380 :                                 goto bailout;
      67             :                         }
      68             : 
      69             :                 /* do not split up floating point bat that is being summed */
      70    12507118 :                 if (p->retc == 1 &&
      71    11836413 :                         (((p->argc == 6 || p->argc == 7) &&
      72      557952 :                           getModuleId(p) == aggrRef &&
      73    11836413 :                           getFunctionId(p) == subsumRef) ||
      74      881729 :                          (p->argc == 4 &&
      75      881729 :                           getModuleId(p) == aggrRef &&
      76           0 :                           getFunctionId(p) == sumRef)) &&
      77        1928 :                         isaBatType(getArgType(mb, p, p->retc)) &&
      78        1928 :                         (getBatType(getArgType(mb, p, p->retc)) == TYPE_flt ||
      79             :                          getBatType(getArgType(mb, p, p->retc)) == TYPE_dbl)){
      80             :                                 pieces = 0;
      81          17 :                                 goto bailout;
      82             :                         }
      83             : 
      84    12507101 :                 if (p->argc > 2 && (getModuleId(p) == capiRef || getModuleId(p) == rapiRef || getModuleId(p) == pyapi3Ref) &&
      85         114 :                                 getFunctionId(p) == subeval_aggrRef){
      86             :                                 pieces = 0; /* subeval_aggrRef call in the capi/rapi/pyapi3 modules: do not split */
      87          17 :                                 goto bailout;
      88             :                         }
      89             : 
      90             :                 /* Mergetable cannot handle intersect/except's for now. NOTE(review): the test below is on algebraRef/groupbyRef -- confirm this comment still matches */
      91    12507084 :                 if (getModuleId(p) == algebraRef && getFunctionId(p) == groupbyRef){
      92             :                         pieces = 0;
      93        1036 :                         goto bailout;
      94             :                 }
      95             : 
      96             :                 /* locate the largest non-partitioned table */
      97    12506048 :                 if (getModuleId(p) != sqlRef ||
      98     1308220 :                                 (getFunctionId(p) != bindRef &&
      99      650316 :                                  getFunctionId(p) != bindidxRef &&
     100      649495 :                                  getFunctionId(p) != tidRef))
     101    11693193 :                         continue;
     102             :                 /* don't split insert BATs */
     103      812855 :                 if (p->argc > 5 && getVarConstant(mb, getArg(p, 5)).val.ival == 1)
     104           0 :                         continue;
     105      812855 :                 if (p->argc > 6)
     106      245624 :                         continue;  /* already partitioned */
     107             :                 /*
     108             :                  * The SQL optimizer already collects the counts of the base
     109             :                  * table and passes them on as a row property.  All pieces for a
     110             :                  * single subplan should ideally fit together.
     111             :                  */
     112      567231 :                 r = getRowCnt(mb, getArg(p, 0));
     113      567231 :                 if (r > rowcnt) {
     114             :                         /* the rowsize depends on the column types, assume void-headed */
     115       41684 :                         row_size = ATOMsize(getBatType(getArgType(mb,p,0)));
     116             :                         rowcnt = r;
     117             :                         target = p;
     118       41684 :                         estimate++; /* number of times the target changed; used below to size the new statement block */
     119             :                         r = 0;
     120             :                 }
     121             :         }
     122      252951 :         if (target == 0){ /* no candidate bind with a positive row count: nothing to split */
     123             :                 pieces = 0;
     124      215024 :                 goto bailout;
     125             :         }
     126             :         /*
     127             :          * The number of pieces should be based on the footprint of the
     128             :          * queryplan, such that preferably it can be handled without
     129             :          * swapping intermediates.  For the time being we just go for pieces
     130             :          * that fit into memory in isolation.  A fictive rowcount is derived
     131             :          * based on argument types, such that all pieces would fit into
     132             :          * memory conveniently for processing. We attempt to use not more
     133             :          * threads than strictly needed.
     134             :          * Experience shows that the pieces should not be too small.
     135             :          * If we should limit to |threads| is still an open issue.
     136             :          *
     137             :          * Take into account the number of client connections,
     138             :          * because all users together are responsible for resource contention
     139             :          */
     140       37927 :         cntxt->idle = 0; // this one is definitely not idle
     141             : 
     142             : /* This code was used to experiment with block sizes, mis-using the memorylimit variable
     143             :         if (cntxt->memorylimit){
     144             :                 // the new mitosis scheme uses a maximum chunk size in MB from the client context
     145             :                 m = (((size_t) cntxt->memorylimit) * 1048576) / (size_t) row_size;
     146             :                 pieces = (int) (rowcnt / m + (rowcnt - m * pieces > 0));
     147             :         }
     148             :         if (cntxt->memorylimit == 0 || pieces <= 1){
     149             : */
     150             :         if (pieces <= 1){ /* pieces still holds its initial value 1 here, so this branch is always taken */
     151             :                 /* We haven't assigned the number of pieces.
     152             :                  * Determine the memory available for this client
     153             :                  */
     154             : 
     155             :                 /* respect the memory limit size set for the user
     156             :                 * and determine the column slice size (m is a row count:
     157             :                 * bytes available divided by argsize)
     158             :                 */
     158       37927 :                 if( cntxt->memorylimit)
     159           0 :                         m = (((size_t) cntxt->memorylimit) * 1048576) / argsize;
     160             :                 else {
     161       37927 :                         memclaim= MCmemoryClaim();
     162       37927 :                         if(memclaim == GDK_mem_maxsize){
     163       37927 :                                 m = GDK_mem_maxsize / (size_t) MCactiveClients() / argsize;
     164             :                         } else
     165           0 :                                 m = (GDK_mem_maxsize - memclaim) / argsize;
     166             :                 }
     167             : 
     168             :                 /* if data exceeds memory size,
     169             :                  * i.e., (rowcnt*argsize > GDK_mem_maxsize),
     170             :                  * i.e., (rowcnt > GDK_mem_maxsize/argsize = m) */
     171       37927 :                 if (rowcnt > m && m / threads > 0) {
     172             :                         /* create |pieces| > |threads| partitions such that
     173             :                          * |threads| partitions at a time fit in memory,
     174             :                          * i.e., (threads*(rowcnt/pieces) <= m),
     175             :                          * i.e., (rowcnt/pieces <= m/threads),
     176             :                          * i.e., (pieces >= rowcnt/(m/threads))
     177             :                          * (assuming that (m > threads*MINPARTCNT)) */
     178             :                         /* the number of pieces affects SF-100, going beyond 8x increases
     179             :                          * the optimizer costs beyond the execution time
     180             :                          */
     181           0 :                         pieces = 4 * (int) ceil((double)rowcnt / m / threads); /* NOTE(review): this divides by (m * threads), whereas the derivation above calls for rowcnt / (m / threads) -- confirm which is intended */
     182       37927 :                 } else if (rowcnt > MINPARTCNT) {
     183             :                 /* exploit parallelism, but ensure minimal partition size to
     184             :                  * limit overhead */
     185         185 :                         pieces = 4 * (int) ceil(MIN((double)rowcnt / MINPARTCNT, threads));
     186             :                 }
     187             :         }
     188             : 
     189             :         /* when testing, always aim for full parallelism, but avoid
     190             :          * empty pieces (FORCEMITODEBUG is presumably a macro that makes the
     191             :          * next statement conditional on a debug setting -- confirm) */
     191       37927 :         FORCEMITODEBUG
     192       37736 :         if (pieces < threads)
     193       37666 :                 pieces = (int) MIN((BUN) threads, rowcnt);
     194             :         /* prevent plan explosion */
     195             :         if (pieces > MAXSLICES)
     196             :                 pieces = MAXSLICES;
     197             :         /* to enable experimentation we introduce the option to set
     198             :          * the number of parts required and/or the size of each chunk (in K).
     199             :          * Note that these overrides are applied after the MAXSLICES cap above,
     199             :          * and that mito_size, when set, takes precedence over mito_parts.
     199             :          */
     200       37927 :         mito_parts = GDKgetenv_int("mito_parts", 0);
     201       37927 :         if (mito_parts > 0)
     202             :                 pieces = mito_parts;
     203       37927 :         mito_size = GDKgetenv_int("mito_size", 0);
     204       37927 :         if (mito_size > 0)
     205           0 :                 pieces = (int) ((rowcnt * row_size) / (mito_size * 1024)); /* NOTE(review): mito_size * 1024 is an int multiplication; large settings could overflow */
     206             : 
     207       37927 :         if (pieces <= 1){ /* a single piece is not worth rewriting the plan for */
     208             :                 pieces = 0;
     209         296 :                 goto bailout;
     210             :         }
     211             : 
     212             :         /* at this stage we have identified the #chunks to be used for the largest table */
     213       37631 :         limit = mb->stop;
     214       37631 :         slimit = mb->ssize;
     215       37631 :         if (newMalBlkStmt(mb, mb->stop + 2 * estimate) < 0)
     216           0 :                 throw(MAL,"optimizer.mitosis", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     217             :         estimate = 0;
     218             : 
     219       37631 :         schema = getVarConstant(mb, getArg(target, 2)).val.sval;
     220       37631 :         table = getVarConstant(mb, getArg(target, 3)).val.sval;
     221     5335749 :         for (i = 0; i < limit; i++) { /* pass 2: rebuild the plan from old[], splitting the selected binds */
     222             :                 int upd = 0, qtpe, rtpe = 0, qv, rv;
     223             :                 InstrPtr matq, matr = NULL;
     224     5298118 :                 p = old[i];
     225             : 
     226     5298118 :                 if (getModuleId(p) != sqlRef ||
     227     1206406 :                         !(getFunctionId(p) == bindRef ||
     228      575364 :                           getFunctionId(p) == bindidxRef ||
     229      574739 :                           getFunctionId(p) == tidRef)) {
     230     4524010 :                         pushInstruction(mb, p);
     231     4524010 :                         continue;
     232             :                 }
     233             :                 /* don't split insert BATs (NOTE(review): pass 1 tests argc > 5, this tests argc == 6 -- confirm the difference is intended) */
     234      774108 :                 if (p->argc == 6 && getVarConstant(mb, getArg(p, 5)).val.ival == 1) {
     235           0 :                         pushInstruction(mb, p);
     236           0 :                         continue;
     237             :                 }
     238      774108 :                 r = getRowCnt(mb, getArg(p, 0));
     239      774108 :                 if (r < rowcnt) { /* smaller than the target: copy unchanged */
     240      487048 :                         pushInstruction(mb, p);
     241      487048 :                         continue;
     242             :                 }
     243             :                 /* Don't split the (index) bat if we already have identified a range */
     244             :                 /* This will happen if we inline separately optimized routines */
     245      287060 :                 if (p->argc > 7) {
     246           0 :                         pushInstruction(mb, p);
     247           0 :                         continue;
     248             :                 }
     249      287060 :                 if (p->retc == 2)
     250             :                         upd = 1; /* two results: schema/table arguments shift by one and the second result gets its own mat */
     251      287060 :                 if (mt < 0 && (strcmp(schema, getVarConstant(mb, getArg(p, 2 + upd)).val.sval) ||
     252      287060 :                                    strcmp(table, getVarConstant(mb, getArg(p, 3 + upd)).val.sval))) { /* NOTE(review): mt is never reassigned from -1 in this function, so the mt < 0 test is always true */
     253         259 :                         pushInstruction(mb, p);
     254         259 :                         continue;
     255             :                 }
     256             :                 /* we keep the original bind operation, because it allows for
     257             :                  * easy undo when the mergetable cannot do something.
     257             :                  * NOTE(review): the push below is commented out and p is freed
     257             :                  * after the split, so this remark looks stale -- confirm */
     258             :                 // pushInstruction(mb, p);
     259             : 
     260      286801 :                 qtpe = getVarType(mb, getArg(p, 0));
     261             : 
     262      286801 :                 matq = newInstructionArgs(NULL, matRef, newRef, pieces + 1); /* result is used unchecked; presumably it can be NULL on allocation failure -- confirm */
     263      286801 :                 getArg(matq, 0) = getArg(p, 0); /* the mat takes over the original result variable */
     264             : 
     265      286801 :                 if (upd) {
     266      102704 :                         matr = newInstructionArgs(NULL, matRef, newRef, pieces + 1);
     267      102704 :                         getArg(matr, 0) = getArg(p, 1);
     268      102704 :                         rtpe = getVarType(mb, getArg(p, 1));
     269             :                 }
     270             : 
     271     1426609 :                 for (j = 0; j < pieces; j++) {
     272     1139808 :                         q = copyInstruction(p);
     273     1139812 :                         if( q == NULL){ /* copy failed: push the remaining original statements, then fail. NOTE(review): matq/matr are not released here and the bailout label is skipped -- confirm */
     274           0 :                                 for (; i<limit; i++)
     275           0 :                                         if (old[i])
     276           0 :                                                 pushInstruction(mb,old[i]);
     277           0 :                                 GDKfree(old);
     278           0 :                                 throw(MAL,"optimizer.mitosis", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     279             :                         }
     280     1139812 :                         q = pushInt(mb, q, j); /* piece index */
     281     1139812 :                         q = pushInt(mb, q, pieces); /* total number of pieces */
     282             : 
     283     1139808 :                         qv = getArg(q, 0) = newTmpVariable(mb, qtpe);
     284     1139808 :                         if (upd) {
     285      404157 :                                 rv = getArg(q, 1) = newTmpVariable(mb, rtpe);
     286             :                         }
     287     1139808 :                         pushInstruction(mb, q);
     288     1139808 :                         matq = addArgument(mb, matq, qv);
     289     1139808 :                         if (upd)
     290      404157 :                                 matr = addArgument(mb, matr, rv);
     291             :                 }
     292      286801 :                 pushInstruction(mb, matq);
     293      286800 :                 if (upd)
     294      102704 :                         pushInstruction(mb, matr);
     295      286800 :                 freeInstruction(p); /* the original bind has been replaced by its slices */
     296             :         }
     297     7414485 :         for (; i<slimit; i++) /* carry over any non-NULL slots beyond the old stop, up to the old block size */
     298     7376854 :                 if (old[i])
     299           0 :                         pushInstruction(mb, old[i]);
     300       37631 :         GDKfree(old);
     301             : 
     302             :         /* Defense line against incorrect plans */
     303       37631 :         msg = chkTypes(cntxt->usermodule, mb, FALSE);
     304       37631 :         if (!msg)
     305       37631 :                 msg = chkFlow(mb);
     306       37631 :         if (!msg)
     307       37631 :                 msg = chkDeclarations(mb);
     308           0 : bailout:
     309             :         /* keep actions taken as a fake argument*/
     310      255604 :         (void) pushInt(mb, pci, pieces);
     311      255604 :         return msg;
     312             : }

Generated by: LCOV version 1.14