LCOV - code coverage report
Current view: top level - sql/backends/monet5 - wlr.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 13 544 2.4 %
Date: 2021-10-13 02:24:04 Functions: 3 25 12.0 %

          Line data    Source code
       1             : /*
       2             :  * This Source Code Form is subject to the terms of the Mozilla Public
       3             :  * License, v. 2.0.  If a copy of the MPL was not distributed with this
       4             :  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
       5             :  *
       6             :  * Copyright 1997 - July 2008 CWI, August 2008 - 2021 MonetDB B.V.
       7             :  */
       8             : 
       9             : /*
      10             :  * A master can be replicated by taking a binary copy of the 'bat' directory
      11             :  * when in quiescent mode or a more formal snapshot..
      12             :  * Alternatively you start with an empty database.
      13             :  *
      14             :  * The wlc log records written are numbered 0.. wlc_tag - 1
      15             :  * The replicator copies all of them unto and including wlc_limit.
      16             :  * This leads to the wlr_tag from -1 .. wlc_limit, wlr_tag,..., INT64_MAX
      17             :  *
      18             :  * Replication start after setting the master id and giving an (optional) wlr_limit.
      19             :  * Any error encountered in replaying the log stops the process, because then
      20             :  * no guarantee can be given on the consistency with the master database.
      21             :  * A manual fix for an exceptional case is allowed, whereafter a call
      22             :  * to CALL wlrclear() accepts the failing transaction and prepares
      23             :  * to the next CALL replicate(),
      24             :  */
      25             : #include "monetdb_config.h"
      26             : #include "sql.h"
      27             : #include "wlc.h"
      28             : #include "wlr.h"
      29             : #include "sql_scenario.h"
      30             : #include "sql_execute.h"
      31             : #include "opt_prelude.h"
      32             : #include "mal_parser.h"
      33             : #include "mal_client.h"
      34             : #include "mal_authorize.h"
      35             : #include "querylog.h"
      36             : #include "mutils.h"
      37             : 
      38             : #define WLR_WAIT 0
      39             : #define WLR_RUN   101
      40             : #define WLR_STOP 201
      41             : 
      42             : #define WLC_COMMIT 40
      43             : #define WLC_ROLLBACK 50
      44             : #define WLC_ERROR 60
      45             : 
      46             : MT_Lock     wlr_lock = MT_LOCK_INITIALIZER(wlr_lock);
      47             : 
      48             : /* The current status of the replica processing.
      49             :  * It is based on the assumption that at most one replica thread is running
      50             :  * importing data from a single master.
      51             :  */
      52             : static char wlr_master[IDLENGTH];
      53             : static int      wlr_batches;                            // the next file to be processed
      54             : static lng      wlr_tag = -1;                           // the last transaction id being processed
      55             : static char wlr_read[26];                               // last record read
      56             : static char wlr_timelimit[128];                 // stop re-processing transactions when time limit is reached
      57             : static int      wlr_beat;                                       // period between successive synchronisations with master
      58             : static char wlr_error[BUFSIZ];  // error that stopped the replication process
      59             : 
      60             : static MT_Id wlr_thread = 0;                    // The single replicator thread is active
      61             : static int      wlr_state = WLR_WAIT;           // which state WAIT/RUN
      62             : static lng      wlr_limit = -1;                         // stop re-processing after transaction id 'wlr_limit' is processed
      63             : 
      64             : #define MAXLINE 2048
      65             : 
      66             : /* Simple read the replica configuration status file */
      67             : static str
      68           3 : WLRgetConfig(void){
      69             :         char *path;
      70             :         char line[MAXLINE];
      71             :         FILE *fd;
      72             :         int len;
      73             :         str msg= MAL_SUCCEED;
      74             : 
      75           3 :         if((path = GDKfilepath(0, 0, "wlr.config", 0)) == NULL)
      76           0 :                 throw(MAL,"wlr.getConfig", "Could not create wlr.config file path\n");
      77             :         fd = MT_fopen(path,"r");
      78           3 :         GDKfree(path);
      79           3 :         if( fd == NULL)
      80           3 :                 throw(MAL,"wlr.getConfig", "Could not access wlr.config file \n");
      81           0 :         while( fgets(line, MAXLINE, fd) ){
      82           0 :                 line[strlen(line)-1]= 0;
      83           0 :                 if( strncmp("master=", line,7) == 0) {
      84           0 :                         len = snprintf(wlr_master, IDLENGTH, "%s", line + 7);
      85           0 :                         if (len == -1 || len >= IDLENGTH) {
      86           0 :                                 msg= createException(SQL,"wlr.getConfig", "Master config value is too large\n");
      87           0 :                                 goto bailout;
      88             :                         } else
      89           0 :                         if (len  == 0) {
      90           0 :                                 msg = createException(SQL,"wlr.getConfig", "Master config path is missing\n");
      91           0 :                                 goto bailout;
      92             :                         }
      93             :                 } else
      94           0 :                 if( strncmp("batches=", line, 8) == 0)
      95           0 :                         wlr_batches = atoi(line+ 8);
      96             :                 else
      97           0 :                 if( strncmp("tag=", line, 4) == 0)
      98           0 :                         wlr_tag = atoi(line+ 4);
      99             :                 else
     100           0 :                 if( strncmp("beat=", line, 5) == 0)
     101           0 :                         wlr_beat = atoi(line+ 5);
     102             :                 else
     103           0 :                 if( strncmp("read=", line, 5) == 0)
     104           0 :                         strcpy(wlr_read, line + 5);
     105             :                 else
     106           0 :                 if( strncmp("error=", line, 6) == 0) {
     107             :                         char *s;
     108           0 :                         len = snprintf(wlr_error, BUFSIZ, "%s", line + 6);
     109           0 :                         if (len == -1 || len >= BUFSIZ) {
     110           0 :                                 msg = createException(SQL, "wlr.getConfig", "Config value is too large\n");
     111           0 :                                 goto bailout;
     112             :                         }
     113           0 :                         s = strchr(wlr_error, (int) '\n');
     114           0 :                         if ( s) *s = 0;
     115             :                 }
     116             :         }
     117           0 : bailout:
     118           0 :         fclose(fd);
     119           0 :         return msg;
     120             : }
     121             : 
     122             : /* Keep the current status in the configuration status file */
     123             : static str
     124           0 : WLRputConfig(void){
     125             :         char *path;
     126             :         stream *fd;
     127             :         str msg = MAL_SUCCEED;
     128             : 
     129           0 :         if((path = GDKfilepath(0,0,"wlr.config",0)) == NULL)
     130           0 :                 throw(SQL, "wlr.putConfig", "Could not access wlr.config file\n");
     131           0 :         fd = open_wastream(path);
     132           0 :         GDKfree(path);
     133           0 :         if( fd == NULL)
     134           0 :                 throw(SQL,"wlr.putConfig", "Could not create wlr.config file: %s\n", mnstr_peek_error(NULL));
     135             : 
     136           0 :         mnstr_printf(fd,"master=%s\n", wlr_master);
     137           0 :         mnstr_printf(fd,"batches=%d\n", wlr_batches);
     138           0 :         mnstr_printf(fd,"tag="LLFMT"\n", wlr_tag);
     139           0 :         mnstr_printf(fd,"beat=%d\n", wlr_beat);
     140           0 :         if( wlr_timelimit[0])
     141           0 :                 mnstr_printf(fd,"read=%s\n", wlr_read);
     142           0 :         if( wlr_error[0])
     143           0 :                 mnstr_printf(fd,"error=%s", wlr_error);
     144           0 :         close_stream(fd);
     145           0 :         return msg;
     146             : }
     147             : 
     148             : /*
     149             :  * When the master database exist, we should set the replica administration.
     150             :  * But only once.
     151             :  *
     152             :  * The log files are identified by a range. It starts with 0 when an empty database
     153             :  * was used to bootstrap. Otherwise it is the range received from the dbmaster.
     154             :  * At any time we should be able to restart the synchronization
     155             :  * process by grabbing a new set of log files.
     156             :  * This calls for keeping track in the replica what log files have been applied
     157             :  * and what the last completed transaction was.
     158             :  *
     159             :  * Given that the replication thread runs independently, all errors encountered
     160             :  * should be sent to the system logging system.
     161             :  */
     162             : static str
     163           0 : WLRgetMaster(void)
     164             : {
     165             :         char path[FILENAME_MAX];
     166             :         int len;
     167             :         str dir, msg = MAL_SUCCEED;
     168             :         FILE *fd;
     169             : 
     170           0 :         if( wlr_master[0] == 0 )
     171             :                 return MAL_SUCCEED;
     172             : 
     173             :         /* collect master properties */
     174           0 :         len = snprintf(path, FILENAME_MAX, "..%c%s", DIR_SEP, wlr_master);
     175           0 :         if (len == -1 || len >= FILENAME_MAX)
     176           0 :                 throw(MAL, "wlr.getMaster", "wlc.config filename path is too large");
     177           0 :         if ((dir = GDKfilepath(0, path, "wlc.config", 0)) == NULL)
     178           0 :                 throw(MAL,"wlr.getMaster","Could not access wlc.config file %s/wlc.config\n", path);
     179             : 
     180             :         fd = MT_fopen(dir,"r");
     181           0 :         GDKfree(dir);
     182           0 :         if (fd == NULL)
     183           0 :                 throw(MAL,"wlr.getMaster","Could not get read access to '%s'config file\n", wlr_master);
     184           0 :         msg = WLCreadConfig(fd);
     185           0 :         if( msg != MAL_SUCCEED)
     186             :                 return msg;
     187           0 :         if( ! wlr_master[0] )
     188           0 :                 throw(MAL,"wlr.getMaster","Master not identified\n");
     189           0 :         wlc_state = WLC_CLONE; // not used as master
     190             :         if( !wlr_master[0] )
     191             :                 throw(MAL,"wlr.getMaster","Master not identified\n");
     192             :         wlc_state = WLC_CLONE; // not used as master
     193           0 :         return MAL_SUCCEED;
     194             : }
     195             : 
     196             : /* each WLR block is turned into a separate MAL block and executed
     197             :  * This block is re-used as we consider the complete file.
     198             :  */
     199             : 
     200             : #define cleanup(){\
     201             :         resetMalBlk(mb);\
     202             :         }
     203             : 
     204             : static str
     205           0 : WLRprocessBatch(Client cntxt)
     206             : {
     207             :         int i, len;
     208             :         char path[FILENAME_MAX];
     209             :         stream *fd = NULL;
     210             :         Client c;
     211             :         size_t sz;
     212             :         MalBlkPtr mb;
     213             :         InstrPtr q;
     214             :         str other;
     215             :         mvc *sql;
     216             :         Symbol prev = NULL;
     217             :         lng tag;
     218             :         char tag_read[26];                      // stop re-processing transactions when time limit is reached
     219             :         str action= NULL;
     220             :         str msg= MAL_SUCCEED, msg2= MAL_SUCCEED;
     221             : 
     222           0 :         msg = WLRgetConfig();
     223           0 :         tag = wlr_tag;
     224           0 :         if( msg != MAL_SUCCEED){
     225           0 :                 snprintf(wlr_error, BUFSIZ, "%s", msg);
     226           0 :                 freeException(msg);
     227           0 :                 return MAL_SUCCEED;
     228             :         }
     229           0 :         if( wlr_error[0]) {
     230           0 :                 if (!(msg = GDKstrdup(wlr_error)))
     231           0 :                         throw(MAL, "wlr.batch", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     232             :                 return msg;
     233             :         }
     234             : 
     235           0 :         c = MCforkClient(cntxt);
     236           0 :         if( c == 0)
     237           0 :                 throw(MAL, "wlr.batch", "Could not create user for WLR process\n");
     238           0 :         c->promptlength = 0;
     239           0 :         c->listing = 0;
     240           0 :         c->fdout = open_wastream(".wlr");
     241           0 :         if(c->fdout == NULL) {
     242           0 :                 MCcloseClient(c);
     243           0 :                 throw(MAL,"wlr.batch", "Could not create user for WLR process: %s\n", mnstr_peek_error(NULL));
     244             :         }
     245             : 
     246             :         /* Cook a log file into a concreate MAL function for multiple transactions */
     247           0 :         prev = newFunction(putName(sql_private_module_name), putName("wlr"), FUNCTIONsymbol);
     248           0 :         if(prev == NULL) {
     249           0 :                 MCcloseClient(c);
     250           0 :                 throw(MAL, "wlr.batch", "Could not create user for WLR process\n");
     251             :         }
     252           0 :         c->curprg = prev;
     253           0 :         mb = c->curprg->def;
     254           0 :         setVarType(mb, 0, TYPE_void);
     255             : 
     256           0 :         msg = SQLinitClient(c);
     257           0 :         if( msg != MAL_SUCCEED) {
     258           0 :                 MCcloseClient(c);
     259           0 :                 freeSymbol(prev);
     260           0 :                 return msg;
     261             :         }
     262           0 :         if ((msg = getSQLContext(c, mb, &sql, NULL))) {
     263           0 :                 SQLexitClient(c);
     264           0 :                 MCcloseClient(c);
     265           0 :                 freeSymbol(prev);
     266           0 :                 return msg;
     267             :         }
     268           0 :         if ((msg = checkSQLContext(c)) != NULL) {
     269           0 :                 SQLexitClient(c);
     270           0 :                 MCcloseClient(c);
     271           0 :                 freeSymbol(prev);
     272           0 :                 return msg;
     273             :         }
     274             : 
     275           0 :         path[0]=0;
     276           0 :         for( i= wlr_batches; i < wlc_batches && !GDKexiting() && wlr_state != WLR_STOP && wlr_tag <= wlr_limit && msg == MAL_SUCCEED; i++){
     277           0 :                 len = snprintf(path,FILENAME_MAX,"%s%c%s_%012d", wlc_dir, DIR_SEP, wlr_master, i);
     278           0 :                 if (len == -1 || len >= FILENAME_MAX) {
     279           0 :                         msg = createException(MAL, "wlr.batch", "Filename path is too large\n");
     280           0 :                         break;
     281             :                 }
     282           0 :                 fd= open_rastream(path);
     283           0 :                 if( fd == NULL) {
     284           0 :                         msg = createException(MAL, "wlr.batch", "Cannot access path '%s': %s\n", path, mnstr_peek_error(NULL));
     285           0 :                         break;
     286             :                 }
     287           0 :                 sz = getFileSize(fd);
     288           0 :                 if (sz > (size_t) 1 << 29) {
     289           0 :                         close_stream(fd);
     290           0 :                         msg = createException(MAL, "wlr.batch", "File %s is too large to process\n", path);
     291           0 :                         break;
     292             :                 }
     293           0 :                 if ((c->fdin = bstream_create(fd, sz == 0 ? (size_t) (2 * 128 * BLOCK) : sz)) == NULL) {
     294           0 :                         close_stream(fd);
     295           0 :                         msg = createException(MAL, "wlr.batch", "Failed to open stream for file %s\n", path);
     296           0 :                         break;
     297             :                 }
     298           0 :                 if (bstream_next(c->fdin) < 0){
     299           0 :                         msg = createException(MAL, "wlr.batch", "Could not read %s\n", path);
     300           0 :                         break;
     301             :                 }
     302             : 
     303           0 :                 c->yycur = 0;
     304             : 
     305             :                 // now parse the file line by line to reconstruct the WLR blocks
     306             :                 do{
     307           0 :                         parseMAL(c, c->curprg, 1, 1, NULL);
     308             : 
     309           0 :                         mb = c->curprg->def;
     310           0 :                         if( mb->errors){
     311             :                                 msg = mb->errors;
     312           0 :                                 mb->errors = NULL;
     313           0 :                                 cleanup();
     314           0 :                                 break;
     315             :                         }
     316           0 :                         if( mb->stop == 1){
     317           0 :                                 cleanup();
     318           0 :                                 break;
     319             :                         }
     320           0 :                         q= getInstrPtr(mb, mb->stop - 1);
     321           0 :                         if( getModuleId(q) != wlrRef){
     322           0 :                                 char *s = instruction2str(mb,0, q, LIST_MAL_CALL);
     323           0 :                                 msg = createException(MAL,"wlr.process", "batch %d:improper wlr instruction: %s\n", i, s);
     324           0 :                                 GDKfree(s);
     325           0 :                                 cleanup();
     326           0 :                                 break;
     327             :                         }
     328           0 :                         if ( getModuleId(q) == wlrRef && getFunctionId(q) == actionRef ){
     329           0 :                                 action = getVarConstant(mb, getArg(q,1)).val.sval;
     330             :                         }
     331           0 :                         if ( getModuleId(q) == wlrRef && getFunctionId(q) == catalogRef ){
     332           0 :                                 action = getVarConstant(mb, getArg(q,1)).val.sval;
     333             :                         }
     334           0 :                         if( getModuleId(q) == wlrRef && getFunctionId(q) == transactionRef){
     335           0 :                                 tag = getVarConstant(mb, getArg(q,1)).val.lval;
     336           0 :                                 snprintf(tag_read, sizeof(tag_read), "%s", getVarConstant(mb, getArg(q,2)).val.sval);
     337             : 
     338             :                                 // break loop if we don't see a the next expected transaction
     339           0 :                                 if ( tag <= wlr_tag){
     340             :                                         /* skip already executed transaction log */
     341           0 :                                         continue;
     342             :                                 } else
     343           0 :                                 if(  ( tag > wlr_limit) ||
     344           0 :                                           ( wlr_timelimit[0] && strcmp(tag_read, wlr_timelimit) > 0)){
     345             :                                         /* stop execution of the transactions if your reached the limit */
     346           0 :                                         cleanup();
     347           0 :                                         break;
     348             :                                 }
     349             :                         }
     350             :                         // only re-execute successful transactions.
     351           0 :                         if ( getModuleId(q) == wlrRef && getFunctionId(q) ==commitRef  && (tag > wlr_tag || ( wlr_timelimit[0] && strcmp(tag_read, wlr_timelimit) > 0))){
     352           0 :                                 pushEndInstruction(mb);
     353             :                                 // execute this block if no errors are found
     354           0 :                                 msg = chkTypes(c->usermodule, mb, FALSE);
     355           0 :                                 if (!msg)
     356           0 :                                         msg = chkFlow(mb);
     357           0 :                                 if (!msg)
     358           0 :                                         msg = chkDeclarations(mb);
     359           0 :                                 if (!msg)
     360           0 :                                         setVariableScope(mb);
     361           0 :                                 wlr_tag =  tag; // remember which transaction we executed
     362           0 :                                 snprintf(wlr_read, sizeof(wlr_read), "%s", tag_read);
     363           0 :                                 if(!msg && mb->errors == 0){
     364           0 :                                         sql->session->auto_commit = 0;
     365           0 :                                         sql->session->ac_on_commit = 1;
     366           0 :                                         sql->session->level = 0;
     367           0 :                                         if(mvc_trans(sql) < 0) {
     368           0 :                                                 TRC_ERROR(SQL_TRANS, "Allocation failure while starting the transaction\n");
     369             :                                         } else {
     370           0 :                                                 msg= runMAL(c,mb,0,0);
     371           0 :                                                 if( msg == MAL_SUCCEED){
     372             :                                                         /* at this point we have updated the replica, but the configuration has not been changed.
     373             :                                                          * If at this point an error occurs, we could redo the same transaction twice later on.
     374             :                                                          * The solution is to make sure that we recognize that a transaction has started and is completed successfully
     375             :                                                          */
     376           0 :                                                         msg = WLRputConfig();
     377           0 :                                                         if( msg)
     378             :                                                                 break;
     379             :                                                 }
     380             :                                                 // ignore warnings
     381           0 :                                                 if (msg && strstr(msg,"WARNING"))
     382             :                                                         msg = MAL_SUCCEED;
     383           0 :                                                 if( msg != MAL_SUCCEED){
     384             :                                                         // they should always succeed
     385           0 :                                                         msg =createException(MAL,"wlr.process", "Replication error in batch %d:"LLFMT" :%s:%s\n", i, wlr_tag, msg, action);
     386           0 :                                                         if((other = mvc_rollback(sql,0,NULL, false)) != MAL_SUCCEED) //an error was already established
     387           0 :                                                                 GDKfree(other);
     388             :                                                         break;
     389             :                                                 } else
     390           0 :                                                 if((other = mvc_commit(sql, 0, 0, false)) != MAL_SUCCEED) {
     391           0 :                                                         msg = createException(MAL,"wlr.process", "transaction %d:"LLFMT" commit failed: %s\n", i, tag, other);
     392           0 :                                                         freeException(other);
     393           0 :                                                         break;
     394             :                                                 }
     395             :                                         }
     396             :                                 } else {
     397           0 :                                         if( msg == MAL_SUCCEED)
     398           0 :                                                 msg = createException(SQL, "wlr.replicate", "typechecking failed '%s':'%s':\n",path, mb->errors);
     399           0 :                                         cleanup();
     400           0 :                                         break;
     401             :                                 }
     402           0 :                                 cleanup();
     403           0 :                                 if ( wlr_tag + 1 == wlc_tag || tag == wlr_limit)
     404             :                                                 break;
     405             :                         } else
     406           0 :                         if ( getModuleId(q) == wlrRef && (getFunctionId(q) == rollbackRef || getFunctionId(q) == commitRef)){
     407           0 :                                 cleanup();
     408           0 :                                 if ( wlr_tag + 1 == wlc_tag || tag == wlr_limit || ( wlr_timelimit[0] && strcmp(tag_read, wlr_timelimit) > 0))
     409             :                                                 break;
     410             :                         }
     411           0 :                 } while(wlr_state != WLR_STOP &&  mb->errors == 0 && msg == MAL_SUCCEED);
     412             : 
     413             :                 // skip to next file when all is read correctly
     414           0 :                 if (msg == MAL_SUCCEED && tag <= wlr_limit)
     415           0 :                         wlr_batches++;
     416           0 :                 if( msg != MAL_SUCCEED)
     417           0 :                         snprintf(wlr_error, BUFSIZ, "%s", msg);
     418           0 :                 msg2 = WLRputConfig();
     419           0 :                 bstream_destroy(c->fdin);
     420           0 :                 if(msg2)
     421             :                         break;
     422           0 :                 if ( wlr_tag == wlr_limit)
     423             :                         break;
     424             :         }
     425             : 
     426           0 :         close_stream(c->fdout);
     427           0 :         SQLexitClient(c);
     428           0 :         MCcloseClient(c);
     429             :         if (prev)
     430           0 :                 freeSymbol(prev);
     431           0 :         if (msg2) { /* throw msg2, if msg is not set */
     432           0 :                 if (!msg)
     433             :                         msg = msg2;
     434             :                 else
     435           0 :                         freeException(msg2);
     436             :         }
     437             :         return msg;
     438             : }
     439             : 
     440             : /*
     441             :  *  A single WLR thread is allowed to run in the background.
     442             :  *  If it happens to crash then replication roll forward is suspended.
     443             :  *  The background job can only leave error messages in the merovingian log.
     444             :  *
     445             :  * A timing issue.
     446             :  * The WLRprocess can only start after an SQL environment has been initialized.
     447             :  * It is therefore initialized when a SQLclient() is issued.
     448             :  */
     449             : static void
     450           0 : WLRprocessScheduler(void *arg)
     451             : {
     452             :         Client cntxt = (Client) arg;
     453             :         int duration = 0;
     454             :         str msg = MAL_SUCCEED;
     455             : 
     456           0 :         msg = WLRgetConfig();
     457           0 :         if ( msg ){
     458           0 :                 snprintf(wlr_error, BUFSIZ, "%s", msg);
     459           0 :                 freeException(msg);
     460           0 :                 return;
     461             :         }
     462             : 
     463           0 :         assert(wlr_master[0]);
     464           0 :         if (!(cntxt = MCinitClient(MAL_ADMIN, NULL,NULL))) {
     465           0 :                 snprintf(wlr_error, BUFSIZ, "Failed to init WLR scheduler client");
     466           0 :                 return;
     467             :         }
     468             : 
     469           0 :         MT_lock_set(&wlr_lock);
     470           0 :         if ( wlr_state != WLR_STOP)
     471           0 :                 wlr_state = WLR_RUN;
     472           0 :         MT_lock_unset(&wlr_lock);
     473             : 
     474           0 :         while( wlr_state != WLR_STOP  && !wlr_error[0]){
     475             :                 // wait at most for the cycle period, also at start
     476           0 :                 duration = (wlc_beat > 0 ? wlc_beat:1) * 1000 ;
     477           0 :                 if( wlr_timelimit[0]){
     478           0 :                         timestamp ts = timestamp_current();
     479           0 :                         str wlc_time = NULL;
     480           0 :                         size_t wlc_limit = 0;
     481             :                         int compare;
     482             : 
     483           0 :                         assert(!is_timestamp_nil(ts));
     484           0 :                         if (timestamp_tostr(&wlc_time, &wlc_limit, &ts, true) < 0) {
     485           0 :                                 snprintf(wlr_error, BUFSIZ, "Unable to retrieve current time");
     486           0 :                                 return;
     487             :                         }
     488             :                         // actually never wait longer then the timelimit requires
     489             :                         // preference is given to the beat.
     490           0 :                         compare = strncmp(wlc_time, wlr_timelimit, sizeof(wlr_timelimit));
     491           0 :                         GDKfree(wlc_time);
     492           0 :                         MT_thread_setworking("sleeping");
     493           0 :                         if (compare >= 0 && duration >100)
     494           0 :                                 MT_sleep_ms(duration);
     495             :                 }
     496           0 :                 for( ; duration > 0  && wlr_state != WLR_STOP; duration -= 200){
     497           0 :                         if ( wlr_tag + 1 == wlc_tag || wlr_tag >= wlr_limit || wlr_limit == -1){
     498           0 :                                 MT_thread_setworking("sleeping");
     499           0 :                                 MT_sleep_ms(200);
     500             :                         }
     501             :                 }
     502           0 :                 MT_thread_setworking("processing wlr");
     503           0 :                 if ((msg = WLRprocessBatch(cntxt)))
     504           0 :                         freeException(msg);
     505             : 
     506             :                 /* Can not use GDKexiting(), because a test may already reach that point before it did anything.
     507             :                  * Instead wait for the explicit WLR_STOP
     508             :                  */
     509           0 :                 if( GDKexiting()){
     510           0 :                         MT_lock_set(&wlr_lock);
     511           0 :                         wlr_state = WLR_STOP;
     512           0 :                         MT_lock_unset(&wlr_lock);
     513           0 :                         break;
     514             :                 }
     515             :         }
     516           0 :         wlr_thread = 0;
     517           0 :         MT_lock_set(&wlr_lock);
     518           0 :         if( wlr_state == WLR_RUN)
     519           0 :                 wlr_state = WLR_WAIT;
     520           0 :         MT_lock_unset(&wlr_lock);
     521           0 :         MCcloseClient(cntxt);
     522             : }
     523             : 
     524             : // The replicate() command can be issued at the SQL console
     525             : // which can accept exceptions
     526             : str
     527           0 : WLRmaster(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
     528             : {
     529             :         int len;
     530             :         str msg = MAL_SUCCEED;
     531             : 
     532             :         (void) cntxt;
     533             :         (void) mb;
     534             : 
     535           0 :         len = snprintf(wlr_master, IDLENGTH, "%s", *getArgReference_str(stk, pci, 1));
     536           0 :         if (len == -1 || len >= IDLENGTH)
     537           0 :                 throw(MAL, "wlr.master", SQLSTATE(42000) "Input value is too large for wlr_master buffer");
     538           0 :         if ((msg = WLRgetMaster()))
     539           0 :                 freeException(msg);
     540           0 :         if ((msg = WLRgetConfig())) {
     541           0 :                 freeException(msg);
     542           0 :                 if ((msg = WLRputConfig()))
     543           0 :                         freeException(msg);
     544             :         }
     545             :         return MAL_SUCCEED;
     546             : }
     547             : 
     548             : str
     549           0 : WLRreplicate(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
     550             : {
     551           0 :         str msg, timelimit = NULL;
     552           0 :         size_t size = 0;
     553             :         lng limit = INT64_MAX;
     554             : 
     555           0 :         if( wlr_thread)
     556           0 :                 throw(MAL, "sql.replicate", "WLR thread already running, stop it before continueing");
     557             : 
     558           0 :         msg = WLRgetConfig();
     559           0 :         if( msg != MAL_SUCCEED)
     560             :                 return msg;
     561           0 :         if( wlr_error[0]) {
     562           0 :                 if (!(msg = GDKstrdup(wlr_error)))
     563           0 :                         throw(MAL, "sql.replicate", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     564             :                 return msg;
     565             :         }
     566             : 
     567           0 :         if( pci->argc == 0)
     568           0 :                 wlr_limit = INT64_MAX;
     569             :         else
     570           0 :         if( getArgType(mb, pci, 1) == TYPE_timestamp){
     571           0 :                 if (timestamp_precision_tostr(&timelimit, &size, *getArgReference_TYPE(stk, pci, 1, timestamp), 3, true) < 0)
     572           0 :                         throw(SQL, "wlr.replicate", GDK_EXCEPTION);
     573           0 :                 fprintf(stderr,"#time limit %s\n",timelimit);
     574             :         } else
     575           0 :         if( getArgType(mb, pci, 1) == TYPE_bte)
     576           0 :                 limit = getVarConstant(mb,getArg(pci,1)).val.btval;
     577             :         else
     578           0 :         if( getArgType(mb, pci, 1) == TYPE_sht)
     579           0 :                 limit = getVarConstant(mb,getArg(pci,1)).val.shval;
     580             :         else
     581           0 :         if( getArgType(mb, pci, 1) == TYPE_int)
     582           0 :                 limit = getVarConstant(mb,getArg(pci,1)).val.ival;
     583             :         else
     584           0 :         if( getArgType(mb, pci, 1) == TYPE_lng)
     585           0 :                 limit = getVarConstant(mb,getArg(pci,1)).val.lval;
     586             : 
     587           0 :         if (timelimit) {
     588           0 :                 if (size > sizeof(wlr_timelimit)) {
     589           0 :                         GDKfree(timelimit);
     590           0 :                         throw(MAL, "sql.replicate", "Limit timestamp size is too large");
     591             :                 }
     592           0 :                 strcpy(wlr_timelimit, timelimit);
     593           0 :                 GDKfree(timelimit);
     594             :         }
     595           0 :         if ( limit < 0 && wlr_timelimit[0] == 0)
     596           0 :                 throw(MAL, "sql.replicate", "Stop tag limit should be positive or timestamp should be set");
     597           0 :         if( wlc_tag == 0) {
     598           0 :                 if ((msg = WLRgetMaster()))
     599           0 :                         freeException(msg);
     600           0 :                 if( wlc_tag == 0)
     601           0 :                         throw(MAL, "sql.replicate", "Perhaps a missing wlr.master() call. ");
     602             :         }
     603           0 :         if (limit < INT64_MAX && limit >= wlc_tag)
     604           0 :                 throw(MAL, "sql.replicate", "Stop tag limit "LLFMT" be less than wlc_tag "LLFMT, limit, wlc_tag);
     605           0 :         if (limit >= 0)
     606           0 :                 wlr_limit = limit;
     607             : 
     608           0 :         if (wlc_state != WLC_CLONE)
     609           0 :                 throw(MAL, "sql.replicate", "No replication master set");
     610           0 :         if ((msg = WLRputConfig()))
     611             :                 return msg;
     612           0 :         return WLRprocessBatch(cntxt);
     613             : }
     614             : 
     615             : /* watch out, each log record can contain multiple transaction COMMIT/ROLLBACKs
     616             :  * This means the wlc_kind can not be set to the last one.
     617             :  */
     618             : str
     619           0 : WLRtransaction(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
     620             : {       InstrPtr p;
     621             :         int i;
     622             : 
     623             :         (void) cntxt;
     624             :         (void) pci;
     625             :         (void) stk;
     626           0 :         cntxt->wlc_kind = 0;
     627           0 :         if( wlr_error[0]){
     628           0 :                 cntxt->wlc_kind = WLC_ERROR;
     629           0 :                 return MAL_SUCCEED;
     630             :         }
     631           0 :         for( i = mb->stop-1; cntxt->wlc_kind == 0 && i > 1; i--){
     632           0 :                 p = getInstrPtr(mb,i);
     633           0 :                 if( getModuleId(p) == wlrRef && getFunctionId(p)== commitRef)
     634           0 :                         cntxt->wlc_kind = WLC_COMMIT;
     635           0 :                 if( getModuleId(p) == wlrRef && getFunctionId(p)== rollbackRef)
     636           0 :                         cntxt->wlc_kind = WLC_ROLLBACK;
     637             :         }
     638             :         return MAL_SUCCEED;
     639             : }
     640             : 
     641             : 
     642             : /* Start a separate thread to continue merging the log record */
     643             : str
     644           0 : WLRstart(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
     645             : {
     646             :         (void) cntxt;
     647             :         (void) mb;
     648             :         (void) stk;
     649             :         (void) pci;
     650             : 
     651             :         // time the consolidation process in the background
     652           0 :         if (MT_create_thread(&wlr_thread, WLRprocessScheduler, (void*) NULL,
     653             :                                                  MT_THR_DETACHED, "WLRprocessSched") < 0) {
     654           0 :                 throw(SQL,"wlr.init",SQLSTATE(42000) "Starting wlr manager failed");
     655             :         }
     656             : 
     657             :         // Wait until the replicator is properly initialized
     658           0 :         while( wlr_state != WLR_RUN && wlr_error[0] == 0){
     659           0 :                 MT_sleep_ms( 50);
     660             :         }
     661             :         return MAL_SUCCEED;
     662             : }
     663             : 
     664             : str
     665           0 : WLRstop(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
     666             : {
     667             :         (void) cntxt;
     668             :         (void) mb;
     669             :         (void) stk;
     670             :         (void) pci;
     671             :         // kill the replicator thread and reset for a new one
     672           0 :         MT_lock_set(&wlr_lock);
     673           0 :         if( wlr_state == WLR_RUN)
     674           0 :                 wlr_state =  WLR_STOP;
     675           0 :         MT_lock_unset(&wlr_lock);
     676             : 
     677           0 :         return MAL_SUCCEED;
     678             : }
     679             : 
     680             : str
     681           0 : WLRgetmaster(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
     682             : {
     683           0 :         str *ret = getArgReference_str(stk,pci,0);
     684             :         str msg = MAL_SUCCEED;
     685             : 
     686             :         (void) cntxt;
     687             :         (void) mb;
     688             : 
     689           0 :         msg = WLRgetConfig();
     690           0 :         if( msg)
     691             :                 return msg;
     692           0 :         if( wlr_master[0]) {
     693           0 :                 if (!(*ret= GDKstrdup(wlr_master)))
     694           0 :                         throw(MAL, "wlr.getmaster", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     695             :         } else
     696           0 :                 throw(MAL, "wlr.getmaster", "Master not found");
     697             :         return msg;
     698             : }
     699             : 
     700             : str
     701           2 : WLRgetclock(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
     702             : {
     703           2 :         str *ret = getArgReference_str(stk,pci,0);
     704             :         str msg = MAL_SUCCEED;
     705             : 
     706             :         (void) cntxt;
     707             :         (void) mb;
     708             : 
     709           2 :         msg = WLRgetConfig();
     710           2 :         if( msg)
     711             :                 return msg;
     712           0 :         if( wlr_read[0])
     713           0 :                 *ret = GDKstrdup(wlr_read);
     714             :         else
     715           0 :                 *ret = GDKstrdup(str_nil);
     716           0 :         if (*ret == NULL)
     717           0 :                 throw(MAL, "wlr.getclock", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     718             :         return msg;
     719             : }
     720             : 
     721             : str
     722           1 : WLRgettick(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
     723             : {
     724           1 :         lng *ret = getArgReference_lng(stk,pci,0);
     725             :         str msg = MAL_SUCCEED;
     726             : 
     727             :         (void) cntxt;
     728             :         (void) mb;
     729             : 
     730           1 :         msg = WLRgetConfig();
     731           1 :         if( msg)
     732             :                 return msg;
     733           0 :         *ret = wlr_tag;
     734           0 :         return msg;
     735             : }
     736             : 
     737             : /* the replica cycle can be set to fixed interval.
     738             :  * This allows for ensuring an up to date version every N seconds
     739             :  */
     740             : str
     741           0 : WLRsetbeat(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
     742             : {       int new;
     743             :         (void) cntxt;
     744             :         (void) mb;
     745           0 :         new = *getArgReference_int(stk,pci,1);
     746           0 :         if ( new < wlc_beat || new < 1)
     747           0 :                 throw(SQL,"setbeat",SQLSTATE(42000) "Cycle time should be larger then master or >= 1 second");
     748           0 :         wlr_beat = new;
     749           0 :         return MAL_SUCCEED;
     750             : }
     751             : 
     752             : static str
     753           0 : WLRquery(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
     754           0 : {       str qry =  *getArgReference_str(stk,pci,1);
     755             :         str msg = MAL_SUCCEED;
     756             :         char *x, *y, *qtxt;
     757             : 
     758             :         (void) mb;
     759           0 :         if( cntxt->wlc_kind == WLC_ROLLBACK || cntxt->wlc_kind == WLC_ERROR)
     760             :                 return msg;
     761             :         // execute the query in replay mode when required.
     762             :         // we need to get rid of the escaped quote.
     763           0 :         x = qtxt= (char*) GDKmalloc(strlen(qry) +1);
     764           0 :         if( qtxt == NULL)
     765           0 :                 throw(SQL,"wlr.query",SQLSTATE(HY013) MAL_MALLOC_FAIL);
     766           0 :         for(y = qry; *y; y++){
     767           0 :                 if( *y == '\\' ){
     768           0 :                         if( *(y+1) ==  '\'')
     769           0 :                         y += 1;
     770             :                 }
     771           0 :                 *x++ = *y;
     772             :         }
     773           0 :         *x = 0;
     774           0 :         msg =  SQLstatementIntern(cntxt, qtxt, "SQLstatement", TRUE, TRUE, NULL);
     775           0 :         GDKfree(qtxt);
     776           0 :         return msg;
     777             : }
     778             : 
     779             : /* An error was reported and manually dealt with.
     780             :  */
     781             : str
     782           0 : WLRaccept(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
     783             : {
     784             :         (void) cntxt;
     785             :         (void) pci;
     786             :         (void) stk;
     787             :         (void) mb;
     788           0 :         wlr_error[0]= 0;
     789           0 :         return WLRputConfig();
     790             : }
     791             : 
     792             : str
     793           0 : WLRcommit(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
     794             : {
     795             :         (void) cntxt;
     796             :         (void) pci;
     797             :         (void) stk;
     798             :         (void) mb;
     799           0 :         return MAL_SUCCEED;
     800             : }
     801             : 
     802             : str
     803           0 : WLRrollback(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
     804             : {
     805             :         (void) cntxt;
     806             :         (void) pci;
     807             :         (void) stk;
     808             :         (void) mb;
     809           0 :         return MAL_SUCCEED;
     810             : }
     811             : 
     812             : str
     813           0 : WLRaction(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
     814             : {
     815             :         (void) cntxt;
     816             :         (void) pci;
     817             :         (void) stk;
     818             :         (void) mb;
     819           0 :         return MAL_SUCCEED;
     820             : }
     821             : 
     822             : str
     823           0 : WLRcatalog(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
     824             : {
     825           0 :         return WLRquery(cntxt,mb,stk,pci);
     826             : }
     827             : 
     828             : str
     829           0 : WLRgeneric(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
     830             : {
     831             :         // currently they are informative only
     832             :         (void) cntxt;
     833             :         (void) mb;
     834             :         (void) stk;
     835             :         (void) pci;
     836           0 :         return MAL_SUCCEED;
     837             : }
     838             : 
     839             : /* TODO: Martin take a look at this.
     840             :  *
     841             :  * PSA: DO NOT USE THIS OUT OF WLRappend or very bad things will happen!
     842             :  * (variable msg and tag cleanup will not be defined).
     843             :  */
     844             : #define WLRcolumn(TPE) \
     845             :         for( i = 6; i < pci->argc; i++){                                \
     846             :                 TPE val = *getArgReference_##TPE(stk,pci,i);            \
     847             :                 if (BUNappend(ins, (void*) &val, false) != GDK_SUCCEED) { \
     848             :                         msg = createException(MAL, "WLRappend", "BUNappend failed"); \
     849             :                         goto cleanup;                                   \
     850             :                 }                                                       \
     851             :         }
     852             : 
     853             : str
     854           0 : WLRappend(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
     855             : {
     856             :         str sname, tname, cname;
     857             :         int tpe,i, log_res = LOG_OK;
     858           0 :         mvc *m=NULL;
     859             :         sql_schema *s;
     860             :         sql_table *t;
     861             :         sql_column *c;
     862             :         sql_idx *idx;
     863             :         BAT *ins = NULL, *pos = NULL;
     864             :         str msg = MAL_SUCCEED;
     865             :         BUN cnt = 1;
     866             : 
     867           0 :         if( cntxt->wlc_kind == WLC_ROLLBACK || cntxt->wlc_kind == WLC_ERROR)
     868             :                 return msg;
     869           0 :         sname = *getArgReference_str(stk,pci,1);
     870           0 :         tname = *getArgReference_str(stk,pci,2);
     871           0 :         cname = *getArgReference_str(stk,pci,3);
     872           0 :         BUN offset = *(BUN*)getArgReference_oid(stk,pci,4);
     873           0 :         bat Pos = *getArgReference_bat(stk,pci,5);
     874             : 
     875           0 :         if ((msg = getSQLContext(cntxt, mb, &m, NULL)) != NULL)
     876             :                 return msg;
     877           0 :         if ((msg = checkSQLContext(cntxt)) != NULL)
     878             :                 return msg;
     879             : 
     880           0 :         s = mvc_bind_schema(m, sname);
     881           0 :         if (s == NULL)
     882           0 :                 throw(SQL, "sql.append", SQLSTATE(3F000) "Schema missing %s",sname);
     883           0 :         t = mvc_bind_table(m, s, tname);
     884           0 :         if (t == NULL)
     885           0 :                 throw(SQL, "sql.append", SQLSTATE(42S02) "Table missing %s.%s",sname,tname);
     886             : 
     887           0 :         if ((pos = BATdescriptor(Pos)) == NULL)
     888           0 :                 throw(SQL, "sql.append", SQLSTATE(HY005) "Cannot access column descriptor %s.%s.%s",
     889             :                         sname,tname,cname);
     890             :         // get the data into local BAT
     891             :         if (pos)
     892           0 :                 cnt = BATcount(pos);
     893             : 
     894           0 :         tpe= getArgType(mb,pci,7);
     895           0 :         ins = COLnew(0, tpe, 0, TRANSIENT);
     896           0 :         if( ins == NULL){
     897           0 :                 bat_destroy(pos);
     898           0 :                 throw(SQL,"WLRappend",SQLSTATE(HY013) MAL_MALLOC_FAIL);
     899             :         }
     900             : 
     901           0 :         sqlstore *store = m->session->tr->store;
     902           0 :         switch(ATOMstorage(tpe)){
     903           0 :         case TYPE_bit: WLRcolumn(bit); break;
     904           0 :         case TYPE_bte: WLRcolumn(bte); break;
     905           0 :         case TYPE_sht: WLRcolumn(sht); break;
     906           0 :         case TYPE_int: WLRcolumn(int); break;
     907           0 :         case TYPE_lng: WLRcolumn(lng); break;
     908           0 :         case TYPE_oid: WLRcolumn(oid); break;
     909           0 :         case TYPE_flt: WLRcolumn(flt); break;
     910           0 :         case TYPE_dbl: WLRcolumn(dbl); break;
     911             : #ifdef HAVE_HGE
     912           0 :         case TYPE_hge: WLRcolumn(hge); break;
     913             : #endif
     914             :         case TYPE_str:
     915           0 :                 for( i = 6; i < pci->argc; i++){
     916           0 :                         str val = *getArgReference_str(stk,pci,i);
     917           0 :                         if (BUNappend(ins, (void*) val, false) != GDK_SUCCEED) {
     918           0 :                                 msg = createException(MAL, "WLRappend", "BUNappend failed");
     919           0 :                                 goto cleanup;
     920             :                         }
     921             :                 }
     922             :                 break;
     923             :         }
     924             : 
     925           0 :         if (cname[0] != '%' && (c = mvc_bind_column(m, t, cname)) != NULL) {
     926           0 :                 log_res = store->storage_api.append_col(m->session->tr, c, offset, pos, ins, cnt, TYPE_bat);
     927           0 :         } else if (cname[0] == '%' && (idx = mvc_bind_idx(m, s, cname + 1)) != NULL) {
     928           0 :                 log_res = store->storage_api.append_idx(m->session->tr, idx, offset, pos, ins, cnt, tpe);
     929             :         }
     930           0 :         if (log_res != LOG_OK) /* the conflict case should never happen, but leave it here */
     931           0 :                 msg = createException(MAL, "WLRappend", SQLSTATE(42000) "Append failed%s", log_res == LOG_CONFLICT ? " due to conflict with another transaction" : "");
     932           0 : cleanup:
     933           0 :         bat_destroy(pos);
     934           0 :         BBPunfix(((BAT *) ins)->batCacheid);
     935           0 :         return msg;
     936             : }
     937             : 
     938             : str
     939           0 : WLRdelete(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
     940             : {
     941             :         str sname, tname;
     942             :         int i, log_res;
     943           0 :         mvc *m=NULL;
     944             :         sql_schema *s;
     945             :         sql_table *t;
     946             :         BAT *ins = 0;
     947             :         oid o;
     948             :         str msg= MAL_SUCCEED;
     949             : 
     950           0 :         if( cntxt->wlc_kind == WLC_ROLLBACK || cntxt->wlc_kind == WLC_ERROR)
     951             :                 return msg;
     952           0 :         sname = *getArgReference_str(stk,pci,1);
     953           0 :         tname = *getArgReference_str(stk,pci,2);
     954             : 
     955           0 :         if ((msg = getSQLContext(cntxt, mb, &m, NULL)) != NULL)
     956             :                 return msg;
     957           0 :         if ((msg = checkSQLContext(cntxt)) != NULL)
     958             :                 return msg;
     959           0 :         sqlstore *store = m->session->tr->store;
     960             : 
     961           0 :         s = mvc_bind_schema(m, sname);
     962           0 :         if (s == NULL)
     963           0 :                 throw(SQL, "sql.append", SQLSTATE(3F000) "Schema missing %s",sname);
     964           0 :         t = mvc_bind_table(m, s, tname);
     965           0 :         if (t == NULL)
     966           0 :                 throw(SQL, "sql.append", SQLSTATE(42S02) "Table missing %s.%s",sname,tname);
     967             :         // get the data into local BAT
     968             : 
     969           0 :         ins = COLnew(0, TYPE_oid, 0, TRANSIENT);
     970           0 :         if( ins == NULL){
     971           0 :                 throw(SQL,"WLRappend",SQLSTATE(HY013) MAL_MALLOC_FAIL);
     972             :         }
     973             : 
     974           0 :         for( i = 3; i < pci->argc; i++){
     975           0 :                 o = *getArgReference_oid(stk,pci,i);
     976           0 :                 if (BUNappend(ins, (void*) &o, false) != GDK_SUCCEED) {
     977           0 :                         msg = createException(MAL, "WLRdelete", "BUNappend failed");
     978           0 :                         goto cleanup;
     979             :                 }
     980             :         }
     981             : 
     982           0 :         log_res = store->storage_api.delete_tab(m->session->tr, t, ins, TYPE_bat);
     983           0 :         if (log_res != LOG_OK)
     984           0 :                 msg = createException(MAL, "WLRdelete", SQLSTATE(42000) "Delete failed%s", log_res == LOG_CONFLICT ? " due to conflict with another transaction" : "");
     985           0 : cleanup:
     986           0 :         BBPunfix(((BAT *) ins)->batCacheid);
     987           0 :         return msg;
     988             : }
     989             : 
     990             : /* TODO: Martin take a look at this.
     991             :  *
     992             :  * PSA: DO NOT USE THIS OUT OF WLRupdate or very bad things will happen!
     993             :  * (variable msg and tag cleanup will not be defined).
     994             :  */
     995             : #define WLRvalue(TPE)                                                   \
     996             :         {       TPE val = *getArgReference_##TPE(stk,pci,5);                    \
     997             :                         if (BUNappend(upd, (void*) &val, false) != GDK_SUCCEED) {   \
     998             :                                 goto cleanup;                                           \
     999             :                 }                                                               \
    1000             :         }
    1001             : 
    1002             : str
    1003           0 : WLRupdate(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
    1004             : {
    1005             :         str sname, tname, cname;
    1006           0 :         mvc *m=NULL;
    1007             :         sql_schema *s;
    1008             :         sql_table *t;
    1009             :         sql_column *c;
    1010             :         sql_idx *i;
    1011             :         BAT *upd = 0, *tids=0;
    1012             :         str msg= MAL_SUCCEED;
    1013             :         oid o;
    1014           0 :         int tpe = getArgType(mb,pci,5), log_res = LOG_OK;
    1015             : 
    1016           0 :         if( cntxt->wlc_kind == WLC_ROLLBACK || cntxt->wlc_kind == WLC_ERROR)
    1017             :                 return msg;
    1018           0 :         sname = *getArgReference_str(stk,pci,1);
    1019           0 :         tname = *getArgReference_str(stk,pci,2);
    1020           0 :         cname = *getArgReference_str(stk,pci,3);
    1021           0 :         o = *getArgReference_oid(stk,pci,4);
    1022             : 
    1023           0 :         if ((msg = getSQLContext(cntxt, mb, &m, NULL)) != NULL)
    1024             :                 return msg;
    1025           0 :         if ((msg = checkSQLContext(cntxt)) != NULL)
    1026             :                 return msg;
    1027             : 
    1028           0 :         sqlstore *store = m->session->tr->store;
    1029             : 
    1030           0 :         s = mvc_bind_schema(m, sname);
    1031           0 :         if (s == NULL)
    1032           0 :                 throw(SQL, "sql.update", SQLSTATE(3F000) "Schema missing %s",sname);
    1033           0 :         t = mvc_bind_table(m, s, tname);
    1034           0 :         if (t == NULL)
    1035           0 :                 throw(SQL, "sql.update", SQLSTATE(42S02) "Table missing %s.%s",sname,tname);
    1036             :         // get the data into local BAT
    1037             : 
    1038           0 :         tids = COLnew(0, TYPE_oid, 0, TRANSIENT);
    1039           0 :         if( tids == NULL){
    1040           0 :                 throw(SQL,"WLRupdate",SQLSTATE(HY013) MAL_MALLOC_FAIL);
    1041             :         }
    1042           0 :         upd = COLnew(0, tpe, 0, TRANSIENT);
    1043           0 :         if( upd == NULL){
    1044           0 :                 BBPunfix(((BAT *) tids)->batCacheid);
    1045           0 :                 throw(SQL,"WLRupdate",SQLSTATE(HY013) MAL_MALLOC_FAIL);
    1046             :         }
    1047           0 :         if (BUNappend(tids, &o, false) != GDK_SUCCEED) {
    1048           0 :                 msg = createException(MAL, "WLRupdate", "BUNappend failed");
    1049           0 :                 goto cleanup;
    1050             :         }
    1051             : 
    1052           0 :         switch(ATOMstorage(tpe)){
    1053           0 :         case TYPE_bit: WLRvalue(bit); break;
    1054           0 :         case TYPE_bte: WLRvalue(bte); break;
    1055           0 :         case TYPE_sht: WLRvalue(sht); break;
    1056           0 :         case TYPE_int: WLRvalue(int); break;
    1057           0 :         case TYPE_lng: WLRvalue(lng); break;
    1058           0 :         case TYPE_oid: WLRvalue(oid); break;
    1059           0 :         case TYPE_flt: WLRvalue(flt); break;
    1060           0 :         case TYPE_dbl: WLRvalue(dbl); break;
    1061             : #ifdef HAVE_HGE
    1062           0 :         case TYPE_hge: WLRvalue(hge); break;
    1063             : #endif
    1064           0 :         case TYPE_str:
    1065             :                 {
    1066           0 :                         str val = *getArgReference_str(stk,pci,5);
    1067           0 :                         if (BUNappend(upd, (void*) val, false) != GDK_SUCCEED) {
    1068           0 :                                 msg = createException(MAL, "WLRupdate", "BUNappend failed");
    1069           0 :                                 goto cleanup;
    1070             :                         }
    1071             :                 }
    1072             :                 break;
    1073           0 :         default:
    1074           0 :                 TRC_ERROR(SQL_TRANS, "Missing type in WLRupdate\n");
    1075             :         }
    1076             : 
    1077           0 :         if (cname[0] != '%' && (c = mvc_bind_column(m, t, cname)) != NULL) {
    1078           0 :                 log_res = store->storage_api.update_col(m->session->tr, c, tids, upd, TYPE_bat);
    1079           0 :         } else if (cname[0] == '%' && (i = mvc_bind_idx(m, s, cname + 1)) != NULL) {
    1080           0 :                 log_res = store->storage_api.update_idx(m->session->tr, i, tids, upd, TYPE_bat);
    1081             :         }
    1082           0 :         if (log_res != LOG_OK)
    1083           0 :                 msg = createException(MAL, "WLRupdate", "Update failed%s", log_res == LOG_CONFLICT ? " due to conflict with another transaction" : "");
    1084           0 : cleanup:
    1085           0 :         BBPunfix(((BAT *) tids)->batCacheid);
    1086           0 :         BBPunfix(((BAT *) upd)->batCacheid);
    1087           0 :         return msg;
    1088             : }
    1089             : 
    1090             : str
    1091           0 : WLRclear_table(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
    1092             : {
    1093             :         sql_schema *s;
    1094             :         sql_table *t;
    1095           0 :         mvc *m = NULL;
    1096             :         str msg= MAL_SUCCEED;
    1097           0 :         str *sname = getArgReference_str(stk, pci, 1);
    1098           0 :         str *tname = getArgReference_str(stk, pci, 2);
    1099             :         BUN res;
    1100             : 
    1101           0 :         if( cntxt->wlc_kind == WLC_ROLLBACK || cntxt->wlc_kind == WLC_ERROR)
    1102             :                 return msg;
    1103           0 :         if ((msg = getSQLContext(cntxt, mb, &m, NULL)) != NULL)
    1104             :                 return msg;
    1105           0 :         if ((msg = checkSQLContext(cntxt)) != NULL)
    1106             :                 return msg;
    1107           0 :         s = mvc_bind_schema(m, *sname);
    1108           0 :         if (s == NULL)
    1109           0 :                 throw(SQL, "sql.clear_table", SQLSTATE(3F000) "Schema missing %s",*sname);
    1110           0 :         t = mvc_bind_table(m, s, *tname);
    1111           0 :         if (t == NULL)
    1112           0 :                 throw(SQL, "sql.clear_table", SQLSTATE(42S02) "Table missing %s.%s",*sname,*tname);
    1113           0 :         res = mvc_clear_table(m, t);
    1114           0 :         if (res >= BUN_NONE - 1)
    1115           0 :                 throw(SQL, "sql.clear_table", SQLSTATE(42000) "Table clear failed%s", res == (BUN_NONE - 1) ? " due to conflict with another transaction" : "");
    1116             :         return MAL_SUCCEED;
    1117             : }

Generated by: LCOV version 1.14