LCOV - code coverage report
Current view: top level - monetdb5/optimizer - opt_dataflow.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 153 165 92.7 %
Date: 2021-10-13 02:24:04 Functions: 7 7 100.0 %

          Line data    Source code
       1             : /*
       2             :  * This Source Code Form is subject to the terms of the Mozilla Public
       3             :  * License, v. 2.0.  If a copy of the MPL was not distributed with this
       4             :  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
       5             :  *
       6             :  * Copyright 1997 - July 2008 CWI, August 2008 - 2021 MonetDB B.V.
       7             :  */
       8             : 
       9             : /*
      10             :  * The statements are all checked for being eligible for dataflow.
      11             :  */
      12             : #include "monetdb_config.h"
      13             : #include "opt_dataflow.h"
      14             : #include "mal_instruction.h"
      15             : #include "mal_interpreter.h"
      16             : #include "manifold.h"
      17             : 
      18             : /*
      19             :  * Dataflow processing incurs overhead and is only
      20             :  * relevant if multiple tasks can be handled at the same time.
      21             :  * Also, simple expressions don't have to be executed in parallel.
      22             :  *
      23             :  * The garbage sink contains variables whose end of life is within
      24             :  * a dataflow block and which are used concurrently.
      25             :  * They are garbage collected at the end of the parallel block.
      26             :  *
      27             :  * The dataflow analysis centers around the read/write use patterns of
      28             :  * the variables and the occurrence of side-effect bearing functions.
      29             :  * Any such function should break the dataflow block as it may rely
      30             :  * on the sequential order in the plan.
      31             :  *
      32             :  * The following state properties can be distinguished for all variables:
      33             :  * VARWRITE  - variable assigned a value in the dataflow block
      34             :  * VARREAD   - variable is used in an argument
      35             :  * VAR2READ  - variable is read in concurrent mode
      36             :  * VARBLOCK  - the variable's next use terminates the parallel block; set after encountering an update
      37             :  *
      38             :  * Only some combinations are allowed.
      39             :  */
      40             : 
      41             : #define VARFREE  0
      42             : #define VARWRITE 1
      43             : #define VARREAD  2
      44             : #define VARBLOCK 4
      45             : #define VAR2READ 8
      46             : 
      47             : typedef char *States;
      48             : 
      49             : #define setState(S,P,K,F)  ( assert(getArg(P,K) < vlimit), (S)[getArg(P,K)] |= F)
      50             : #define getState(S,P,K)  ((S)[getArg(P,K)])
      51             : 
      52             : typedef enum {
      53             :         no_region,
      54             :         singleton_region, // always a single statement
      55             :         dataflow_region,  // statements without or with controlled side effects, in parallel
      56             :         existing_region,  // existing barrier..exit region, copied as-is
      57             :         sql_region,        // region of nonconflicting sql.append/sql.updates only
      58             : } region_type;
      59             : 
      60             : typedef struct {
      61             :         region_type type;
      62             :         union {
      63             :                 struct {
      64             :                         int level;  // level of nesting
      65             :                 } existing_region;
      66             :         } st;
      67             : } region_state;
      68             : 
      69             : static int
      70     1145543 : simpleFlow(InstrPtr *old, int start, int last, region_state *state)
      71             : {
      72             :         int i, j, k, simple = TRUE;
      73             :         InstrPtr p = NULL, q;
      74             : 
      75             :         /* ignore trivial blocks */
      76     1145543 :         if ( last - start == 1)
      77             :                 return TRUE;
      78      138558 :         if ( state->type == existing_region )
      79             :                 // don't add additional barriers and garbage collection around existing region.
      80             :                 return TRUE;
      81             :         /* skip sequence of simple arithmetic first */
      82      277370 :         for( ; simple && start < last; start++)
      83      139455 :         if ( old[start] ) {
      84             :                 p= old[start];
      85      139455 :                 simple = getModuleId(p) == calcRef || getModuleId(p) == mtimeRef || getModuleId(p) == strRef || getModuleId(p)== mmathRef;
      86             :         }
      87      213293 :         for( i = start; i < last; i++)
      88      200925 :         if ( old[i]) {
      89             :                 q= old[i];
      90      200925 :                 simple = getModuleId(q) == calcRef || getModuleId(q) == mtimeRef || getModuleId(q) == strRef || getModuleId(q)== mmathRef;
      91             :                 if( !simple)  {
      92             :                         /* if not arithmetic then we should consume the previous result directly */
      93     1202516 :                         for( j= q->retc; j < q->argc; j++)
      94     2515417 :                                 for( k =0; k < p->retc; k++)
      95     1510614 :                                         if( getArg(p,k) == getArg(q,j))
      96             :                                                 simple= TRUE;
      97      197713 :                         if( !simple)
      98             :                                 return 0;
      99             :                 }
     100             :                 p = q;
     101             :         }
     102             :         return simple;
     103             : }
     104             : 
     105             : /* Updates are permitted if it is a unique update on
     106             :  * a BAT created in the context of this block
     107             :  * As far as we know, no SQL nor MAL test re-uses the
     108             :  * target BAT to insert again and subsequently calls dataflow.
     109             :  * In MAL scripts, they still can occur.
     110             : */
     111             : 
     112             : /* a limited set of MAL instructions may appear in the dataflow block*/
     113             : static int
     114     8177862 : dataflowBreakpoint(Client cntxt, MalBlkPtr mb, InstrPtr p, States states)
     115             : {
     116             :         int j;
     117             : 
     118    16244007 :         if (p->token == ENDsymbol || p->barrier || isUnsafeFunction(p) ||
     119     8066935 :                 (isMultiplex(p) && MANIFOLDtypecheck(cntxt,mb,p,0) == NULL) ){
     120      112115 :                         return TRUE;
     121             :                 }
     122             : 
     123             :         /* flow blocks should be closed when we reach a point
     124             :            where a variable is assigned more than once or is
     125             :            already being read.
     126             :         */
     127    17258390 :         for(j=0; j<p->retc; j++)
     128     9192820 :                 if ( getState(states,p,j) & (VARWRITE | VARREAD | VARBLOCK)){
     129             :                         return 1;
     130             :                 }
     131             : 
     132             :         /* update instructions are permitted in the block if the target
     133             :          * variable has not been read in the block so far */
     134     8065570 :         if ( isUpdateInstruction(p) ){
     135             :                 /* the SQL update functions change BATs that are not
     136             :                  * explicitly mentioned as arguments (and certainly not as the
     137             :                  * first argument), but that can still be available to the MAL
     138             :                  * program (see bugs.monetdb.org/6641) */
     139      402527 :                 if (getModuleId(p) == sqlRef)
     140             :                         return 1;
     141      399180 :                 return getState(states,p,p->retc) & (VARREAD | VARBLOCK);
     142             :         }
     143             : 
     144    37263226 :         for(j=p->retc; j < p->argc; j++){
     145    29600193 :                 if ( getState(states,p,j) & VARBLOCK){
     146             :                         return 1;
     147             :                 }
     148             :         }
     149     7663033 :         return hasSideEffects(mb,p,FALSE);
     150             : }
     151             : 
     152             : static str
     153             : get_str_arg(MalBlkPtr mb, InstrPtr p, int argno)
     154             : {
     155    15972964 :         int var = getArg(p, argno);
     156    15972964 :         return getVarConstant(mb, var).val.sval;
     157             : }
     158             : 
     159             : static str
     160             : get_sql_sname(MalBlkPtr mb, InstrPtr p)
     161             : {
     162             :         return get_str_arg(mb, p, 2);
     163             : }
     164             : 
     165             : static str
     166             : get_sql_tname(MalBlkPtr mb, InstrPtr p)
     167             : {
     168             :         return get_str_arg(mb, p, 3);
     169             : }
     170             : 
     171             : static str
     172             : get_sql_cname(MalBlkPtr mb, InstrPtr p)
     173             : {
     174             :         return get_str_arg(mb, p, 4);
     175             : }
     176             : 
     177             : 
     178             : static bool
     179     1300572 : isSqlAppendUpdate(MalBlkPtr mb, InstrPtr p)
     180             : {
     181     1300572 :         if (p->modname != sqlRef)
     182             :                 return false;
     183      949904 :         if (p->fcnname != appendRef && p->fcnname != updateRef)
     184             :                 return false;
     185             : 
     186             :         // pattern("sql", "append", mvc_append_wrap, false, "...", args(1,8, arg("",int),
     187             :         //                        arg("mvc",int),
     188             :         //                        arg("sname",str),
     189             :         //                        arg("tname",str),
     190             :         //                        arg("cname",str),
     191             :         //                        arg("offset",lng),
     192             :         //                        batarg("pos",oid),
     193             :         //                        argany("ins",0))),
     194             : 
     195             :         // pattern("sql", "update", mvc_update_wrap, false, "...", args(1,7, arg("",int),
     196             :         //                        arg("mvc",int),
     197             :         //                        arg("sname",str),
     198             :         //                        arg("tname",str),
     199             :         //                        arg("cname",str),
     200             :         //                        argany("rids",0),
     201             :         //                        argany("upd",0)))
     202             : 
     203      507038 :         if ((p->fcnname == appendRef && p->argc != 8) || (p->fcnname == updateRef && p->argc != 7))
     204             :                 return false;
     205             : 
     206      507038 :         int mvc_var = getArg(p, 1);
     207      507038 :         if (getVarType(mb, mvc_var) != TYPE_int)
     208             :                 return false;
     209             : 
     210      507038 :         int sname_var = getArg(p, 2);
     211      507038 :         if (getVarType(mb, sname_var) != TYPE_str || !isVarConstant(mb, sname_var))
     212             :                 return false;
     213             : 
     214      507038 :         int tname_var = getArg(p, 3);
     215      507038 :         if (getVarType(mb, tname_var) != TYPE_str || !isVarConstant(mb, tname_var))
     216             :                 return false;
     217             : 
     218      507038 :         int cname_var = getArg(p, 4);
     219      507038 :         if (getVarType(mb, cname_var) != TYPE_str || !isVarConstant(mb, cname_var))
     220           0 :                 return false;
     221             : 
     222             :         return true;
     223             : }
     224             : 
     225             : static bool
     226      507037 : sqlBreakpoint(MalBlkPtr mb, InstrPtr *first, InstrPtr *p)
     227             : {
     228      507037 :         InstrPtr instr = *p;
     229      507037 :         if (!isSqlAppendUpdate(mb, instr))
     230             :                 return true;
     231             : 
     232             :         str my_sname = get_sql_sname(mb, instr);
     233             :         str my_tname = get_sql_tname(mb, instr);
     234             :         str my_cname = get_sql_cname(mb, instr);
     235    15972961 :         for (InstrPtr *q = first; q < p; q++) {
     236    15532007 :                 str cname = get_sql_cname(mb, *q);
     237    15532007 :                 if (strcmp(my_cname, cname) != 0) {
     238             :                         // different cname, no conflict
     239    15532006 :                         continue;
     240             :                 }
     241             :                 str tname = get_sql_tname(mb, *q);
     242           1 :                 if (strcmp(my_tname, tname) != 0) {
     243             :                         // different tname, no conflict
     244           0 :                         continue;
     245             :                 }
     246             :                 str sname = get_sql_sname(mb, *q);
     247           1 :                 if (strcmp(my_sname, sname) != 0) {
     248             :                         // different sname, no conflict
     249           0 :                         continue;
     250             :                 }
     251             :                 // Found a statement in the region that works on the same column so this is a breakpoint
     252             :                 return true;
     253             :         }
     254             : 
     255             :         // None of the statements in the region works on this column so no breakpoint necessary
     256             :         return false;
     257             : }
     258             : 
     259             : static bool
     260     9622541 : checkBreakpoint(Client cntxt, MalBlkPtr mb, InstrPtr *first, InstrPtr *p, States states, region_state *state)
     261             : {
     262     9622541 :         InstrPtr instr = *p;
     263     9622541 :         switch (state->type) {
     264             :                 case singleton_region:
     265             :                         // by definition
     266             :                         return true;
     267     8177862 :                 case dataflow_region:
     268     8177862 :                         return dataflowBreakpoint(cntxt, mb, instr, states);
     269        4831 :                 case existing_region:
     270        4831 :                         if (state->st.existing_region.level == 0) {
     271             :                                 // previous statement ended the region so we break here
     272             :                                 return true;
     273             :                         }
     274        4188 :                         if (blockStart(instr)) {
     275          19 :                                 state->st.existing_region.level += 1;
     276        4169 :                         } else if (blockExit(instr)) {
     277         662 :                                 state->st.existing_region.level -= 1;
     278             :                         }
     279             :                         return false;
     280      507037 :                 case sql_region:
     281      507037 :                         return sqlBreakpoint(mb, first, p);
     282             :                 default:
     283             :                         // serious corruption has occurred.
     284           0 :                         assert(0);                      /* corrupted region_type */
     285             :                         abort();
     286             :         }
     287             :         assert(0);                                      /* unreachable */
     288             :         return true;
     289             : }
     290             : 
     291             : static void
     292      794178 : decideRegionType(Client cntxt, MalBlkPtr mb, InstrPtr p, States states, region_state *state)
     293             : {
     294             :         (void) cntxt;
     295             : 
     296      794178 :         state->type = no_region;
     297      794178 :         if (blockStart(p)) {
     298         643 :                 state->type = existing_region;
     299         643 :                 state->st.existing_region.level = 1;
     300      793535 :         } else if (p->token == ENDsymbol) {
     301           0 :                 state->type = existing_region;
     302      793535 :         } else if (isSqlAppendUpdate(mb,p)) {
     303       66083 :                 state->type = sql_region;
     304      727452 :         } else if (p->barrier) {
     305         374 :                 state->type = singleton_region;
     306      727078 :         } else if (isUnsafeFunction(p)) {
     307      226079 :                 state->type = singleton_region;
     308      500999 :         } else if (
     309      500999 :                 isUpdateInstruction(p)
     310          35 :                 && getModuleId(p) != sqlRef
     311          19 :                 && (getState(states, p, p->retc) & (VARREAD | VARBLOCK)) == 0
     312             :         ) {
     313             :                 // Special case. Unless they're from the sql module, instructions with
     314             :                 // names like 'append', 'update', 'delete', 'grow', etc., are expected
     315             :                 // to express their side effects as data dependencies, for example,
     316             :                 //       X5 := bat.append(X_5, ...)
     317          19 :                 state->type = dataflow_region;
     318      500980 :         } else if (hasSideEffects(mb, p, false)) {
     319      354397 :                 state->type = singleton_region;
     320      146583 :         } else if (isMultiplex(p)) {
     321         596 :                 state->type = singleton_region;
     322             :         } else {
     323      145987 :                 state->type = dataflow_region;
     324             :         }
     325      794178 :         assert(state->type != no_region);
     326      794178 : }
     327             : 
     328             : 
     329             : /* dataflow blocks are transparent, because they are always
     330             :    executed, either sequentially or in parallel */
     331             : 
     332             : str
     333      351365 : OPTdataflowImplementation(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
     334             : {
     335             :         int i,j,k, start, slimit, breakpoint, actions=0, simple = TRUE;
     336             :         int flowblock= 0;
     337             :         InstrPtr p, *old = NULL, q;
     338             :         int limit, vlimit;
     339             :         States states = NULL;
     340      351365 :         region_state state = { singleton_region };
     341             :         str msg = MAL_SUCCEED;
     342             : 
     343             :         /* don't use dataflow on single processor systems */
     344      351365 :         if (GDKnr_threads <= 1 || cntxt->workerlimit == 1)
     345           0 :                 goto wrapup;
     346             : 
     347      351365 :         if ( optimizerIsApplied(mb,dataflowRef))
     348           0 :                 goto wrapup;
     349             :         (void) stk;
     350             :         /* inlined functions will get their dataflow control later */
     351      351365 :         if ( mb->inlineProp)
     352           0 :                 goto wrapup;
     353             : 
     354      351365 :         vlimit = mb->vsize;
     355      351365 :         states = (States) GDKzalloc(vlimit * sizeof(char));
     356      351365 :         if (states == NULL ){
     357           0 :                 throw(MAL,"optimizer.dataflow", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     358             :         }
     359             : 
     360      351365 :         setVariableScope(mb);
     361             : 
     362      351365 :         limit= mb->stop;
     363      351365 :         slimit= mb->ssize;
     364      351365 :         old = mb->stmt;
     365      351365 :         if (newMalBlkStmt(mb, mb->ssize) < 0) {
     366           0 :                 GDKfree(states);
     367           0 :                 throw(MAL,"optimizer.dataflow", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     368             :         }
     369             : 
     370             :         /* inject new dataflow barriers using a single pass through the program */
     371             :         start = 0;
     372      351365 :         state.type = singleton_region;
     373     9622541 :         for (i = 1; i<limit; i++) {
     374     9622541 :                 p = old[i];
     375     9622541 :                 assert(p);
     376     9622541 :                 breakpoint = checkBreakpoint(cntxt, mb, &old[start], &old[i], states, &state);
     377     9622542 :                 if ( breakpoint ){
     378             :                         /* close previous flow block */
     379     1145543 :                         simple = simpleFlow(old,start,i, &state);
     380             : 
     381     1145543 :                         if ( !simple){
     382      125560 :                                 flowblock = newTmpVariable(mb,TYPE_bit);
     383      125560 :                                 q= newFcnCall(mb,languageRef,dataflowRef);
     384      125560 :                                 q->barrier= BARRIERsymbol;
     385      125560 :                                 getArg(q,0)= flowblock;
     386      125560 :                                 actions++;
     387             :                         }
     388             :                         // copyblock the collected statements
     389    10768089 :                         for( j=start ; j<i; j++) {
     390     9622546 :                                 q= old[j];
     391     9622546 :                                 pushInstruction(mb,q);
     392             :                                 // collect BAT variables garbage collected within the block
     393     9622546 :                                 if( !simple)
     394    42907873 :                                         for( k=q->retc; k<q->argc; k++){
     395    34325938 :                                                 if (getState(states,q,k) & VAR2READ &&  getEndScope(mb,getArg(q,k)) == j && isaBatType(getVarType(mb,getArg(q,k))) ){
     396             :                                                         InstrPtr r;
     397     1924779 :                                                         r = newInstruction(NULL,languageRef, passRef);
     398     1924779 :                                                         getArg(r,0) = newTmpVariable(mb,TYPE_void);
     399     1924779 :                                                         r= addArgument(mb,r, getArg(q,k));
     400     1924779 :                                                         pushInstruction(mb,r);
     401             :                                                 }
     402             :                                         }
     403             :                         }
     404             :                         /* exit parallel block */
     405     1145543 :                         if ( ! simple){
     406      125560 :                                 q= newAssignment(mb);
     407      125560 :                                 q->barrier= EXITsymbol;
     408      125560 :                                 getArg(q,0) = flowblock;
     409             :                         }
     410     1145543 :                         if (p->token == ENDsymbol){
     411    11243551 :                                 for(; i < limit; i++)
     412    10892186 :                                         if( old[i])
     413    10892186 :                                                 pushInstruction(mb,old[i]);
     414             :                                 break;
     415             :                         }
     416             : 
     417             :                         // Start a new region
     418      794178 :                         memset((char*) states, 0, vlimit * sizeof(char));
     419             :                         start = i;
     420      794178 :                         decideRegionType(cntxt, mb, p, states, &state);
     421             :                 }
     422             : 
     423             :                 // remember you assigned/read variables
     424    19749941 :                 for ( k = 0; k < p->retc; k++)
     425    10478764 :                         setState(states, p, k, VARWRITE);
     426     9271177 :                 if( isUpdateInstruction(p) && (getState(states,p,1) == 0 || getState(states,p,1) & VARWRITE))
     427      535154 :                         setState(states, p,1, VARBLOCK);
     428    46193842 :                 for ( k = p->retc; k< p->argc; k++)
     429    36922666 :                 if( !isVarConstant(mb,getArg(p,k)) ){
     430    18347509 :                         if( getState(states, p, k) & VARREAD)
     431     7506725 :                                 setState(states, p, k, VAR2READ);
     432             :                         else
     433    10840784 :                         if( getState(states, p, k) & VARWRITE)
     434     8529526 :                                 setState(states, p ,k, VARREAD);
     435             :                 }
     436             :         }
     437             : 
     438             :         /* take the remainder as is */
     439    74420059 :         for (; i<slimit; i++)
     440    74068694 :                 if (old[i])
     441           0 :                         pushInstruction(mb, old[i]);
     442             :         /* Defense line against incorrect plans */
     443      351365 :         if( actions > 0){
     444      113055 :                 msg = chkTypes(cntxt->usermodule, mb, FALSE);
     445      113055 :                 if (!msg)
     446      113055 :                         msg = chkFlow(mb);
     447      113055 :                 if (!msg)
     448      113055 :                         msg = chkDeclarations(mb);
     449             :         }
     450      238310 : wrapup:
     451             :         /* keep actions taken as a fake argument*/
     452      351365 :         (void) pushInt(mb, pci, actions);
     453             : 
     454      351365 :         if(states) GDKfree(states);
     455      351365 :         if(old) GDKfree(old);
     456             :         return msg;
     457             : }

Generated by: LCOV version 1.14