LCOV - code coverage report
Current view: top level - monetdb5/modules/mal - wlc.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 257 397 64.7 %
Date: 2021-01-13 20:07:21 Functions: 25 33 75.8 %

          Line data    Source code
       1             : /*
       2             :  * This Source Code Form is subject to the terms of the Mozilla Public
       3             :  * License, v. 2.0.  If a copy of the MPL was not distributed with this
       4             :  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
       5             :  *
       6             :  * Copyright 1997 - July 2008 CWI, August 2008 - 2021 MonetDB B.V.
       7             :  */
       8             : 
       9             : /*
      10             :  * (c) 2017 Martin Kersten
      11             :  * This module collects the workload-capture-replay statements during transaction execution,
      12             :  * also known as asynchronous logical replication management. It can be used for
      13             :  * multiple purposes: BACKUP, REPLICATION, and REPLAY
      14             :  *
      15             :  * For a BACKUP we need either a complete update log from the beginning, or
      16             :  * a binary snapshot with a collection of logs recording its changes since.
      17             :  * To ensure transaction ACID properties, the log record should be stored on
      18             :  * disk within the transaction brackets, which may cause a serious IO load.
      19             :  * (Tip, store these logs files on an SSD or NVM)
      20             :  *
      21             :  * For REPLICATION, also called a database clone or slave, we take a snapshot and the
      22             :  * log files that reflect the recent changes. The log updates are replayed against
      23             :  * the snapshot until a specific time point or transaction id is reached.
      24             :  *
      25             :  * Some systems also use the logical logs to REPLAY all (expensive) queries
      26             :  * against the database. We skip this for the time being, as those queries
      27             :  * can be captured already in the server.
      28             :  * [A flag should be added to at least capture them]
      29             :  *
      30             :  * The goal of this module is to ease BACKUP and REPLICATION of a master database
      31             :  * with a time-bounded delay. This means that both master and replica run at a certain beat
      32             :  * (in seconds) by which information is made available or read by the replica.
      33             :  *
      34             :  * Such a replica is used in query workload sharing, database versioning, and (re-)partitioning.
      35             :  * Tables taken from the master are not protected against local updates in the replica.
      36             :  * However, any replay transaction that fails stops the cloning process.
      37             :  * Furthermore, only persistent tables are considered for replication.
      38             :  * Updates under the 'tmp' schema, i.e. temporary tables, are ignored.
      39             :  *
      40             :  * Simplicity and ease of end-user control has been the driving argument here.
      41             :  *
      42             :  * IMPLEMENTATION
      43             :  * The underlying assumption of the techniques deployed is that the database
      44             :  * resides on a proper (global/distributed) file system to guarantees recovery
      45             :  * from most storage system related failures, e.g. using RAID disks or LSF systems.
      46             :  *
      47             :  * A database can be set into 'master' mode only once using the SQL command:
      48             :  * CALL wrc_master.master() whose access permission is limited to the 'monetdb' user.[CHECK]
      49             :  * An optional path to the log record directory can be given to reduce the IO latency,
      50             :  * e.g. using a nearby SSD, or where there is ample of space to keep a long history,
      51             :  * e.g. a HDD or cold storage location.
      52             :  *
      53             :  * By default, the command creates a directory .../dbfarm/dbname/wlc_logs to hold all logs
      54             :  * and a configuration file .../dbfarm/dbname/wlc.config to hold the state of the transaction logs.
      55             :  * It contains the following key=value pairs:
      56             :  *              snapshot=<path to a snapshot directory>
      57             :  *              logs=<path to the wlc log directory>
      58             :  *              state=<started, stopped>
      59             :  *              batches=<next available batch file to be applied>
      60             :  *              beat=<maximal delay between log files, in seconds>
      61             :  *              write=<timestamp of the last transaction recorded>
      62             :  *
      63             :  * A missing path to the snapshot denotes that we can start the clone with an empty database.
      64             :  * The log files are stored as master/<dbname>_<batchnumber>. They belong to the snapshot.
      65             :  *
      66             :  * Each wlc log file contains a serial log of a number of committed compound transactions.
      67             :  * The log records are represented as ordinary MAL statement blocks, which
      68             :  * are executed in serial mode. (parallelism can be considered for large updates later)
      69             :  * Each transaction job is identified by a unique id, its starting time, and the original responsible user.
      70             :  * Each log-record should end with a commit to be allowed for re-execution.
      71             :  * Log records with a rollback tag are merely for analysis by the DBA, their statements are ignored.
      72             :  *
      73             :  * A transaction log file is created by the master using a heartbeat (in seconds).
      74             :  * A new transaction log file is published when the system has been collecting transaction records for some time.
      75             :  * The beat can be set using a SQL command, e.g.
      76             :  * CALL wcr_master.beat(duration)
      77             :  * Setting it to zero leads to a log file per transaction and may cause a large log directory
      78             :  * with thousands of small files.
      79             :  * The default of 5 minutes should balance polling overhead in most practical situations.
      80             :  * Intermittent flush() during this period ensures the committed log records survive
      81             :  * a crash.
      82             :  *
      83             :  * A minor problem here is that we should ensure that the log file is closed even if there
      84             :  * are no transactions running. It is solved with a separate monitor thread, which ensures
      85             :  * that the a new log file is created at least after 'beat' seconds since the first logrecord was created.
      86             :  * After closing, the replicas can see from the master configuration file that a new log batch is available.
      87             :  *
      88             :  * The final step is to close stop transaction logging with the command
      89             :  * CALL wcr_master.stop().
      90             :  * It typically is the end-of-life-time for a snapshot. For example, when planning to do
      91             :  * a large bulk load of the database, stopping logging avoids a double write into the
      92             :  * database. The database can only be brought back into master mode using a fresh snapshot.
      93             :  *
      94             :  * [It is not advicable to temporarily stop logging and continue afterwards, because then there
      95             :  * is no guarantee the user will see a consistent database.]
      96             :  *
      97             :  * One of the key challenges for a DBA is to keep the log directory manageable, because it grows
      98             :  * with the speed up updates being applied to the database. This calls for regularly checking
      99             :  * for their disk footprint and taking a new snapshot as a frame of reference.
     100             :  *
     101             :  * [TODO A trigger should be added to stop logging and call for a fresh snapshot first]
     102             :  * [TODO the batch files might include the snapshot id for ease of rebuild]
     103             :  *
     104             :  * The DBA tool 'monetdb' provides options to create a master and its replicas.
     105             :  * It will also maintain the list of replicas for inspection and managing their drift.
     106             :  * For example,
     107             :  *       monetdb master <dbname> [ <optional snapshot path>]
     108             :  * which locks the database, takes a save copy, initializes the state chance to master.
     109             :  *
     110             :  * A fresh replica can be constructed as follows:
     111             :  *      monetdb replicate <dbname> <mastername>
     112             :  *
     113             :  * Instead of using the monetdb command line we can use the SQL calls directly
     114             :  * sys.master() and sys.replicate(), provided we start with a fresh database.
     115             :  *
     116             :  * CLONE
     117             :  *
     118             :  * Every clone should start off with a copy of the binary snapshot identified by 'snapshot'.
     119             :  * A fresh database can be turned into a clone using the call
     120             :  *     CALL wcr_replica.master('mastername')
     121             :  * It will grab the latest snapshot of the master and applies all
     122             :  * available log files before releasing the system.
     123             :  * The master has no knowledge about the number of clones and their whereabouts.
     124             :  *
     125             :  * The clone process will iterate in the background through the log files,
     126             :  * applying all update transactions.
     127             :  *
     128             :  * An optional timestamp or transaction id can be added to the replicate() command to
     129             :  * apply the logs until a given moment. This is particularly handy when an unexpected
     130             :  * desastrous user action (drop persistent table) has to be recovered from.
     131             :  *
     132             :  * CALL wcr_replica.master('mastername');  -- get logs from a specific master
     133             :  * ...
     134             :  * CALL wcr_replicate.replicate(tag); -- stops after we are in sink with tag
     135             :  * ...
     136             :  * CALL wcr_replicate.replicate(NOW()); -- stop after we sinked all transactions
     137             :  * ...
     138             :  * CALL wcr_replicate.replicate(); -- synchronize in background continuously
     139             :  * ...
     140             :  * CALL wcr_replicate.stop(); -- stop the synchroniation thread
     141             :  *
     142             :  * SELECT wcr_replica.clock();
     143             :  * returns the timestamp of the last replicated transaction.
     144             :  * SELECT wcr_replica.tick();
     145             :  * returns the transaction id of the last replicated transaction.
     146             :  * SELECT wcr_master.clock();
     147             :  * return the timestamp of the last committed transaction in the master.
     148             :  * SELECT wcr_master.tick();
     149             :  * return the transaction id of the last committed transaction in the master.
     150             :  *
     151             :  * Any failure encountered during a log replay terminates the replication process,
     152             :  * leaving a message in the merovingian log configuration.
     153             :  *
     154             :  * The wlc files purposely have a textual format derived from the MAL statements.
     155             :  * This provides a stepping stone for remote execution later.
     156             :  *
     157             :  * [TODO consider the roll logging of SQL session variables, i.e. optimizer_pipe
     158             :  * as part of the log record]
     159             :  * For updates we don't need special care for this.
     160             :  */
     161             : #include "monetdb_config.h"
     162             : #include <time.h>
     163             : #include "mal_builder.h"
     164             : #include "wlc.h"
     165             : #include "gdk_time.h"
     166             : 
     167             : MT_Lock     wlc_lock = MT_LOCK_INITIALIZER("wlc_lock");
     168             : 
     169             : static char wlc_snapshot[FILENAME_MAX]; // The location of the snapshot against which the logs work
     170             : static stream *wlc_fd = 0;
     171             : 
     172             : // These properties are needed by the replica to direct the roll-forward.
     173             : char wlc_dir[FILENAME_MAX];     // The location in the global file store for the logs
     174             : char wlc_name[IDLENGTH];        // The master database name
     175             : lng  wlc_tag = 0;                       // next transaction id
     176             : int  wlc_state = 0;                     // The current status of the logger in the life cycle
     177             : char wlc_write[26];                     // The timestamp of the last committed transaction
     178             : int  wlc_batches = 0;           // identifier of next batch
     179             : int  wlc_beat = 10;             // maximal period covered by a single log file in seconds
     180             : 
     181             : /* The database snapshots are binary copies of the dbfarm/database/bat
     182             :  * New snapshots are created currently using the 'monetdb snapshot <db>' command
     183             :  * or a SQL procedure.
     184             :  *
     185             :  * The wlc logs are stored in the snapshot directory as a time-stamped list
     186             :  */
     187             : 
     188             : int
     189      278960 : WLCused(void)
     190             : {
     191      278960 :         return wlc_dir[0] != 0;
     192             : }
     193             : 
     194             : /* The master configuration file is a simple key=value table */
     195             : str
     196          15 : WLCreadConfig(FILE *fd)
     197             : {
     198             :         str msg = MAL_SUCCEED;
     199             :         char path[FILENAME_MAX];
     200             :         int len;
     201             : 
     202         120 :         while( fgets(path, FILENAME_MAX, fd) ){
     203          90 :                 path[strlen(path)-1] = 0;
     204          90 :                 if( strncmp("logs=", path,5) == 0) {
     205          15 :                         len = snprintf(wlc_dir, FILENAME_MAX, "%s", path + 5);
     206          15 :                         if (len == -1 || len >= FILENAME_MAX) {
     207           0 :                                 msg = createException(MAL, "wlc.readConfig", "logs config value is too large");
     208           0 :                                 goto bailout;
     209             :                         }
     210             :                 }
     211          90 :                 if( strncmp("snapshot=", path,9) == 0) {
     212           0 :                         len = snprintf(wlc_snapshot, FILENAME_MAX, "%s", path + 9);
     213           0 :                         if (len == -1 || len >= FILENAME_MAX) {
     214           0 :                                 msg = createException(MAL, "wlc.readConfig", "snapshot config value is too large");
     215           0 :                                 goto bailout;
     216             :                         }
     217             :                 }
     218          90 :                 if( strncmp("tag=", path,4) == 0)
     219          15 :                         wlc_tag = atol(path+ 4);
     220          90 :                 if( strncmp("write=", path,6) == 0) {
     221          15 :                         len = snprintf(wlc_write, 26, "%s", path + 6);
     222          15 :                         if (len == -1 || len >= 26) {
     223           0 :                                 msg = createException(MAL, "wlc.readConfig", "write config value is too large");
     224           0 :                                 goto bailout;
     225             :                         }
     226             :                 }
     227          90 :                 if( strncmp("batches=", path, 8) == 0)
     228          15 :                         wlc_batches = atoi(path+ 8);
     229          90 :                 if( strncmp("beat=", path, 5) == 0)
     230          15 :                         wlc_beat = atoi(path+ 5);
     231          90 :                 if( strncmp("state=", path, 6) == 0)
     232          15 :                         wlc_state = atoi(path+ 6);
     233             :         }
     234          15 : bailout:
     235          15 :         fclose(fd);
     236          15 :         return msg;
     237             : }
     238             : 
     239             : str
     240           7 : WLCgetConfig(void){
     241             :         str l;
     242             :         FILE *fd;
     243             : 
     244           7 :         if((l = GDKfilepath(0,0,"wlc.config",0)) == NULL)
     245           0 :                 throw(MAL,"wlc.getConfig","Could not access wlc.config file\n");
     246           7 :         fd = fopen(l,"r");
     247           7 :         GDKfree(l);
     248           7 :         if( fd == NULL)
     249           0 :                 throw(MAL,"wlc.getConfig","Could not access wlc.config file\n");
     250           7 :         return WLCreadConfig(fd);
     251             : }
     252             : 
     253             : static
     254          28 : str WLCsetConfig(void){
     255             :         str path;
     256             :         stream *fd;
     257             : 
     258             :         /* be aware to be safe, on a failing fopen */
     259          28 :         if((path = GDKfilepath(0,0,"wlc.config",0)) == NULL)
     260           0 :                 throw(MAL,"wlc.setConfig","Could not access wlc.config\n");
     261          28 :         fd = open_wastream(path);
     262          28 :         GDKfree(path);
     263          28 :         if( fd == NULL)
     264           0 :                 throw(MAL,"wlc.setConfig","Could not access wlc.config: %s\n", mnstr_peek_error(NULL));
     265          28 :         if( wlc_snapshot[0] )
     266           0 :                 mnstr_printf(fd,"snapshot=%s\n", wlc_snapshot);
     267          28 :         mnstr_printf(fd,"logs=%s\n", wlc_dir);
     268          28 :         mnstr_printf(fd,"tag="LLFMT"\n", wlc_tag );
     269          28 :         mnstr_printf(fd,"write=%s\n", wlc_write );
     270          28 :         mnstr_printf(fd,"state=%d\n", wlc_state );
     271          28 :         mnstr_printf(fd,"batches=%d\n", wlc_batches );
     272          28 :         mnstr_printf(fd,"beat=%d\n", wlc_beat );
     273          28 :         (void) mnstr_flush(wlc_fd, MNSTR_FLUSH_DATA);
     274          28 :         (void) mnstr_fsync(wlc_fd);
     275          28 :         close_stream(fd);
     276          28 :         return MAL_SUCCEED;
     277             : }
     278             : 
     279             : // creation of the logger file and updating the configuration file should be atomic !!!
     280             : // The log files are marked with the database name. This allows for easy recognition later on.
     281             : static str
     282          13 : WLCsetlogger(void)
     283             : {
     284             :         int len;
     285             :         char path[FILENAME_MAX];
     286             :         str msg = MAL_SUCCEED;
     287             : 
     288          13 :         if( wlc_dir[0] == 0)
     289           0 :                 throw(MAL,"wlc.setlogger","Path not initalized");
     290          13 :         MT_lock_set(&wlc_lock);
     291          13 :         len = snprintf(path,FILENAME_MAX,"%s%c%s_%012d", wlc_dir, DIR_SEP, wlc_name, wlc_batches);
     292          13 :         if (len == -1 || len >= FILENAME_MAX) {
     293           0 :                 MT_lock_unset(&wlc_lock);
     294           0 :                 throw(MAL, "wlc.setlogger", "Logger filename path is too large");
     295             :         }
     296          13 :         wlc_fd = open_wastream(path);
     297          13 :         if( wlc_fd == 0){
     298           0 :                 MT_lock_unset(&wlc_lock);
     299           0 :                 throw(MAL,"wlc.logger","Could not create %s\n",path);
     300             :         }
     301             : 
     302          13 :         wlc_batches++;
     303          13 :         msg = WLCsetConfig();
     304          13 :         MT_lock_unset(&wlc_lock);
     305          13 :         return msg;
     306             : }
     307             : 
     308             : /* force the current log file to its storage container */
     309             : static str
     310         273 : WLCcloselogger(void)
     311             : {
     312         273 :         if( wlc_fd == NULL)
     313             :                 return MAL_SUCCEED;
     314          13 :         mnstr_flush(wlc_fd, MNSTR_FLUSH_DATA);
     315          13 :         mnstr_fsync(wlc_fd);
     316          13 :         close_stream(wlc_fd);
     317          13 :         wlc_fd= NULL;
     318          13 :         return WLCsetConfig();
     319             : }
     320             : 
     321             : /* force the current log file to its storage container, but dont create a new one yet */
     322             : str
     323           0 : WLCflush(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
     324             : {
     325             :         (void) cntxt;
     326             :         (void) mb;
     327             :         (void) stk;
     328             :         (void) pci;
     329           0 :         if( wlc_fd == NULL)
     330             :                 return MAL_SUCCEED;
     331           0 :         mnstr_flush(wlc_fd, MNSTR_FLUSH_DATA);
     332           0 :         mnstr_fsync(wlc_fd);
     333           0 :         return WLCsetConfig();
     334             : }
     335             : 
     336             : str
     337         259 : WLCepilogue(void *ret)
     338             : {
     339             :         str msg = MAL_SUCCEED;
     340             : 
     341             :         (void)ret;
     342             : 
     343         259 :         MT_lock_set(&wlc_lock);
     344         259 :         msg = WLCcloselogger();
     345         259 :         wlc_snapshot[0]=0;
     346         259 :         wlc_dir[0]= 0;
     347         259 :         wlc_name[0]= 0;
     348         259 :         wlc_write[0] =0;
     349         259 :         MT_lock_unset(&wlc_lock);
     350             :         //TODO we have to return a possible error message somehow
     351         259 :         return(msg);
     352             : }
     353             : 
     354             : /*
     355             :  * The WLClogger process ensures that log files are properly closed
     356             :  * and released when their cycle time window has expired.
     357             :  */
     358             : 
     359             : static MT_Id wlc_logger;
     360             : 
     361             : static void
     362           7 : WLClogger(void *arg)
     363             : {
     364             :         int seconds;
     365             :         str msg = MAL_SUCCEED;
     366             : 
     367             :         (void) arg;
     368          14 :         while(!GDKexiting()){
     369           7 :                 if( wlc_dir[0] && wlc_fd ){
     370           0 :                         MT_lock_set(&wlc_lock);
     371           0 :                         if((msg = WLCcloselogger()) != MAL_SUCCEED) {
     372           0 :                                 TRC_ERROR(MAL_WLC, "%s\n", msg);
     373           0 :                                 freeException(msg);
     374             :                         }
     375           0 :                         MT_lock_unset(&wlc_lock);
     376             :                 }
     377          14 :                 for( seconds = 0; (wlc_beat == 0 || seconds < wlc_beat) && ! GDKexiting(); seconds++)
     378           7 :                         MT_sleep_ms( 1000);
     379             :         }
     380           7 : }
     381             : /*
     382             :  * The existence of the master directory should be checked upon server restart.
     383             :  * Then the master record information should be set and the WLClogger started.
     384             :  */
     385             : 
     386             : #ifndef F_OK
     387             : #define F_OK 0
     388             : #endif
     389             : #ifdef _MSC_VER
     390             : #define access(f, m)    _access(f, m)
     391             : #endif
     392             : 
     393             : str
     394         260 : WLCinit(void)
     395             : {
     396             :         str conf, msg;
     397             :         int len;
     398             : 
     399         260 :         if( wlc_state == WLC_STARTUP){
     400             :                 // use default location for master configuration file
     401         260 :                 if((conf = GDKfilepath(0,0,"wlc.config",0)) == NULL)
     402           0 :                         throw(MAL,"wlc.init","Could not access wlc.config\n");
     403             : 
     404         260 :                 if (access(conf, F_OK) ){
     405         253 :                         GDKfree(conf);
     406         253 :                         return MAL_SUCCEED;
     407             :                 }
     408           7 :                 GDKfree(conf);
     409             :                 // we are in master mode
     410           7 :                 len = snprintf(wlc_name, IDLENGTH, "%s", GDKgetenv("gdk_dbname"));
     411           7 :                 if (len == -1 || len >= IDLENGTH)
     412           0 :                         throw(MAL, "wlc.init", "gdk_dbname variable is too large");
     413             : 
     414           7 :                 if ((msg = WLCgetConfig()) != MAL_SUCCEED)
     415             :                         return msg;
     416           7 :                 if (MT_create_thread(&wlc_logger, WLClogger , (void*) 0,
     417             :                                                          MT_THR_DETACHED, "WLClogger") < 0) {
     418           0 :                         TRC_ERROR(MAL_WLC, "Thread could not be spawned\n");
     419             :                 }
     420             :         }
     421             :         return MAL_SUCCEED;
     422             : }
     423             : 
     424             : str
     425           0 : WLCinitCmd(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
     426             : {
     427             :         (void) cntxt;
     428             :         (void) mb;
     429             :         (void) stk;
     430             :         (void) pci;
     431           0 :         return WLCinit();
     432             : }
     433             : 
     434             : str
     435           2 : WLCgetclock(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
     436           2 : {       str *ret = getArgReference_str(stk,pci,0);
     437             :         (void) cntxt;
     438             :         (void) mb;
     439           2 :         if( wlc_write[0])
     440           0 :                 *ret = GDKstrdup(wlc_write);
     441             :         else
     442           2 :                 *ret = GDKstrdup(str_nil);
     443           2 :         if(*ret == NULL)
     444           0 :                 throw(MAL,"wlc.getclock", MAL_MALLOC_FAIL);
     445             :         return MAL_SUCCEED;
     446             : }
     447             : 
     448             : str
     449           2 : WLCgettick(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
     450           2 : {       lng *ret = getArgReference_lng(stk,pci,0);
     451             :         (void) cntxt;
     452             :         (void) mb;
     453           2 :         *ret = wlc_tag;
     454           2 :         return MAL_SUCCEED;
     455             : }
     456             : 
     457             : /* Changing the beat should have immediate effect
     458             :  * It forces a new log file
     459             :  */
     460             : str
     461           1 : WLCsetbeat(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
     462             : {       int beat;
     463             :         (void) mb;
     464             :         (void) cntxt;
     465           1 :         beat = * getArgReference_int(stk,pci,1);
     466           1 :         if ( beat < 0)
     467           0 :                 throw(MAL, "wlc.setbeat", "beat should be a positive number");
     468           1 :         wlc_beat = beat;
     469           1 :         return WLCcloselogger();
     470             : }
     471             : 
     472             : str
     473           0 : WLCgetbeat(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
     474           0 : {       int *ret = getArgReference_int(stk,pci,0);
     475             :         (void) mb;
     476             :         (void) cntxt;
     477           0 :         *ret = wlc_beat;
     478           0 :         return MAL_SUCCEED;
     479             : }
     480             : 
     481             : str
     482           1 : WLCmaster(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
     483             : {
     484             :         int len;
     485             :         char path[FILENAME_MAX];
     486             :         str l;
     487             : 
     488             :         (void) cntxt;
     489             :         (void) mb;
     490           1 :         if( wlc_state == WLC_STOP)
     491           0 :                 throw(MAL,"master","WARNING: logging has been stopped. Use new snapshot");
     492           1 :         if( wlc_state == WLC_RUN)
     493           0 :                 throw(MAL,"master","WARNING: already in master mode, call ignored");
     494           1 :         if( pci->argc == 2) {
     495           0 :                 len = snprintf(path, FILENAME_MAX, "%s", *getArgReference_str(stk, pci,1));
     496           0 :                 if (len == -1 || len >= FILENAME_MAX)
     497           0 :                         throw(MAL, "wlc.master", "wlc master filename path is too large");
     498             :         } else {
     499           1 :                 if((l = GDKfilepath(0,0,"wlc_logs",0)) == NULL)
     500           0 :                         throw(SQL,"wlc.master", MAL_MALLOC_FAIL);
     501           1 :                 len = snprintf(path,FILENAME_MAX,"%s%c",l, DIR_SEP);
     502           1 :                 GDKfree(l);
     503           1 :                 if (len == -1 || len >= FILENAME_MAX)
     504           0 :                         throw(MAL, "wlc.master", "wlc master filename path is too large");
     505             :         }
     506             :         // set location for logs
     507           1 :         if( GDKcreatedir(path) != GDK_SUCCEED)
     508           0 :                 throw(SQL,"wlc.master","Could not create %s\n", path);
     509           1 :         len = snprintf(wlc_name, IDLENGTH, "%s", GDKgetenv("gdk_dbname"));
     510           1 :         if (len == -1 || len >= IDLENGTH)
     511           0 :                 throw(SQL,"wlc.master","gdk_dbname is too large");
     512           1 :         len = snprintf(wlc_dir, FILENAME_MAX, "%s", path);
     513           1 :         if (len == -1 || len >= FILENAME_MAX)
     514           0 :                 throw(SQL,"wlc.master","wlc_dir directory name is too large");
     515           1 :         wlc_state= WLC_RUN;
     516           1 :         return WLCsetConfig();
     517             : }
     518             : 
     519             : str
     520           1 : WLCstop(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
     521             : {
     522             :         (void) cntxt;
     523             :         (void) mb;
     524             :         (void) stk;
     525             :         (void) pci;
     526           1 :         if( wlc_state != WLC_RUN )
     527           0 :                 throw(MAL,"wlc.stop","WARNING: master role not active");
     528           1 :         wlc_state = WLC_STOP;
     529           1 :         return WLCsetConfig();
     530             : }
     531             : 
     532             : static str
     533             : WLCsettime(Client cntxt, InstrPtr pci, InstrPtr p, str fcn)
     534             : {
     535             :         timestamp ts = timestamp_current();
     536             :         str wlc_time = NULL;
     537             :         size_t wlc_limit = 0;
     538             :         InstrPtr ins;
     539             : 
     540             :         (void) pci;
     541             :         assert(!is_timestamp_nil(ts));
     542             :         if (timestamp_tostr(&wlc_time, &wlc_limit, &ts, true) < 0)
     543             :                 throw(MAL, fcn, "Unable to retrieve current time");
     544             :         ins = pushStr(cntxt->wlc, p, wlc_time);
     545             :         GDKfree(wlc_time);
     546             :         if (ins == NULL)
     547             :                 throw(MAL, fcn, MAL_MALLOC_FAIL);
     548             :         return MAL_SUCCEED;
     549             : }
     550             : 
     551             : /* Beware that a client context can be used in parallel and
     552             :  * that we don't want transaction interference caused by merging
     553             :  * the MAL instructions accidentally.
     554             :  * The effectively means that the SQL transaction record should
     555             :  * collect the MAL instructions and flush them.
     556             :  */
     557             : static str
     558          14 : WLCpreparewrite(Client cntxt)
     559             : {       str msg = MAL_SUCCEED;
     560             :         // save the wlc record on a file
     561             : 
     562          14 :         if( cntxt->wlc == 0 || cntxt->wlc->stop <= 1 ||  cntxt->wlc_kind == WLC_QUERY )
     563             :                 return MAL_SUCCEED;
     564             : 
     565          14 :         if( wlc_state != WLC_RUN){
     566           1 :                 trimMalVariables(cntxt->wlc, NULL);
     567           1 :                 resetMalBlk(cntxt->wlc, 0);
     568           1 :                 cntxt->wlc_kind = WLC_QUERY;
     569           1 :                 return MAL_SUCCEED;
     570             :         }
     571          13 :         if( wlc_dir[0] ){
     572          13 :                 if (wlc_fd == NULL){
     573          13 :                         msg = WLCsetlogger();
     574          13 :                         if( msg) {
     575             :                                 return msg;
     576             :                         }
     577             :                 }
     578             : 
     579          13 :                 MT_lock_set(&wlc_lock);
     580          13 :                 printFunction(wlc_fd, cntxt->wlc, 0, LIST_MAL_CALL );
     581          13 :                 (void) mnstr_flush(wlc_fd, MNSTR_FLUSH_DATA);
     582          13 :                 (void) mnstr_fsync(wlc_fd);
     583             :                 // close file if no delay is allowed
     584          13 :                 if( wlc_beat == 0 )
     585          13 :                         msg = WLCcloselogger();
     586             : 
     587          13 :                 MT_lock_unset(&wlc_lock);
     588          13 :                 trimMalVariables(cntxt->wlc, NULL);
     589          13 :                 resetMalBlk(cntxt->wlc, 0);
     590          13 :                 cntxt->wlc_kind = WLC_QUERY;
     591             :         } else
     592           0 :                 throw(MAL,"wlc.write","WLC log path missing ");
     593             : 
     594          13 :         if( wlc_state == WLC_STOP)
     595           0 :                 throw(MAL,"wlc.write","Logging for this snapshot has been stopped. Use a new snapshot to continue logging.");
     596             :         return msg;
     597             : }
     598             : 
     599             : static str
     600          30 : WLCstart(Client cntxt, str fcn)
     601             : {
     602             :         InstrPtr pci;
     603             :         str msg = MAL_SUCCEED;
     604          30 :         MalBlkPtr mb = cntxt->wlc;
     605             :         lng tag;
     606             : 
     607          30 :         if( cntxt->wlc == NULL){
     608           8 :                 if((cntxt->wlc = newMalBlk(STMT_INCREMENT)) == NULL)
     609           0 :                         throw(MAL, fcn, MAL_MALLOC_FAIL);
     610             :                 mb = cntxt->wlc;
     611             :         }
     612             :         /* Find a single transaction sequence ending with COMMIT or ROLLBACK */
     613          30 :         if( mb->stop > 1 ){
     614          16 :                 pci = getInstrPtr(mb, mb->stop -1 );
     615          16 :                 if (!(strcmp( getFunctionId(pci), "commit") == 0 || strcmp( getFunctionId(pci), "rollback") == 0))
     616             :                         return MAL_SUCCEED;
     617             :         }
     618             : 
     619             :         /* create the start of a new transaction block */
     620          14 :         MT_lock_set(&wlc_lock);
     621          14 :         tag = wlc_tag;
     622          14 :         wlc_tag++; // Update wlc administration
     623             : 
     624          14 :         pci = newStmt(mb,"wlr", "transaction");
     625          14 :         pci = pushLng(mb, pci, tag);
     626          14 :         if((msg = WLCsettime(cntxt,pci, pci, fcn)) == MAL_SUCCEED) {
     627          14 :                 snprintf(wlc_write, 26, "%s", getVarConstant(cntxt->wlc, getArg(pci, 2)).val.sval);
     628          14 :                 pci = pushStr(mb, pci, cntxt->username);
     629          14 :                 pci->ticks = GDKms();
     630             :         }
     631          14 :         MT_lock_unset(&wlc_lock);
     632             : 
     633          14 :         return msg;
     634             : }
     635             : 
     636             : str
     637           0 : WLCtransaction(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
     638             : {
     639             :         (void) cntxt;
     640             :         (void) mb;
     641             :         (void) stk;
     642             :         (void) pci;
     643             : 
     644           0 :         return MAL_SUCCEED;
     645             : }
     646             : 
     647             : str
     648           0 : WLCquery(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
     649             : {
     650             :         InstrPtr p;
     651             :         str msg = MAL_SUCCEED;
     652             : 
     653             :         (void) stk;
     654           0 :         if ( strcmp("-- no query",getVarConstant(mb, getArg(pci,1)).val.sval) == 0)
     655             :                 return MAL_SUCCEED;     // ignore system internal queries.
     656           0 :         msg = WLCstart(cntxt, "wlr.query");
     657           0 :         if(msg)
     658             :                 return msg;
     659           0 :         cntxt->wlc_kind = WLC_QUERY;
     660           0 :         p = newStmt(cntxt->wlc, "wlr","query");
     661           0 :         p = pushStr(cntxt->wlc, p, getVarConstant(mb, getArg(pci,1)).val.sval);
     662           0 :         return msg;
     663             : }
     664             : 
     665             : str
     666           4 : WLCcatalog(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
     667             : {
     668             :         InstrPtr p;
     669             :         str msg = MAL_SUCCEED;
     670             : 
     671             :         (void) stk;
     672           4 :         msg = WLCstart(cntxt, "wlr.catalog");
     673           4 :         if(msg)
     674             :                 return msg;
     675           4 :         cntxt->wlc_kind = WLC_CATALOG;
     676           4 :         p = newStmt(cntxt->wlc, "wlr","catalog");
     677           4 :         p = pushStr(cntxt->wlc, p, getVarConstant(mb, getArg(pci,1)).val.sval);
     678           4 :         return msg;
     679             : }
     680             : 
     681             : str
     682          10 : WLCaction(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
     683             : {
     684             :         InstrPtr p;
     685             :         str msg = MAL_SUCCEED;
     686             : 
     687             :         (void) stk;
     688          10 :         msg = WLCstart(cntxt, "wlr.action");
     689          10 :         if(msg)
     690             :                 return msg;
     691          10 :         cntxt->wlc_kind = WLC_UPDATE;
     692          10 :         p = newStmt(cntxt->wlc, "wlr","action");
     693          10 :         p = pushStr(cntxt->wlc, p, getVarConstant(mb, getArg(pci,1)).val.sval);
     694          10 :         return msg;
     695             : }
     696             : 
     697             : /*
     698             :  * We actually don't need the catalog operations in the log.
     699             :  * It is sufficient to upgrade the replay block to WLR_CATALOG.
     700             :  */
     701             : str
     702           0 : WLCgeneric(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
     703             : {
     704             :         InstrPtr p;
     705             :         int i, k,  tpe, varid;
     706             :         str msg = MAL_SUCCEED;
     707             : 
     708             :         (void) stk;
     709           0 :         msg = WLCstart(cntxt, "wlr.generic");
     710           0 :         if(msg)
     711             :                 return msg;
     712           0 :         cntxt->wlc_kind = WLC_IGNORE;
     713           0 :         p = newInstruction(cntxt->wlc, "wlr",getFunctionId(pci));
     714           0 :         k = newTmpVariable(mb,TYPE_any);
     715           0 :         if( k >= 0)
     716           0 :                 getArg(p,0) =  k;
     717           0 :         for( i = pci->retc; i< pci->argc; i++){
     718           0 :                 tpe =getArgType(mb, pci, i);
     719           0 :                 switch(tpe){
     720           0 :                 case TYPE_str:
     721           0 :                         k = defConstant(mb,TYPE_str,&getVarConstant(mb, getArg(pci, i)));
     722           0 :                         if( k >= 0)
     723           0 :                                 p = addArgument(cntxt->wlc, p, k);
     724             :                         break;
     725           0 :                 default:
     726           0 :                         varid = defConstant(cntxt->wlc, tpe, getArgReference(stk, pci, i));
     727           0 :                         if( varid >= 0)
     728           0 :                                 p = addArgument(cntxt->wlc, p, varid);
     729             :                 }
     730             :         }
     731           0 :         p->ticks = GDKms();
     732           0 :         pushInstruction(mb,p);
     733           0 :         cntxt->wlc_kind = WLC_CATALOG;
     734           0 :         return  msg;
     735             : }
     736             : 
     737             : #define bulk(TPE1, TPE2)\
     738             : {       TPE1 *p = (TPE1 *) Tloc(b,0);\
     739             :         TPE1 *q = (TPE1 *) Tloc(b, BUNlast(b));\
     740             :         int k=0; \
     741             :         for( ; p < q; p++, k++){\
     742             :                 if( k % 32 == 31){\
     743             :                         pci = newStmt(cntxt->wlc, "wlr",getFunctionId(pci));\
     744             :                         pci = pushStr(cntxt->wlc, pci, sch);\
     745             :                         pci = pushStr(cntxt->wlc, pci, tbl);\
     746             :                         pci = pushStr(cntxt->wlc, pci, col);\
     747             :                         pci->ticks = GDKms();\
     748             :                 }\
     749             :                 pci = push##TPE2(cntxt->wlc, pci ,*p);\
     750             : } }
     751             : 
     752             : #define updateBatch(TPE1,TPE2)\
     753             : {       TPE1 *x = (TPE1 *) Tloc(bval,0);\
     754             :         TPE1 *y = (TPE1 *) Tloc(bval, BUNlast(b));\
     755             :         int k=0; \
     756             :         for( ; x < y; x++, k++){\
     757             :                 p = newStmt(cntxt->wlc, "wlr","update");\
     758             :                 p = pushStr(cntxt->wlc, p, sch);\
     759             :                 p = pushStr(cntxt->wlc, p, tbl);\
     760             :                 p = pushStr(cntxt->wlc, p, col);\
     761             :                 p = pushOid(cntxt->wlc, p,  (ol? *ol++: o++));\
     762             :                 p = push##TPE2(cntxt->wlc, p ,*x);\
     763             : } }
     764             : 
     765             : static str
     766             : WLCdatashipping(Client cntxt, MalBlkPtr mb, InstrPtr pci, int bid)
     767             : {       BAT *b;
     768             :         str sch, tbl, col;
     769             :         str msg = MAL_SUCCEED;
     770             :         (void) mb;
     771             : 
     772             :         b = BATdescriptor(bid);
     773             :         if (b == NULL) {
     774             :                 throw(MAL, "wlc.datashipping", SQLSTATE(HY002) RUNTIME_OBJECT_MISSING);
     775             :         }
     776             : 
     777             : // large BATs can also be re-created using the query.
     778             : // Copy into should always be expanded, because the source may not
     779             : // be accessible in the replica. TODO
     780             : 
     781             :         sch = GDKstrdup(getVarConstant(cntxt->wlc, getArg(pci,1)).val.sval);
     782             :         tbl = GDKstrdup(getVarConstant(cntxt->wlc, getArg(pci,2)).val.sval);
     783             :         col = GDKstrdup(getVarConstant(cntxt->wlc, getArg(pci,3)).val.sval);
     784             :         if(!sch || !tbl || !col) {
     785             :                 msg = createException(MAL, "wlc.datashipping", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     786             :                 goto finish;
     787             :         }
     788             :         if (cntxt->wlc_kind < WLC_UPDATE)
     789             :                 cntxt->wlc_kind = WLC_UPDATE;
     790             :         switch( ATOMstorage(b->ttype)){
     791             :         case TYPE_bit: bulk(bit,Bit); break;
     792             :         case TYPE_bte: bulk(bte,Bte); break;
     793             :         case TYPE_sht: bulk(sht,Sht); break;
     794             :         case TYPE_int: bulk(int,Int); break;
     795             :         case TYPE_lng: bulk(lng,Lng); break;
     796             :         case TYPE_flt: bulk(flt,Flt); break;
     797             :         case TYPE_dbl: bulk(dbl,Dbl); break;
     798             : #ifdef HAVE_HGE
     799             :         case TYPE_hge: bulk(hge,Hge); break;
     800             : #endif
     801             :         case TYPE_str:
     802             :                 {       BATiter bi;
     803             :                         BUN p,q;
     804             :                         int k=0;
     805             :                         bi= bat_iterator(b);
     806             :                         BATloop(b,p,q){
     807             :                                 if( k % 32 == 31){
     808             :                                         pci = newStmt(cntxt->wlc, "wlr",getFunctionId(pci));
     809             :                                         pci = pushStr(cntxt->wlc, pci, sch);
     810             :                                         pci = pushStr(cntxt->wlc, pci, tbl);
     811             :                                         pci = pushStr(cntxt->wlc, pci, col);
     812             :                                 }
     813             :                                 k++;
     814             :                                 pci = pushStr(cntxt->wlc, pci ,(str) BUNtvar(bi,p));
     815             :                 } }
     816             :                 break;
     817             :         default:
     818             :                 TRC_ERROR(MAL_WLC, "Non-supported type: %d\n", ATOMstorage(b->ttype));
     819             :                 cntxt->wlc_kind = WLC_CATALOG;
     820             :         }
     821             : finish:
     822             :         BBPunfix(b->batCacheid);
     823             :         if (sch)
     824             :                 GDKfree(sch);
     825             :         if (tbl)
     826             :                 GDKfree(tbl);
     827             :         if (col)
     828             :                 GDKfree(col);
     829             :         return msg;
     830             : }
     831             : 
     832             : str
     833          12 : WLCappend(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
     834             : {
     835             :         InstrPtr p;
     836             :         int tpe, varid;
     837             :         str msg = MAL_SUCCEED;
     838             : 
     839             :         (void) stk;
     840             :         (void) mb;
     841          12 :         msg = WLCstart(cntxt, "wlr.append");
     842          12 :         if(msg)
     843             :                 return msg;
     844          12 :         p = newStmt(cntxt->wlc, "wlr","append");
     845          12 :         p = pushStr(cntxt->wlc, p, getVarConstant(mb, getArg(pci,1)).val.sval);
     846          12 :         p = pushStr(cntxt->wlc, p, getVarConstant(mb, getArg(pci,2)).val.sval);
     847          12 :         p = pushStr(cntxt->wlc, p, getVarConstant(mb, getArg(pci,3)).val.sval);
     848             : 
     849             :         // extend the instructions with all values.
     850             :         // If this become too large we can always switch to a "catalog" mode
     851             :         // forcing re-execution instead
     852          12 :         tpe= getArgType(mb,pci,4);
     853          12 :         if (isaBatType(tpe) ){
     854             :                 // actually check the size of the BAT first, most have few elements
     855          10 :                 msg = WLCdatashipping(cntxt, mb, p, stk->stk[getArg(pci,4)].val.bval);
     856             :         } else {
     857             :                 ValRecord cst;
     858           2 :                 if (VALcopy(&cst, getArgReference(stk,pci,4)) != NULL){
     859           2 :                         varid = defConstant(cntxt->wlc, tpe, &cst);
     860           2 :                         if( varid >=0)
     861           2 :                                 p = pushArgument(cntxt->wlc, p, varid);
     862             :                 }
     863             :         }
     864          12 :         if( cntxt->wlc_kind < WLC_UPDATE)
     865           0 :                 cntxt->wlc_kind = WLC_UPDATE;
     866             : 
     867             :         return msg;
     868             : }
     869             : 
     870             : /* check for empty BATs first */
     871             : str
     872           1 : WLCdelete(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
     873             : {       InstrPtr p;
     874             :         int tpe, k = 0;
     875           1 :         int bid =  stk->stk[getArg(pci,3)].val.bval;
     876             :         oid o=0, last, *ol;
     877             :         BAT *b;
     878             :         str msg = MAL_SUCCEED;
     879             : 
     880             :         (void) stk;
     881             :         (void) mb;
     882           1 :         b= BBPquickdesc(bid, false);
     883           1 :         if( BATcount(b) == 0)
     884             :                 return MAL_SUCCEED;
     885           1 :         msg = WLCstart(cntxt, "wlr.delete");
     886           1 :         if(msg) {
     887           0 :                 BBPunfix(b->batCacheid);
     888           0 :                 return msg;
     889             :         }
     890           1 :         cntxt->wlc_kind = WLC_UPDATE;
     891           1 :         p = newStmt(cntxt->wlc, "wlr","delete");
     892           1 :         p = pushStr(cntxt->wlc, p, getVarConstant(mb, getArg(pci,1)).val.sval);
     893           1 :         p = pushStr(cntxt->wlc, p, getVarConstant(mb, getArg(pci,2)).val.sval);
     894             : 
     895           1 :         tpe= getArgType(mb,pci,3);
     896           1 :         if (isaBatType(tpe) ){
     897           1 :                 b= BATdescriptor(bid);
     898           1 :                 if (b == NULL)
     899           0 :                         throw(MAL, "wlc.delete", SQLSTATE(HY002) RUNTIME_OBJECT_MISSING);
     900           1 :                 o = b->tseqbase;
     901           1 :                 last = o + BATcount(b);
     902           1 :                 if( b->ttype == TYPE_void){
     903           4 :                         for( ; o < last; o++, k++){
     904           3 :                                 if( k % 32 == 31){
     905           0 :                                         p = newStmt(cntxt->wlc, "wlr","delete");
     906           0 :                                         p = pushStr(cntxt->wlc, p, getVarConstant(mb, getArg(pci,1)).val.sval);
     907           0 :                                         p = pushStr(cntxt->wlc, p, getVarConstant(mb, getArg(pci,2)).val.sval);
     908             :                                 }
     909           3 :                                 p = pushOid(cntxt->wlc,p, o);
     910             :                         }
     911             :                 } else {
     912           0 :                         ol = (oid*) Tloc(b,0);
     913           0 :                         for( ; o < last; o++, k++, ol++){
     914           0 :                                 if( k % 32 == 31){
     915           0 :                                         p = newStmt(cntxt->wlc, "wlr","delete");
     916           0 :                                         p = pushStr(cntxt->wlc, p, getVarConstant(mb, getArg(pci,1)).val.sval);
     917           0 :                                         p = pushStr(cntxt->wlc, p, getVarConstant(mb, getArg(pci,2)).val.sval);
     918             :                                 }
     919           0 :                                 p = pushOid(cntxt->wlc,p, *ol);
     920             :                         }
     921             :                 }
     922           1 :                 BBPunfix(b->batCacheid);
     923             :         } else
     924           0 :                 throw(MAL,"wlc.delete","BAT expected");
     925           1 :         if( cntxt->wlc_kind < WLC_UPDATE)
     926           0 :                 cntxt->wlc_kind = WLC_UPDATE;
     927             : 
     928             :         return msg;
     929             : }
     930             : 
     931             : str
     932           2 : WLCupdate(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
     933             : {       InstrPtr p;
     934             :         str sch,tbl,col, msg = MAL_SUCCEED;
     935             :         ValRecord cst;
     936             :         int tpe, varid;
     937             :         oid o = 0, *ol = 0;
     938             : 
     939           2 :         sch = *getArgReference_str(stk,pci,1);
     940           2 :         tbl = *getArgReference_str(stk,pci,2);
     941           2 :         col = *getArgReference_str(stk,pci,3);
     942           2 :         msg = WLCstart(cntxt, "wlr.update");
     943           2 :         if(msg)
     944             :                 return msg;
     945           2 :         cntxt->wlc_kind = WLC_UPDATE;
     946           2 :         tpe= getArgType(mb,pci,5);
     947           4 :         if (isaBatType(tpe) ){
     948             :                 BAT *b, *bval;
     949           2 :                 b= BATdescriptor(stk->stk[getArg(pci,4)].val.bval);
     950           2 :                 bval= BATdescriptor(stk->stk[getArg(pci,5)].val.bval);
     951           2 :                 if(b == NULL || bval == NULL) {
     952           0 :                         if(b)
     953           0 :                                 BBPunfix(b->batCacheid);
     954           0 :                         if(bval)
     955           0 :                                 BBPunfix(bval->batCacheid);
     956           0 :                         throw(MAL, "wlr.update", SQLSTATE(HY002) RUNTIME_OBJECT_MISSING);
     957             :                 }
     958           2 :                 if( b->ttype == TYPE_void)
     959           2 :                         o = b->tseqbase;
     960             :                 else
     961           0 :                         ol = (oid*) Tloc(b,0);
     962           2 :                 switch( ATOMstorage(bval->ttype)){
     963           0 :                 case TYPE_bit: updateBatch(bit,Bit); break;
     964           0 :                 case TYPE_bte: updateBatch(bte,Bte); break;
     965           0 :                 case TYPE_sht: updateBatch(sht,Sht); break;
     966           2 :                 case TYPE_int: updateBatch(int,Int); break;
     967           0 :                 case TYPE_lng: updateBatch(lng,Lng); break;
     968           0 :                 case TYPE_flt: updateBatch(flt,Flt); break;
     969           0 :                 case TYPE_dbl: updateBatch(dbl,Dbl); break;
     970             : #ifdef HAVE_HGE
     971           0 :                 case TYPE_hge: updateBatch(hge,Hge); break;
     972             : #endif
     973           1 :                 case TYPE_str:
     974             :                 {       BATiter bi;
     975             :                         int k=0;
     976             :                         BUN x,y;
     977             :                         bi = bat_iterator(bval);
     978           7 :                         BATloop(bval,x,y){
     979           6 :                                 p = newStmt(cntxt->wlc, "wlr","update");
     980           6 :                                 p = pushStr(cntxt->wlc, p, sch);
     981           6 :                                 p = pushStr(cntxt->wlc, p, tbl);
     982           6 :                                 p = pushStr(cntxt->wlc, p, col);
     983           6 :                                 p = pushOid(cntxt->wlc, p, (ol? *ol++ : o++));
     984           6 :                                 p = pushStr(cntxt->wlc, p , BUNtvar(bi,x));
     985             :                                 k++;
     986             :                 } }
     987             :                 /* fall through */
     988           1 :                 default:
     989           1 :                         cntxt->wlc_kind = WLC_CATALOG;
     990             :                 }
     991           2 :                 BBPunfix(b->batCacheid);
     992             :         } else {
     993           0 :                 p = newStmt(cntxt->wlc, "wlr","update");
     994           0 :                 p = pushStr(cntxt->wlc, p, sch);
     995           0 :                 p = pushStr(cntxt->wlc, p, tbl);
     996           0 :                 p = pushStr(cntxt->wlc, p, col);
     997           0 :                 o = *getArgReference_oid(stk,pci,4);
     998           0 :                 p = pushOid(cntxt->wlc,p, o);
     999           0 :                 if (VALcopy(&cst, getArgReference(stk,pci,5)) != NULL){
    1000           0 :                         varid = defConstant(cntxt->wlc, tpe, &cst);
    1001           0 :                         if( varid >= 0)
    1002           0 :                                 p = pushArgument(cntxt->wlc, p, varid);
    1003             :                 }
    1004             :         }
    1005             : 
    1006           2 :         if( cntxt->wlc_kind < WLC_UPDATE)
    1007           0 :                 cntxt->wlc_kind = WLC_UPDATE;
    1008             :         return msg;
    1009             : }
    1010             : 
    1011             : str
    1012           1 : WLCclear_table(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
    1013             : {
    1014             :         InstrPtr p;
    1015             :         str msg = MAL_SUCCEED;
    1016             :         (void) stk;
    1017           1 :         msg = WLCstart(cntxt, "wlr.clear_table");
    1018           1 :         if(msg)
    1019             :                 return msg;
    1020           1 :         cntxt->wlc_kind = WLC_UPDATE;
    1021           1 :         p = newStmt(cntxt->wlc, "wlr","clear_table");
    1022           1 :         p = pushStr(cntxt->wlc, p, getVarConstant(mb, getArg(pci,1)).val.sval);
    1023           1 :         p = pushStr(cntxt->wlc, p, getVarConstant(mb, getArg(pci,2)).val.sval);
    1024           1 :         if( cntxt->wlc_kind < WLC_UPDATE)
    1025           0 :                 cntxt->wlc_kind = WLC_UPDATE;
    1026             : 
    1027             :         return msg;
    1028             : }
    1029             : 
    1030             : str
    1031       64040 : WLCcommit(int clientid)
    1032             : {
    1033       64040 :         if( mal_clients[clientid].wlc && mal_clients[clientid].wlc->stop > 1){
    1034          14 :                 newStmt(mal_clients[clientid].wlc,"wlr","commit");
    1035          14 :                 return WLCpreparewrite( &mal_clients[clientid]);
    1036             :         }
    1037             :         return MAL_SUCCEED;
    1038             : }
    1039             : 
    1040             : str
    1041           0 : WLCcommitCmd(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
    1042             : {       str msg = MAL_SUCCEED;
    1043           0 :         msg = WLCstart(cntxt, "wlr.commit");
    1044           0 :         if(msg)
    1045             :                 return msg;
    1046             :         (void) mb;
    1047             :         (void) stk;
    1048             :         (void) pci;
    1049           0 :         cntxt->wlc_kind = WLC_UPDATE;
    1050           0 :         return WLCcommit(cntxt->idx);
    1051             : }
    1052             : 
    1053             : str
    1054       10382 : WLCrollback(int clientid)
    1055             : {
    1056       10382 :         if( mal_clients[clientid].wlc){
    1057           0 :                 newStmt(mal_clients[clientid].wlc,"wlr","rollback");
    1058           0 :                 return WLCpreparewrite( &mal_clients[clientid]);
    1059             :         }
    1060             :         return MAL_SUCCEED;
    1061             : }
    1062             : 
    1063             : str
    1064           0 : WLCrollbackCmd(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
    1065             : {       str msg = MAL_SUCCEED;
    1066           0 :         msg = WLCstart(cntxt, "wlr.rollback");
    1067           0 :         if(msg)
    1068             :                 return msg;
    1069             :         (void) mb;
    1070             :         (void) stk;
    1071             :         (void) pci;
    1072           0 :         cntxt->wlc_kind = WLC_UPDATE;
    1073           0 :         return WLCrollback(cntxt->idx);
    1074             : }
    1075             : 
    1076             : #include "mel.h"
    1077             : mel_func wlc_init_funcs[] = {
    1078             :  pattern("wlc", "init", WLCinitCmd, false, "Test for running as master", noargs),
    1079             :  command("wlc", "epilogue", WLCepilogue, false, "release the resources held by the wlc module", args(1,1, arg("",void))),
    1080             :  pattern("wlc", "master", WLCmaster, false, "Activate the workload-capture-replay process", noargs),
    1081             :  pattern("wlc", "master", WLCmaster, false, "Activate the workload-capture-replay process. Use a different location for the logs.", args(0,1, arg("path",str))),
    1082             :  pattern("wlc", "stop", WLCstop, false, "Stop capturing the logs", noargs),
    1083             :  pattern("wlc", "flush", WLCflush, false, "Flush current log buffer", noargs),
    1084             :  pattern("wlc", "setbeat", WLCsetbeat, false, "Maximal delay for transaction log flushing", args(0,1, arg("duration",int))),
    1085             :  pattern("wlc", "getbeat", WLCgetbeat, false, "Maximal delay for transaction log flushing", args(1,2, arg("",str),arg("duration",int))),
    1086             :  pattern("wlc", "getclock", WLCgetclock, false, "Timestamp of last update transaction", args(1,1, arg("",str))),
    1087             :  pattern("wlc", "gettick", WLCgettick, false, "Transaction identifier of the last committed transaction", args(1,1, arg("",lng))),
    1088             :  pattern("wlc", "rollback", WLCrollbackCmd, false, "Mark the end of the work unit", noargs),
    1089             :  pattern("wlc", "commit", WLCcommitCmd, false, "Mark the end of the work unit", noargs),
    1090             :  pattern("wlc", "query", WLCquery, false, "Keep the queries for replay.", args(0,1, arg("q",str))),
    1091             :  pattern("wlc", "catalog", WLCcatalog, false, "Keep the catalog changing queries for replay. ", args(0,1, arg("q",str))),
    1092             :  pattern("wlc", "action", WLCaction, false, "Keep the database changing queries for replay. ", args(0,1, arg("q",str))),
    1093             :  pattern("wlc", "append", WLCappend, false, "Keep the insertions in the workload-capture-replay list", args(1,5, arg("",int),arg("sname",str),arg("tname",str),arg("cname",str),argany("ins",0))),
    1094             :  pattern("wlc", "update", WLCupdate, false, "Keep the update in the workload-capture-replay list", args(1,6, arg("",int),arg("sname",str),arg("tname",str),arg("cname",str),argany("tid",0),argany("val",0))),
    1095             :  pattern("wlc", "delete", WLCdelete, false, "Keep the deletions in the workload-capture-replay list", args(1,4, arg("",int),arg("sname",str),arg("tname",str),argany("b",0))),
    1096             :  pattern("wlc", "clear_table", WLCclear_table, false, "Keep the deletions in the workload-capture-replay list", args(1,3, arg("",int),arg("sname",str),arg("tname",str))),
    1097             :  pattern("wlc", "commit", WLCcommitCmd, false, "Commit the workload-capture-replay record", noargs),
    1098             :  pattern("wlc", "rollback", WLCcommitCmd, false, "Rollback the workload-capture-replay record", noargs),
    1099             :  pattern("wlc", "create_seq", WLCgeneric, false, "Catalog operation create_seq", args(0,3, arg("sname",str),arg("seqname",str),arg("action",int))),
    1100             :  pattern("wlc", "alter_seq", WLCgeneric, false, "Catalog operation alter_seq", args(0,3, arg("sname",str),arg("seqname",str),arg("val",lng))),
    1101             :  pattern("wlc", "alter_seq", WLCgeneric, false, "Catalog operation alter_seq", args(0,4, arg("sname",str),arg("seqname",str),arg("seq",ptr),batarg("val",lng))),
    1102             :  pattern("wlc", "drop_seq", WLCgeneric, false, "Catalog operation drop_seq", args(0,3, arg("sname",str),arg("nme",str),arg("action",int))),
    1103             :  pattern("wlc", "create_schema", WLCgeneric, false, "Catalog operation create_schema", args(0,4, arg("sname",str),arg("auth",str),arg("ifnotexists",int),arg("action",int))),
    1104             :  pattern("wlc", "drop_schema", WLCgeneric, false, "Catalog operation drop_schema", args(0,4, arg("sname",str),arg("s",str),arg("ifexists",int),arg("action",int))),
    1105             :  pattern("wlc", "create_table", WLCgeneric, false, "Catalog operation create_table", args(0,3, arg("sname",str),arg("tname",str),arg("temp",int))),
    1106             :  pattern("wlc", "create_view", WLCgeneric, false, "Catalog operation create_view", args(0,3, arg("sname",str),arg("tname",str),arg("temp",int))),
    1107             :  pattern("wlc", "drop_table", WLCgeneric, false, "Catalog operation drop_table", args(0,4, arg("sname",str),arg("name",str),arg("action",int),arg("ifexists",int))),
    1108             :  pattern("wlc", "drop_view", WLCgeneric, false, "Catalog operation drop_view", args(0,4, arg("sname",str),arg("name",str),arg("action",int),arg("ifexists",int))),
    1109             :  pattern("wlc", "drop_constraint", WLCgeneric, false, "Catalog operation drop_constraint", args(0,4, arg("sname",str),arg("name",str),arg("action",int),arg("ifexists",int))),
    1110             :  pattern("wlc", "alter_table", WLCgeneric, false, "Catalog operation alter_table", args(0,3, arg("sname",str),arg("tname",str),arg("action",int))),
    1111             :  pattern("wlc", "create_type", WLCgeneric, false, "Catalog operation create_type", args(0,3, arg("sname",str),arg("nme",str),arg("impl",str))),
    1112             :  pattern("wlc", "drop_type", WLCgeneric, false, "Catalog operation drop_type", args(0,3, arg("sname",str),arg("nme",str),arg("action",int))),
    1113             :  pattern("wlc", "grant_roles", WLCgeneric, false, "Catalog operation grant_roles", args(0,4, arg("sname",str),arg("auth",str),arg("grantor",int),arg("admin",int))),
    1114             :  pattern("wlc", "revoke_roles", WLCgeneric, false, "Catalog operation revoke_roles", args(0,4, arg("sname",str),arg("auth",str),arg("grantor",int),arg("admin",int))),
    1115             :  pattern("wlc", "grant", WLCgeneric, false, "Catalog operation grant", args(0,7, arg("sname",str),arg("tbl",str),arg("grantee",str),arg("privs",int),arg("cname",str),arg("gr",int),arg("grantor",int))),
    1116             :  pattern("wlc", "revoke", WLCgeneric, false, "Catalog operation revoke", args(0,7, arg("sname",str),arg("tbl",str),arg("grantee",str),arg("privs",int),arg("cname",str),arg("grant",int),arg("grantor",int))),
    1117             :  pattern("wlc", "grant_function", WLCgeneric, false, "Catalog operation grant_function", args(0,6, arg("sname",str),arg("fcnid",int),arg("grantee",str),arg("privs",int),arg("grant",int),arg("grantor",int))),
    1118             :  pattern("wlc", "revoke_function", WLCgeneric, false, "Catalog operation revoke_function", args(0,6, arg("sname",str),arg("fcnid",int),arg("grantee",str),arg("privs",int),arg("grant",int),arg("grantor",int))),
    1119             :  pattern("wlc", "create_user", WLCgeneric, false, "Catalog operation create_user", args(0,5, arg("sname",str),arg("passwrd",str),arg("enc",int),arg("schema",str),arg("fullname",str))),
    1120             :  pattern("wlc", "drop_user", WLCgeneric, false, "Catalog operation drop_user", args(0,2, arg("sname",str),arg("action",int))),
    1121             :  pattern("wlc", "drop_user", WLCgeneric, false, "Catalog operation drop_user", args(0,3, arg("sname",str),arg("auth",str),arg("action",int))),
    1122             :  pattern("wlc", "alter_user", WLCgeneric, false, "Catalog operation alter_user", args(0,5, arg("sname",str),arg("passwrd",str),arg("enc",int),arg("schema",str),arg("oldpasswrd",str))),
    1123             :  pattern("wlc", "rename_user", WLCgeneric, false, "Catalog operation rename_user", args(0,3, arg("sname",str),arg("newnme",str),arg("action",int))),
    1124             :  pattern("wlc", "create_role", WLCgeneric, false, "Catalog operation create_role", args(0,3, arg("sname",str),arg("role",str),arg("grator",int))),
    1125             :  pattern("wlc", "drop_role", WLCgeneric, false, "Catalog operation drop_role", args(0,3, arg("auth",str),arg("role",str),arg("action",int))),
    1126             :  pattern("wlc", "drop_role", WLCgeneric, false, "Catalog operation drop_role", args(0,2, arg("role",str),arg("action",int))),
    1127             :  pattern("wlc", "drop_index", WLCgeneric, false, "Catalog operation drop_index", args(0,3, arg("sname",str),arg("iname",str),arg("action",int))),
    1128             :  pattern("wlc", "drop_function", WLCgeneric, false, "Catalog operation drop_function", args(0,5, arg("sname",str),arg("fname",str),arg("fid",int),arg("type",int),arg("action",int))),
    1129             :  pattern("wlc", "create_function", WLCgeneric, false, "Catalog operation create_function", args(0,2, arg("sname",str),arg("fname",str))),
    1130             :  pattern("wlc", "create_trigger", WLCgeneric, false, "Catalog operation create_trigger", args(0,10, arg("sname",str),arg("tname",str),arg("triggername",str),arg("time",int),arg("orientation",int),arg("event",int),arg("old",str),arg("new",str),arg("cond",str),arg("qry",str))),
    1131             :  pattern("wlc", "drop_trigger", WLCgeneric, false, "Catalog operation drop_trigger", args(0,3, arg("sname",str),arg("nme",str),arg("ifexists",int))),
    1132             :  pattern("wlc", "alter_add_table", WLCgeneric, false, "Catalog operation alter_add_table", args(0,5, arg("sname",str),arg("mtnme",str),arg("psnme",str),arg("ptnme",str),arg("action",int))),
    1133             :  pattern("wlc", "alter_del_table", WLCgeneric, false, "Catalog operation alter_del_table", args(0,5, arg("sname",str),arg("mtnme",str),arg("psnme",str),arg("ptnme",str),arg("action",int))),
    1134             :  pattern("wlc", "alter_set_table", WLCgeneric, false, "Catalog operation alter_set_table", args(0,3, arg("sname",str),arg("tnme",str),arg("access",int))),
    1135             :  pattern("wlc", "alter_add_range_partition", WLCgeneric, false, "Catalog operation alter_add_range_partition", args(0,8, arg("sname",str),arg("mtnme",str),arg("psnme",str),arg("ptnme",str),arg("min",str),arg("max",str),arg("nills",bit),arg("update",int))),
    1136             :  pattern("wlc", "comment_on", WLCgeneric, false, "Catalog operation comment_on", args(0,2, arg("objid",int),arg("remark",str))),
    1137             :  pattern("wlc", "rename_schema", WLCgeneric, false, "Catalog operation rename_schema", args(0,2, arg("sname",str),arg("newnme",str))),
    1138             :  pattern("wlc", "rename_table", WLCgeneric, false, "Catalog operation rename_table", args(0,4, arg("osname",str),arg("nsname",str),arg("otname",str),arg("ntname",str))),
    1139             :  pattern("wlc", "rename_column", WLCgeneric, false, "Catalog operation rename_column", args(0,4, arg("sname",str),arg("tname",str),arg("cname",str),arg("newnme",str))),
    1140             :  pattern("wlc", "transaction_release", WLCgeneric, false, "A transaction statement (type can be commit,release,rollback or start)", args(1,3, arg("",void),arg("chain",int),arg("name",str))),
    1141             :  pattern("wlc", "transaction_commit", WLCgeneric, false, "A transaction statement (type can be commit,release,rollback or start)", args(1,3, arg("",void),arg("chain",int),arg("name",str))),
    1142             :  pattern("wlc", "transaction_rollback", WLCgeneric, false, "A transaction statement (type can be commit,release,rollback or start)", args(1,3, arg("",void),arg("chain",int),arg("name",str))),
    1143             :  pattern("wlc", "transaction_begin", WLCgeneric, false, "A transaction statement (type can be commit,release,rollback or start)", args(1,3, arg("",void),arg("chain",int),arg("name",str))),
    1144             :  pattern("wlc", "transaction", WLCgeneric, true, "Start an autocommit transaction", noargs),
    1145             :  pattern("wlc", "alter_add_value_partition", WLCgeneric, false, "Catalog operation alter_add_value_partition", args(0,6, arg("sname",str),arg("mtnme",str),arg("psnme",str),arg("ptnme",str),arg("nills",bit),arg("update",int))),
    1146             :  pattern("wlc", "alter_add_value_partition", WLCgeneric, false, "Catalog operation alter_add_value_partition", args(0,7, arg("sname",str),arg("mtnme",str),arg("psnme",str),arg("ptnme",str),arg("nills",bit),arg("update",int),vararg("arg",str))),
    1147             :  { .imp=NULL }
    1148             : };
    1149             : #include "mal_import.h"
    1150             : #ifdef _MSC_VER
    1151             : #undef read
    1152             : #pragma section(".CRT$XCU",read)
    1153             : #endif
    1154         255 : LIB_STARTUP_FUNC(init_wlc_mal)
    1155         255 : { mal_module("wlc", NULL, wlc_init_funcs); }

Generated by: LCOV version 1.14