LCOV - code coverage report
Current view: top level - monetdb5/optimizer - opt_remoteQueries.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 30 163 18.4 %
Date: 2021-10-13 02:24:04 Functions: 1 2 50.0 %

          Line data    Source code
       1             : /*
       2             :  * This Source Code Form is subject to the terms of the Mozilla Public
       3             :  * License, v. 2.0.  If a copy of the MPL was not distributed with this
       4             :  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
       5             :  *
       6             :  * Copyright 1997 - July 2008 CWI, August 2008 - 2021 MonetDB B.V.
       7             :  */
       8             : 
       9             : #include "monetdb_config.h"
      10             : #include "opt_remoteQueries.h"
      11             : #include "mal_interpreter.h"  /* for showErrors() */
      12             : #include "mal_builder.h"
      13             : 
      14             : /*
      15             :  * The instruction sent is produced with a variation of call2str
      16             :  * from the debugger.
      17             :  */
      18             : static str
      19           0 : RQcall2str(MalBlkPtr mb, InstrPtr p)
      20             : {
      21             :         int k;
      22             :         size_t len=1;
      23             :         str msg;
      24             :         str s,cv= NULL;
      25             : 
      26           0 :         msg = (str) GDKmalloc(BUFSIZ);
      27           0 :         if (msg == NULL)
      28             :                 return NULL;
      29           0 :         msg[0]='#';
      30           0 :         msg[1]=0;
      31           0 :         if( p->barrier)
      32           0 :                 strcat(msg, operatorName(p->barrier));
      33             : 
      34           0 :         if( p->retc > 1) strcat(msg,"(");
      35           0 :         len = strlen(msg);
      36           0 :         for (k = 0; k < p->retc; k++) {
      37           0 :                 sprintf(msg+len, "%s", getVarName(mb,getArg(p,k)));
      38           0 :                 if (k < p->retc - 1)
      39           0 :                         strcat(msg,",");
      40           0 :                 len = strlen(msg);
      41             :         }
      42           0 :         if( p->retc > 1) strcat(msg,")");
      43           0 :         sprintf(msg+len,":= %s.%s(",getModuleId(p),getFunctionId(p));
      44           0 :         s = strchr(msg, '(');
      45           0 :         if (s) {
      46             :                 s++;
      47           0 :                 *s = 0;
      48           0 :                 len = strlen(msg);
      49           0 :                 for (k = p->retc; k < p->argc; k++) {
      50           0 :                         VarPtr v = getVar(mb, getArg(p, k));
      51           0 :                         if( isVarConstant(mb, getArg(p,k)) ){
      52           0 :                                 if( v->type == TYPE_void) {
      53           0 :                                         sprintf(msg+len, "nil");
      54             :                                 } else {
      55           0 :                                         if ((cv = VALformat(&v->value)) == NULL) {
      56           0 :                                                 GDKfree(msg);
      57           0 :                                                 return NULL;
      58             :                                         }
      59           0 :                                         sprintf(msg+len,"%s:%s",cv, ATOMname(v->type));
      60           0 :                                         GDKfree(cv);
      61             :                                 }
      62             : 
      63             :                         } else
      64           0 :                                 sprintf(msg+len, "%s", getVarName(mb, getArg(p,k)));
      65           0 :                         if (k < p->argc - 1)
      66           0 :                                 strcat(msg,",");
      67           0 :                         len = strlen(msg);
      68             :                 }
      69           0 :                 strcat(msg,");");
      70             :         }
      71             : /* printf("#RQcall:%s\n",msg);*/
      72             :         return msg;
      73             : }
      74             : /*
      75             :  * The algorithm follows the common scheme used so far.
      76             :  * Instructions are taken out one-by-one and copied
      77             :  * to the new block.
      78             :  *
      79             :  * A local cache of connections is established, because
      80             :  * the statements related to a single remote database
      81             :  * should be executed in the same stack context.
      82             :  * A pitfall is to create multiple connections with
      83             :  * their isolated runtime environment.
      84             :  */
      85             : #define lookupServer(X)\
      86             :         /* lookup the server connection */\
      87             :         if( location[getArg(p,0)] == 0){\
      88             :                 db = 0;\
      89             :                 if( isVarConstant(mb,getArg(p,X)) )\
      90             :                         db= getVarConstant(mb, getArg(p,X)).val.sval;\
      91             :                 for(k=0; k<dbtop; k++)\
      92             :                         if( strcmp(db, dbalias[k].dbname)== 0)\
      93             :                                 break;\
      94             :                 \
      95             :                 if( k== dbtop){\
      96             :                         r= newInstruction(mb,mapiRef,lookupRef);\
      97             :                         j= getArg(r,0)= newTmpVariable(mb, TYPE_int);\
      98             :                         r= addArgument(mb,r, getArg(p,X));\
      99             :                         pushInstruction(mb,r);\
     100             :                         dbalias[dbtop].dbhdl= j;\
     101             :                         dbalias[dbtop++].dbname= db;\
     102             :                         if( dbtop== 127) dbtop--;\
     103             :                 } else j= dbalias[k].dbhdl;\
     104             :                 location[getArg(p,0)]= j;\
     105             :         } else j= location[getArg(p,0)];
     106             : 
     107             : #define prepareRemote(X)\
     108             :         r= newInstruction(mb,mapiRef,rpcRef);\
     109             :         getArg(r,0)= newTmpVariable(mb, X);\
     110             :         r= addArgument(mb,r,j);
     111             : 
     112             : #define putRemoteVariables()\
     113             :         for(j=p->retc; j<p->argc; j++)\
     114             :         if( location[getArg(p,j)] == 0 && !isVarConstant(mb,getArg(p,j)) ){\
     115             :                 q= newInstruction(0, mapiRef, putRef);\
     116             :                 getArg(q,0)= newTmpVariable(mb, TYPE_void);\
     117             :                 q= addArgument(mb,q,location[getArg(p,j)]);\
     118             :                 q= pushStr(mb,q, getVarName(mb,getArg(p,j)));\
     119             :                 (void) addArgument(mb,q,getArg(p,j));\
     120             :                 pushInstruction(mb,q);\
     121             :         }
     122             : 
     123             : #define remoteAction()\
     124             :         s= RQcall2str(mb,p);\
     125             :         r= pushStr(mb,r,s+1);\
     126             :         GDKfree(s);\
     127             :         pushInstruction(mb,r);\
     128             :         freeInstruction(p);\
     129             :         actions++;
     130             : 
     131             : typedef struct{
     132             :         str dbname;
     133             :         int dbhdl;
     134             : } DBalias;
     135             : 
     136             : str
     137           1 : OPTremoteQueriesImplementation(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
     138             : {
     139             :         InstrPtr p, q, r, *old;
     140             :         int i, j, k, cnt, limit, slimit, actions=0;
     141             :         int remoteSite,collectFirst;
     142             :         int *location;
     143             :         DBalias *dbalias;
     144             :         int dbtop;
     145             :         char buf[BUFSIZ],*s, *db;
     146             :         ValRecord cst;
     147             :         str msg = MAL_SUCCEED;
     148             : 
     149           1 :         cst.vtype= TYPE_int;
     150           1 :         cst.val.ival= 0;
     151           1 :         cst.len = 0;
     152             : 
     153             :         (void) cntxt;
     154             :         (void) stk;
     155             : 
     156           1 :         limit = mb->stop;
     157           1 :         slimit = mb->ssize;
     158           1 :         old = mb->stmt;
     159             : 
     160           1 :         location= (int*) GDKzalloc(mb->vsize * sizeof(int));
     161           1 :         if ( location == NULL)
     162           0 :                 throw(MAL, "optimizer.remote", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     163           1 :         dbalias= (DBalias*) GDKzalloc(128 * sizeof(DBalias));
     164           1 :         if (dbalias == NULL){
     165           0 :                 GDKfree(location);
     166           0 :                 throw(MAL, "optimizer.remote", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     167             :         }
     168             :         dbtop= 0;
     169             : 
     170           1 :         if ( newMalBlkStmt(mb, mb->ssize) < 0){
     171           0 :                 GDKfree(dbalias);
     172           0 :                 GDKfree(location);
     173           0 :                 throw(MAL, "optimizer.remote", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     174             :         }
     175             : 
     176           4 :         for (i = 0; i < limit; i++) {
     177           3 :                 p = old[i];
     178             : 
     179             :                 /* detect remote instructions */
     180             :                 cnt=0;
     181           5 :                 for(j=0; j<p->argc; j++)
     182           2 :                         if (location[getArg(p,j)])
     183           0 :                                 cnt++;
     184             : 
     185             :                 /* detect remote variable binding */
     186             : 
     187           3 :                 if( (getModuleId(p)== mapiRef && getFunctionId(p)==bindRef)){
     188           0 :                         if( p->argc == 3 && getArgType(mb,p,1) == TYPE_int ) {
     189             :                                 int tpe;
     190             :                                 j = getArg(p,1); /* lookupServer with key */
     191           0 :                                 tpe = getArgType(mb,p,0);
     192             :                                 /* result is remote */
     193           0 :                                 location[getArg(p,0)]= j;
     194             : 
     195             :                                 /* turn the instruction into a local one */
     196             :                                 /* one argument less */
     197           0 :                                 p->argc--;
     198             :                                 /* only use the second argument (string) */
     199           0 :                                 getArg(p,1)= getArg(p,2);
     200             : 
     201           0 :                                 getModuleId(p) = bbpRef;
     202             : 
     203           0 :                                 prepareRemote(tpe)
     204           0 :                                 putRemoteVariables()
     205           0 :                                 remoteAction()
     206             :                         } else
     207           0 :                                 pushInstruction(mb,p);
     208           3 :                 } else if( (getModuleId(p)== sqlRef && getFunctionId(p)==evalRef) ){
     209           0 :                         if( p->argc == 3){
     210             :                                 /* a remote sql eval is needed */
     211           0 :                                 lookupServer(1)
     212             : 
     213             :                                 /* turn the instruction into a local one */
     214             :                                 /* one argument less */
     215           0 :                                 p->argc--;
     216             :                                 /* only use the second argument (string) */
     217           0 :                                 getArg(p,1)= getArg(p,2);
     218             : 
     219           0 :                                 prepareRemote(TYPE_void)
     220             : 
     221           0 :                                 s= RQcall2str(mb,p);
     222           0 :                                 r= pushStr(mb,r,s+1);
     223           0 :                                 GDKfree(s);
     224           0 :                                 pushInstruction(mb,r);
     225           0 :                                 freeInstruction(p);
     226           0 :                                 actions++;
     227             :                         }
     228           3 :                 } else if( (getModuleId(p)== sqlRef && getFunctionId(p)==bindRef) ){
     229             : 
     230           0 :                         if( p->argc == 6 && getArgType(mb,p,4) == TYPE_str ) {
     231             :                                 int tpe;
     232             :                                 j = getArg(p,1); /* lookupServer with key */
     233           0 :                                 tpe = getArgType(mb,p,0);
     234             : 
     235           0 :                                 lookupServer(4)
     236             : 
     237             :                                 /* turn the instruction into a local one */
     238           0 :                                 k = defConstant(mb, TYPE_int, &cst);
     239           0 :                                 if( k>=0){
     240           0 :                                         getArg(p,4)= k;
     241           0 :                                         prepareRemote(tpe)
     242           0 :                                         putRemoteVariables()
     243           0 :                                         remoteAction()
     244             :                                 }
     245             :                         } else
     246           0 :                                 pushInstruction(mb,p);
     247             :                 } else
     248           3 :                 if(getModuleId(p)== sqlRef && getFunctionId(p)== binddbatRef) {
     249             : 
     250           0 :                         if( p->argc == 5 && getArgType(mb,p,3) == TYPE_str ) {
     251           0 :                                 lookupServer(3)
     252             : 
     253             :                                 /* turn the instruction into a local one */
     254           0 :                                 k= defConstant(mb, TYPE_int, &cst);
     255           0 :                                 if( k >= 0){
     256           0 :                                         getArg(p,3)= defConstant(mb, TYPE_int, &cst);
     257           0 :                                         prepareRemote(TYPE_void)
     258           0 :                                         putRemoteVariables()
     259           0 :                                         remoteAction()
     260             :                                 }
     261             :                         } else {
     262           0 :                                 pushInstruction(mb,p);
     263             :                         }
     264             :                 } else
     265           3 :                 if( getModuleId(p) == optimizerRef || cnt == 0 || p->barrier) /* local only or flow control statement */
     266           3 :                         pushInstruction(mb,p);
     267             :                 else {
     268             :                         /*
     269             :                          * The hard part is to decide what to do with instructions that
     270             :                          * contain a reference to a remote variable.
     271             :                          * In the first implementation we use the following policy.
     272             :                          * If there are multiple sites involved, all arguments are
     273             :                          * moved local for processing. Moreover, all local arguments
     274             :                          * to be shipped should be simple.
     275             :                          */
     276             :                         remoteSite=0;
     277             :                         collectFirst= FALSE;
     278           0 :                         for(j=0; j<p->argc; j++)
     279           0 :                         if( location[getArg(p,j)]){
     280           0 :                                 if (remoteSite == 0)
     281             :                                         remoteSite= location[getArg(p,j)];
     282           0 :                                 else if( remoteSite != location[getArg(p,j)])
     283             :                                         collectFirst= TRUE;
     284             :                         }
     285           0 :                         if( getModuleId(p)== ioRef || (getModuleId(p)== sqlRef
     286           0 :                                         && (getFunctionId(p)== resultSetRef ||
     287           0 :                                 getFunctionId(p)== rsColumnRef)))
     288             :                                  collectFirst= TRUE;
     289             : 
     290             :                         /* local BATs are not shipped */
     291           0 :                         if( remoteSite && collectFirst== FALSE)
     292           0 :                                 for(j=p->retc; j<p->argc; j++)
     293           0 :                                 if( location[getArg(p,j)] == 0 &&
     294           0 :                                         isaBatType(getVarType(mb,getArg(p,j))))
     295             :                                                 collectFirst= TRUE;
     296             : 
     297           0 :                         if (collectFirst){
     298             :                                 /* perform locally */
     299           0 :                                 for(j=p->retc; j<p->argc; j++)
     300           0 :                                 if( location[getArg(p,j)]){
     301           0 :                                         q= newInstruction(0,mapiRef,rpcRef);
     302           0 :                                         getArg(q,0)= getArg(p,j);
     303           0 :                                         q= addArgument(mb,q,location[getArg(p,j)]);
     304           0 :                                         snprintf(buf,BUFSIZ,"io.print(%s);",
     305             :                                                 getVarName(mb,getArg(p,j)) );
     306           0 :                                         q=  pushStr(mb,q,buf);
     307           0 :                                         pushInstruction(mb,q);
     308             :                                 }
     309           0 :                                 pushInstruction(mb,p);
     310             :                                 /* as of now all the targets are also local */
     311           0 :                                 for(j=0; j<p->retc; j++)
     312           0 :                                         location[getArg(p,j)]= 0;
     313           0 :                                 actions++;
     314           0 :                         } else if (remoteSite){
     315             :                                 /* single remote site involved */
     316           0 :                                 r= newInstruction(mb,mapiRef,rpcRef);
     317           0 :                                 getArg(r,0)= newTmpVariable(mb, TYPE_void);
     318           0 :                                 r= addArgument(mb, r, remoteSite);
     319             : 
     320           0 :                                 for(j=p->retc; j<p->argc; j++)
     321           0 :                                 if( location[getArg(p,j)] == 0 && !isVarConstant(mb,getArg(p,j)) ){
     322           0 :                                         q= newInstruction(0,mapiRef,putRef);
     323           0 :                                         getArg(q,0)= newTmpVariable(mb, TYPE_void);
     324           0 :                                         q= addArgument(mb, q, remoteSite);
     325           0 :                                         q= pushStr(mb,q, getVarName(mb,getArg(p,j)));
     326           0 :                                         (void) addArgument(mb, q, getArg(p,j));
     327           0 :                                         pushInstruction(mb,q);
     328             :                                 }
     329           0 :                                 s= RQcall2str(mb, p);
     330           0 :                                 pushInstruction(mb,r);
     331           0 :                                 (void) pushStr(mb,r,s+1);
     332           0 :                                 GDKfree(s);
     333           0 :                                 for(j=0; j<p->retc; j++)
     334           0 :                                         location[getArg(p,j)]= remoteSite;
     335           0 :                                 freeInstruction(p);
     336           0 :                                 actions++;
     337             :                         } else
     338           0 :                                 pushInstruction(mb,p);
     339             :                 }
     340             :         }
     341         254 :         for(; i<slimit; i++)
     342         253 :         if( old[i])
     343           0 :                 pushInstruction(mb, old[i]);
     344           1 :         GDKfree(old);
     345           1 :         GDKfree(location);
     346           1 :         GDKfree(dbalias);
     347             : 
     348             :         /* Defense line against incorrect plans */
     349           1 :         if( actions){
     350           0 :                 msg = chkTypes(cntxt->usermodule, mb, FALSE);
     351           0 :                 if (!msg)
     352           0 :                         msg = chkFlow(mb);
     353           0 :                 if (!msg)
     354           0 :                         msg = chkDeclarations(mb);
     355             :         }
     356             :         /* keep actions taken as a fake argument*/
     357           1 :         (void) pushInt(mb, pci, actions);
     358           1 :         return msg;
     359             : }

Generated by: LCOV version 1.14