LCOV - code coverage report
Current view: top level - monetdb5/modules/mal - wlc.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 28 451 6.2 %
Date: 2021-10-13 02:24:04 Functions: 8 34 23.5 %

          Line data    Source code
       1             : /*
       2             :  * This Source Code Form is subject to the terms of the Mozilla Public
       3             :  * License, v. 2.0.  If a copy of the MPL was not distributed with this
       4             :  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
       5             :  *
       6             :  * Copyright 1997 - July 2008 CWI, August 2008 - 2021 MonetDB B.V.
       7             :  */
       8             : 
       9             : /*
      10             :  * (c) 2017 Martin Kersten
      11             :  * This module collects the workload-capture-replay statements during transaction execution,
      12             :  * also known as asynchronous logical replication management. It can be used for
      13             :  * multiple purposes: BACKUP, REPLICATION, and REPLAY
      14             :  *
      15             :  * For a BACKUP we need either a complete update log from the beginning, or
      16             :  * a binary snapshot with a collection of logs recording its changes since.
      17             :  * To ensure transaction ACID properties, the log record should be stored on
      18             :  * disk within the transaction brackets, which may cause a serious IO load.
      19             :  * (Tip, store these logs files on an SSD or NVM)
      20             :  *
      21             :  * For REPLICATION, also called a database clone or slave, we take a snapshot and the
      22             :  * log files that reflect the recent changes. The log updates are replayed against
      23             :  * the snapshot until a specific time point or transaction id is reached.
      24             :  *
      25             :  * Some systems also use the logical logs to REPLAY all (expensive) queries
      26             :  * against the database. We skip this for the time being, as those queries
      27             :  * can be captured already in the server.
      28             :  * [A flag should be added to at least capture them]
      29             :  *
      30             :  * The goal of this module is to ease BACKUP and REPLICATION of a master database
      31             :  * with a time-bounded delay. This means that both master and replica run at a certain beat
      32             :  * (in seconds) by which information is made available or read by the replica.
      33             :  *
      34             :  * Such a replica is used in query workload sharing, database versioning, and (re-)partitioning.
      35             :  * Tables taken from the master are not protected against local updates in the replica.
      36             :  * However, any replay transaction that fails stops the cloning process.
      37             :  * Furthermore, only persistent tables are considered for replication.
      38             :  * Updates under the 'tmp' schema, i.e. temporary tables, are ignored.
      39             :  *
      40             :  * Simplicity and ease of end-user control has been the driving argument here.
      41             :  *
      42             :  * IMPLEMENTATION
      43             :  * The underlying assumption of the techniques deployed is that the database
      44             :  * resides on a proper (global/distributed) file system to guarantees recovery
      45             :  * from most storage system related failures, e.g. using RAID disks or LSF systems.
      46             :  *
      47             :  * A database can be set into 'master' mode only once using the SQL command:
      48             :  * CALL wrc_master.master() whose access permission is limited to the 'monetdb' user.[CHECK]
      49             :  * An optional path to the log record directory can be given to reduce the IO latency,
      50             :  * e.g. using a nearby SSD, or where there is ample of space to keep a long history,
      51             :  * e.g. a HDD or cold storage location.
      52             :  *
      53             :  * By default, the command creates a directory .../dbfarm/dbname/wlc_logs to hold all logs
      54             :  * and a configuration file .../dbfarm/dbname/wlc.config to hold the state of the transaction logs.
      55             :  * It contains the following key=value pairs:
      56             :  *              snapshot=<path to a snapshot directory>
      57             :  *              logs=<path to the wlc log directory>
      58             :  *              state=<started, stopped>
      59             :  *              batches=<next available batch file to be applied>
      60             :  *              beat=<maximal delay between log files, in seconds>
      61             :  *              write=<timestamp of the last transaction recorded>
      62             :  *
      63             :  * A missing path to the snapshot denotes that we can start the clone with an empty database.
      64             :  * The log files are stored as master/<dbname>_<batchnumber>. They belong to the snapshot.
      65             :  *
      66             :  * Each wlc log file contains a serial log of a number of committed compound transactions.
      67             :  * The log records are represented as ordinary MAL statement blocks, which
      68             :  * are executed in serial mode. (parallelism can be considered for large updates later)
      69             :  * Each transaction job is identified by a unique id, its starting time, and the original responsible user.
      70             :  * Each log-record should end with a commit to be allowed for re-execution.
      71             :  * Log records with a rollback tag are merely for analysis by the DBA, their statements are ignored.
      72             :  *
      73             :  * A transaction log file is created by the master using a heartbeat (in seconds).
      74             :  * A new transaction log file is published when the system has been collecting transaction records for some time.
      75             :  * The beat can be set using a SQL command, e.g.
      76             :  * CALL wcr_master.beat(duration)
      77             :  * Setting it to zero leads to a log file per transaction and may cause a large log directory
      78             :  * with thousands of small files.
      79             :  * The default of 5 minutes should balance polling overhead in most practical situations.
      80             :  * Intermittent flush() during this period ensures the committed log records survive
      81             :  * a crash.
      82             :  *
      83             :  * A minor problem here is that we should ensure that the log file is closed even if there
      84             :  * are no transactions running. It is solved with a separate monitor thread, which ensures
      85             :  * that the a new log file is created at least after 'beat' seconds since the first logrecord was created.
      86             :  * After closing, the replicas can see from the master configuration file that a new log batch is available.
      87             :  *
      88             :  * The final step is to close stop transaction logging with the command
      89             :  * CALL wcr_master.stop().
      90             :  * It typically is the end-of-life-time for a snapshot. For example, when planning to do
      91             :  * a large bulk load of the database, stopping logging avoids a double write into the
      92             :  * database. The database can only be brought back into master mode using a fresh snapshot.
      93             :  *
      94             :  * [It is not advicable to temporarily stop logging and continue afterwards, because then there
      95             :  * is no guarantee the user will see a consistent database.]
      96             :  *
      97             :  * One of the key challenges for a DBA is to keep the log directory manageable, because it grows
      98             :  * with the speed up updates being applied to the database. This calls for regularly checking
      99             :  * for their disk footprint and taking a new snapshot as a frame of reference.
     100             :  *
     101             :  * [TODO A trigger should be added to stop logging and call for a fresh snapshot first]
     102             :  * [TODO the batch files might include the snapshot id for ease of rebuild]
     103             :  *
     104             :  * The DBA tool 'monetdb' provides options to create a master and its replicas.
     105             :  * It will also maintain the list of replicas for inspection and managing their drift.
     106             :  * For example,
     107             :  *       monetdb master <dbname> [ <optional snapshot path>]
     108             :  * which locks the database, takes a save copy, initializes the state chance to master.
     109             :  *
     110             :  * A fresh replica can be constructed as follows:
     111             :  *      monetdb replicate <dbname> <mastername>
     112             :  *
     113             :  * Instead of using the monetdb command line we can use the SQL calls directly
     114             :  * sys.master() and sys.replicate(), provided we start with a fresh database.
     115             :  *
     116             :  * CLONE
     117             :  *
     118             :  * Every clone should start off with a copy of the binary snapshot identified by 'snapshot'.
     119             :  * A fresh database can be turned into a clone using the call
     120             :  *     CALL wcr_replica.master('mastername')
     121             :  * It will grab the latest snapshot of the master and applies all
     122             :  * available log files before releasing the system.
     123             :  * The master has no knowledge about the number of clones and their whereabouts.
     124             :  *
     125             :  * The clone process will iterate in the background through the log files,
     126             :  * applying all update transactions.
     127             :  *
     128             :  * An optional timestamp or transaction id can be added to the replicate() command to
     129             :  * apply the logs until a given moment. This is particularly handy when an unexpected
     130             :  * desastrous user action (drop persistent table) has to be recovered from.
     131             :  *
     132             :  * CALL wcr_replica.master('mastername');  -- get logs from a specific master
     133             :  * ...
     134             :  * CALL wcr_replicate.replicate(tag); -- stops after we are in sink with tag
     135             :  * ...
     136             :  * CALL wcr_replicate.replicate(NOW()); -- stop after we sinked all transactions
     137             :  * ...
     138             :  * CALL wcr_replicate.replicate(); -- synchronize in background continuously
     139             :  * ...
     140             :  * CALL wcr_replicate.stop(); -- stop the synchroniation thread
     141             :  *
     142             :  * SELECT wcr_replica.clock();
     143             :  * returns the timestamp of the last replicated transaction.
     144             :  * SELECT wcr_replica.tick();
     145             :  * returns the transaction id of the last replicated transaction.
     146             :  * SELECT wcr_master.clock();
     147             :  * return the timestamp of the last committed transaction in the master.
     148             :  * SELECT wcr_master.tick();
     149             :  * return the transaction id of the last committed transaction in the master.
     150             :  *
     151             :  * Any failure encountered during a log replay terminates the replication process,
     152             :  * leaving a message in the merovingian log configuration.
     153             :  *
     154             :  * The wlc files purposely have a textual format derived from the MAL statements.
     155             :  * This provides a stepping stone for remote execution later.
     156             :  *
     157             :  * [TODO consider the roll logging of SQL session variables, i.e. optimizer_pipe
     158             :  * as part of the log record]
     159             :  * For updates we don't need special care for this.
     160             :  */
     161             : #include "monetdb_config.h"
     162             : #include <time.h>
     163             : #include "mal_builder.h"
     164             : #include "wlc.h"
     165             : #include "gdk_time.h"
     166             : #include "mutils.h"
     167             : #include "mal_function.h"
     168             : 
     169             : static MT_Lock     wlc_lock = MT_LOCK_INITIALIZER(wlc_lock);
     170             : 
     171             : static char wlc_snapshot[FILENAME_MAX]; // The location of the snapshot against which the logs work
     172             : static stream *wlc_fd = 0;
     173             : 
     174             : // These properties are needed by the replica to direct the roll-forward.
     175             : char wlc_dir[FILENAME_MAX];     // The location in the global file store for the logs
     176             : char wlc_name[IDLENGTH];        // The master database name
     177             : lng  wlc_tag = 0;                       // next transaction id
     178             : int  wlc_state = 0;                     // The current status of the logger in the life cycle
     179             : static char wlc_write[26];                      // The timestamp of the last committed transaction
     180             : int  wlc_batches = 0;           // identifier of next batch
     181             : int  wlc_beat = 10;             // maximal period covered by a single log file in seconds
     182             : 
     183             : /* The database snapshots are binary copies of the dbfarm/database/bat
     184             :  * New snapshots are created currently using the 'monetdb snapshot <db>' command
     185             :  * or a SQL procedure.
     186             :  *
     187             :  * The wlc logs are stored in the snapshot directory as a time-stamped list
     188             :  */
     189             : 
     190             : int
     191      353093 : WLCused(void)
     192             : {
     193      353093 :         return wlc_dir[0] != 0;
     194             : }
     195             : 
     196             : /* The master configuration file is a simple key=value table */
     197             : str
     198           0 : WLCreadConfig(FILE *fd)
     199             : {
     200             :         str msg = MAL_SUCCEED;
     201             :         char path[FILENAME_MAX];
     202             :         int len;
     203             : 
     204           0 :         while( fgets(path, FILENAME_MAX, fd) ){
     205           0 :                 path[strlen(path)-1] = 0;
     206           0 :                 if( strncmp("logs=", path,5) == 0) {
     207           0 :                         len = snprintf(wlc_dir, FILENAME_MAX, "%s", path + 5);
     208           0 :                         if (len == -1 || len >= FILENAME_MAX) {
     209           0 :                                 msg = createException(MAL, "wlc.readConfig", "logs config value is too large");
     210           0 :                                 goto bailout;
     211             :                         }
     212             :                 }
     213           0 :                 if( strncmp("snapshot=", path,9) == 0) {
     214           0 :                         len = snprintf(wlc_snapshot, FILENAME_MAX, "%s", path + 9);
     215           0 :                         if (len == -1 || len >= FILENAME_MAX) {
     216           0 :                                 msg = createException(MAL, "wlc.readConfig", "snapshot config value is too large");
     217           0 :                                 goto bailout;
     218             :                         }
     219             :                 }
     220           0 :                 if( strncmp("tag=", path,4) == 0)
     221           0 :                         wlc_tag = atol(path+ 4);
     222           0 :                 if( strncmp("write=", path,6) == 0) {
     223           0 :                         len = snprintf(wlc_write, 26, "%s", path + 6);
     224           0 :                         if (len == -1 || len >= 26) {
     225           0 :                                 msg = createException(MAL, "wlc.readConfig", "write config value is too large");
     226           0 :                                 goto bailout;
     227             :                         }
     228             :                 }
     229           0 :                 if( strncmp("batches=", path, 8) == 0)
     230           0 :                         wlc_batches = atoi(path+ 8);
     231           0 :                 if( strncmp("beat=", path, 5) == 0)
     232           0 :                         wlc_beat = atoi(path+ 5);
     233           0 :                 if( strncmp("state=", path, 6) == 0)
     234           0 :                         wlc_state = atoi(path+ 6);
     235             :         }
     236           0 : bailout:
     237           0 :         fclose(fd);
     238           0 :         return msg;
     239             : }
     240             : 
     241             : static str
     242           0 : WLCgetConfig(void){
     243             :         str l;
     244             :         FILE *fd;
     245             : 
     246           0 :         if((l = GDKfilepath(0,0,"wlc.config",0)) == NULL)
     247           0 :                 throw(MAL,"wlc.getConfig","Could not access wlc.config file\n");
     248             :         fd = MT_fopen(l,"r");
     249           0 :         GDKfree(l);
     250           0 :         if( fd == NULL)
     251           0 :                 throw(MAL,"wlc.getConfig","Could not access wlc.config file\n");
     252           0 :         return WLCreadConfig(fd);
     253             : }
     254             : 
     255             : static
     256           0 : str WLCsetConfig(void){
     257             :         str path;
     258             :         stream *fd;
     259             : 
     260             :         /* be aware to be safe, on a failing fopen */
     261           0 :         if((path = GDKfilepath(0,0,"wlc.config",0)) == NULL)
     262           0 :                 throw(MAL,"wlc.setConfig","Could not access wlc.config\n");
     263           0 :         fd = open_wastream(path);
     264           0 :         GDKfree(path);
     265           0 :         if( fd == NULL)
     266           0 :                 throw(MAL,"wlc.setConfig","Could not access wlc.config: %s\n", mnstr_peek_error(NULL));
     267           0 :         if( wlc_snapshot[0] )
     268           0 :                 mnstr_printf(fd,"snapshot=%s\n", wlc_snapshot);
     269           0 :         mnstr_printf(fd,"logs=%s\n", wlc_dir);
     270           0 :         mnstr_printf(fd,"tag="LLFMT"\n", wlc_tag );
     271           0 :         mnstr_printf(fd,"write=%s\n", wlc_write );
     272           0 :         mnstr_printf(fd,"state=%d\n", wlc_state );
     273           0 :         mnstr_printf(fd,"batches=%d\n", wlc_batches );
     274           0 :         mnstr_printf(fd,"beat=%d\n", wlc_beat );
     275           0 :         (void) mnstr_flush(wlc_fd, MNSTR_FLUSH_DATA);
     276           0 :         (void) mnstr_fsync(wlc_fd);
     277           0 :         close_stream(fd);
     278           0 :         return MAL_SUCCEED;
     279             : }
     280             : 
     281             : // creation of the logger file and updating the configuration file should be atomic !!!
     282             : // The log files are marked with the database name. This allows for easy recognition later on.
     283             : static str
     284           0 : WLCsetlogger(void)
     285             : {
     286             :         int len;
     287             :         char path[FILENAME_MAX];
     288             :         str msg = MAL_SUCCEED;
     289             : 
     290           0 :         if( wlc_dir[0] == 0)
     291           0 :                 throw(MAL,"wlc.setlogger","Path not initalized");
     292           0 :         MT_lock_set(&wlc_lock);
     293           0 :         len = snprintf(path,FILENAME_MAX,"%s%c%s_%012d", wlc_dir, DIR_SEP, wlc_name, wlc_batches);
     294           0 :         if (len == -1 || len >= FILENAME_MAX) {
     295           0 :                 MT_lock_unset(&wlc_lock);
     296           0 :                 throw(MAL, "wlc.setlogger", "Logger filename path is too large");
     297             :         }
     298           0 :         wlc_fd = open_wastream(path);
     299           0 :         if( wlc_fd == 0){
     300           0 :                 MT_lock_unset(&wlc_lock);
     301           0 :                 throw(MAL,"wlc.logger","Could not create %s\n",path);
     302             :         }
     303             : 
     304           0 :         wlc_batches++;
     305           0 :         msg = WLCsetConfig();
     306           0 :         MT_lock_unset(&wlc_lock);
     307           0 :         return msg;
     308             : }
     309             : 
     310             : /* force the current log file to its storage container */
     311             : static str
     312         264 : WLCcloselogger(void)
     313             : {
     314         264 :         if( wlc_fd == NULL)
     315             :                 return MAL_SUCCEED;
     316           0 :         mnstr_flush(wlc_fd, MNSTR_FLUSH_DATA);
     317           0 :         mnstr_fsync(wlc_fd);
     318           0 :         close_stream(wlc_fd);
     319           0 :         wlc_fd= NULL;
     320           0 :         return WLCsetConfig();
     321             : }
     322             : 
     323             : /* force the current log file to its storage container, but dont create a new one yet */
     324             : static str
     325           0 : WLCflush(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
     326             : {
     327             :         (void) cntxt;
     328             :         (void) mb;
     329             :         (void) stk;
     330             :         (void) pci;
     331           0 :         if( wlc_fd == NULL)
     332             :                 return MAL_SUCCEED;
     333           0 :         mnstr_flush(wlc_fd, MNSTR_FLUSH_DATA);
     334           0 :         mnstr_fsync(wlc_fd);
     335           0 :         return WLCsetConfig();
     336             : }
     337             : 
     338             : static str
     339         264 : WLCepilogue(void *ret)
     340             : {
     341             :         str msg = MAL_SUCCEED;
     342             : 
     343             :         (void)ret;
     344             : 
     345         264 :         MT_lock_set(&wlc_lock);
     346         264 :         msg = WLCcloselogger();
     347         264 :         wlc_snapshot[0]=0;
     348         264 :         wlc_dir[0]= 0;
     349         264 :         wlc_name[0]= 0;
     350         264 :         wlc_write[0] =0;
     351         264 :         MT_lock_unset(&wlc_lock);
     352             :         //TODO we have to return a possible error message somehow
     353         264 :         return(msg);
     354             : }
     355             : 
     356             : /*
     357             :  * The WLClogger process ensures that log files are properly closed
     358             :  * and released when their cycle time window has expired.
     359             :  */
     360             : 
     361             : static MT_Id wlc_logger;
     362             : 
     363             : static void
     364           0 : WLClogger(void *arg)
     365             : {
     366             :         int seconds;
     367             :         str msg = MAL_SUCCEED;
     368             : 
     369             :         (void) arg;
     370           0 :         while(!GDKexiting()){
     371           0 :                 if( wlc_dir[0] && wlc_fd ){
     372           0 :                         MT_lock_set(&wlc_lock);
     373           0 :                         if((msg = WLCcloselogger()) != MAL_SUCCEED) {
     374           0 :                                 TRC_ERROR(MAL_WLC, "%s\n", msg);
     375           0 :                                 freeException(msg);
     376             :                         }
     377           0 :                         MT_lock_unset(&wlc_lock);
     378             :                 }
     379           0 :                 for( seconds = 0; (wlc_beat == 0 || seconds < wlc_beat) && ! GDKexiting(); seconds++)
     380           0 :                         MT_sleep_ms( 1000);
     381             :         }
     382           0 : }
     383             : /*
     384             :  * The existence of the master directory should be checked upon server restart.
     385             :  * Then the master record information should be set and the WLClogger started.
     386             :  */
     387             : 
     388             : #ifndef F_OK
     389             : #define F_OK 0
     390             : #endif
     391             : 
     392             : str
     393           0 : WLCinit(void)
     394             : {
     395             :         str conf, msg;
     396             :         int len;
     397             : 
     398           0 :         if( wlc_state == WLC_STARTUP){
     399             :                 // use default location for master configuration file
     400           0 :                 if((conf = GDKfilepath(0,0,"wlc.config",0)) == NULL)
     401           0 :                         throw(MAL,"wlc.init","Could not access wlc.config\n");
     402             : 
     403           0 :                 if (MT_access(conf, F_OK) ){
     404           0 :                         GDKfree(conf);
     405           0 :                         return MAL_SUCCEED;
     406             :                 }
     407           0 :                 GDKfree(conf);
     408             :                 // we are in master mode
     409           0 :                 len = snprintf(wlc_name, IDLENGTH, "%s", GDKgetenv("gdk_dbname"));
     410           0 :                 if (len == -1 || len >= IDLENGTH)
     411           0 :                         throw(MAL, "wlc.init", "gdk_dbname variable is too large");
     412             : 
     413           0 :                 if ((msg = WLCgetConfig()) != MAL_SUCCEED)
     414             :                         return msg;
     415           0 :                 if (MT_create_thread(&wlc_logger, WLClogger , (void*) 0,
     416             :                                                          MT_THR_DETACHED, "WLClogger") < 0) {
     417           0 :                         TRC_ERROR(MAL_WLC, "Thread could not be spawned\n");
     418             :                 }
     419             :         }
     420             :         return MAL_SUCCEED;
     421             : }
     422             : 
     423             : static str
     424           0 : WLCinitCmd(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
     425             : {
     426             :         (void) cntxt;
     427             :         (void) mb;
     428             :         (void) stk;
     429             :         (void) pci;
     430           0 :         return WLCinit();
     431             : }
     432             : 
     433             : static str
     434           2 : WLCgetclock(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
     435           2 : {       str *ret = getArgReference_str(stk,pci,0);
     436             :         (void) cntxt;
     437             :         (void) mb;
     438           2 :         if( wlc_write[0])
     439           0 :                 *ret = GDKstrdup(wlc_write);
     440             :         else
     441           2 :                 *ret = GDKstrdup(str_nil);
     442           2 :         if(*ret == NULL)
     443           0 :                 throw(MAL,"wlc.getclock", MAL_MALLOC_FAIL);
     444             :         return MAL_SUCCEED;
     445             : }
     446             : 
     447             : static str
     448           2 : WLCgettick(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
     449           2 : {       lng *ret = getArgReference_lng(stk,pci,0);
     450             :         (void) cntxt;
     451             :         (void) mb;
     452           2 :         *ret = wlc_tag;
     453           2 :         return MAL_SUCCEED;
     454             : }
     455             : 
     456             : /* Changing the beat should have immediate effect
     457             :  * It forces a new log file
     458             :  */
     459             : static str
     460           0 : WLCsetbeat(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
     461             : {       int beat;
     462             :         (void) mb;
     463             :         (void) cntxt;
     464           0 :         beat = * getArgReference_int(stk,pci,1);
     465           0 :         if ( beat < 0)
     466           0 :                 throw(MAL, "wlc.setbeat", "beat should be a positive number");
     467           0 :         wlc_beat = beat;
     468           0 :         return WLCcloselogger();
     469             : }
     470             : 
     471             : static str
     472           0 : WLCgetbeat(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
     473           0 : {       int *ret = getArgReference_int(stk,pci,0);
     474             :         (void) mb;
     475             :         (void) cntxt;
     476           0 :         *ret = wlc_beat;
     477           0 :         return MAL_SUCCEED;
     478             : }
     479             : 
     480             : static str
     481           0 : WLCmaster(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
     482             : {
     483             :         int len;
     484             :         char path[FILENAME_MAX];
     485             :         str l;
     486             : 
     487             :         (void) cntxt;
     488             :         (void) mb;
     489           0 :         if( wlc_state == WLC_STOP)
     490           0 :                 throw(MAL,"master","WARNING: logging has been stopped. Use new snapshot");
     491           0 :         if( wlc_state == WLC_RUN)
     492           0 :                 throw(MAL,"master","WARNING: already in master mode, call ignored");
     493           0 :         if( pci->argc == 2) {
     494           0 :                 len = snprintf(path, FILENAME_MAX, "%s", *getArgReference_str(stk, pci,1));
     495           0 :                 if (len == -1 || len >= FILENAME_MAX)
     496           0 :                         throw(MAL, "wlc.master", "wlc master filename path is too large");
     497             :         } else {
     498           0 :                 if((l = GDKfilepath(0,0,"wlc_logs",0)) == NULL)
     499           0 :                         throw(SQL,"wlc.master", MAL_MALLOC_FAIL);
     500           0 :                 len = snprintf(path,FILENAME_MAX,"%s%c",l, DIR_SEP);
     501           0 :                 GDKfree(l);
     502           0 :                 if (len == -1 || len >= FILENAME_MAX)
     503           0 :                         throw(MAL, "wlc.master", "wlc master filename path is too large");
     504             :         }
     505             :         // set location for logs
     506           0 :         if( GDKcreatedir(path) != GDK_SUCCEED)
     507           0 :                 throw(SQL,"wlc.master","Could not create %s\n", path);
     508           0 :         len = snprintf(wlc_name, IDLENGTH, "%s", GDKgetenv("gdk_dbname"));
     509           0 :         if (len == -1 || len >= IDLENGTH)
     510           0 :                 throw(SQL,"wlc.master","gdk_dbname is too large");
     511           0 :         len = snprintf(wlc_dir, FILENAME_MAX, "%s", path);
     512           0 :         if (len == -1 || len >= FILENAME_MAX)
     513           0 :                 throw(SQL,"wlc.master","wlc_dir directory name is too large");
     514           0 :         wlc_state= WLC_RUN;
     515           0 :         return WLCsetConfig();
     516             : }
     517             : 
     518             : static str
     519           0 : WLCstop(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
     520             : {
     521             :         (void) cntxt;
     522             :         (void) mb;
     523             :         (void) stk;
     524             :         (void) pci;
     525           0 :         if( wlc_state != WLC_RUN )
     526           0 :                 throw(MAL,"wlc.stop","WARNING: master role not active");
     527           0 :         wlc_state = WLC_STOP;
     528           0 :         return WLCsetConfig();
     529             : }
     530             : 
     531             : static str
     532           0 : WLCsettime(Client cntxt, InstrPtr pci, InstrPtr p, str fcn)
     533             : {
     534           0 :         timestamp ts = timestamp_current();
     535           0 :         str wlc_time = NULL;
     536           0 :         size_t wlc_limit = 0;
     537             :         InstrPtr ins;
     538             : 
     539             :         (void) pci;
     540           0 :         assert(!is_timestamp_nil(ts));
     541           0 :         if (timestamp_tostr(&wlc_time, &wlc_limit, &ts, true) < 0)
     542           0 :                 throw(MAL, fcn, "Unable to retrieve current time");
     543           0 :         ins = pushStr(cntxt->wlc, p, wlc_time);
     544           0 :         GDKfree(wlc_time);
     545           0 :         if (ins == NULL)
     546           0 :                 throw(MAL, fcn, MAL_MALLOC_FAIL);
     547             :         return MAL_SUCCEED;
     548             : }
     549             : 
     550             : /* Beware that a client context can be used in parallel and
     551             :  * that we don't want transaction interference caused by merging
     552             :  * the MAL instructions accidentally.
     553             :  * The effectively means that the SQL transaction record should
     554             :  * collect the MAL instructions and flush them.
     555             :  */
     556             : static str
     557           0 : WLCpreparewrite(Client cntxt)
     558             : {       str msg = MAL_SUCCEED;
     559             :         // save the wlc record on a file
     560             : 
     561           0 :         if( cntxt->wlc == 0 || cntxt->wlc->stop <= 1 ||  cntxt->wlc_kind == WLC_QUERY )
     562             :                 return MAL_SUCCEED;
     563             : 
     564           0 :         if( wlc_state != WLC_RUN){
     565           0 :                 trimMalVariables(cntxt->wlc, NULL);
     566           0 :                 resetMalTypes(cntxt->wlc, 0);
     567           0 :                 cntxt->wlc_kind = WLC_QUERY;
     568           0 :                 return MAL_SUCCEED;
     569             :         }
     570           0 :         if( wlc_dir[0] ){
     571           0 :                 if (wlc_fd == NULL){
     572           0 :                         msg = WLCsetlogger();
     573           0 :                         if( msg) {
     574             :                                 return msg;
     575             :                         }
     576             :                 }
     577             : 
     578           0 :                 MT_lock_set(&wlc_lock);
     579           0 :                 printFunction(wlc_fd, cntxt->wlc, 0, LIST_MAL_CALL );
     580           0 :                 (void) mnstr_flush(wlc_fd, MNSTR_FLUSH_DATA);
     581           0 :                 (void) mnstr_fsync(wlc_fd);
     582             :                 // close file if no delay is allowed
     583           0 :                 if( wlc_beat == 0 )
     584           0 :                         msg = WLCcloselogger();
     585             : 
     586           0 :                 MT_lock_unset(&wlc_lock);
     587           0 :                 trimMalVariables(cntxt->wlc, NULL);
     588           0 :                 resetMalTypes(cntxt->wlc, 0);
     589           0 :                 cntxt->wlc_kind = WLC_QUERY;
     590             :         } else
     591           0 :                 throw(MAL,"wlc.write","WLC log path missing ");
     592             : 
     593           0 :         if( wlc_state == WLC_STOP)
     594           0 :                 throw(MAL,"wlc.write","Logging for this snapshot has been stopped. Use a new snapshot to continue logging.");
     595             :         return msg;
     596             : }
     597             : 
     598             : static str
     599           0 : WLCstart(Client cntxt, str fcn)
     600             : {
     601             :         InstrPtr pci;
     602             :         str msg = MAL_SUCCEED;
     603           0 :         MalBlkPtr mb = cntxt->wlc;
     604             :         lng tag;
     605             : 
     606           0 :         if( cntxt->wlc == NULL){
     607           0 :                 if((cntxt->wlc = newMalBlk(STMT_INCREMENT)) == NULL)
     608           0 :                         throw(MAL, fcn, MAL_MALLOC_FAIL);
     609             :                 mb = cntxt->wlc;
     610             :         }
     611             :         /* Find a single transaction sequence ending with COMMIT or ROLLBACK */
     612           0 :         if( mb->stop > 1 ){
     613           0 :                 pci = getInstrPtr(mb, mb->stop -1 );
     614           0 :                 if (!(strcmp( getFunctionId(pci), "commit") == 0 || strcmp( getFunctionId(pci), "rollback") == 0))
     615             :                         return MAL_SUCCEED;
     616             :         }
     617             : 
     618             :         /* create the start of a new transaction block */
     619           0 :         MT_lock_set(&wlc_lock);
     620           0 :         tag = wlc_tag;
     621           0 :         wlc_tag++; // Update wlc administration
     622             : 
     623           0 :         pci = newStmt(mb,"wlr", "transaction");
     624           0 :         pci = pushLng(mb, pci, tag);
     625           0 :         if((msg = WLCsettime(cntxt,pci, pci, fcn)) == MAL_SUCCEED) {
     626           0 :                 snprintf(wlc_write, 26, "%s", getVarConstant(cntxt->wlc, getArg(pci, 2)).val.sval);
     627           0 :                 pci = pushStr(mb, pci, cntxt->username);
     628           0 :                 pci->ticks = GDKms();
     629             :         }
     630           0 :         MT_lock_unset(&wlc_lock);
     631             : 
     632           0 :         return msg;
     633             : }
     634             : 
     635             : static str
     636           0 : WLCquery(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
     637             : {
     638             :         InstrPtr p;
     639             :         str msg = MAL_SUCCEED;
     640             : 
     641             :         (void) stk;
     642           0 :         if ( strcmp("-- no query",getVarConstant(mb, getArg(pci,1)).val.sval) == 0)
     643             :                 return MAL_SUCCEED;     // ignore system internal queries.
     644           0 :         msg = WLCstart(cntxt, "wlr.query");
     645           0 :         if(msg)
     646             :                 return msg;
     647           0 :         cntxt->wlc_kind = WLC_QUERY;
     648           0 :         p = newStmt(cntxt->wlc, "wlr","query");
     649           0 :         p = pushStr(cntxt->wlc, p, getVarConstant(mb, getArg(pci,1)).val.sval);
     650           0 :         return msg;
     651             : }
     652             : 
     653             : static str
     654           0 : WLCcatalog(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
     655             : {
     656             :         InstrPtr p;
     657             :         str msg = MAL_SUCCEED;
     658             : 
     659             :         (void) stk;
     660           0 :         msg = WLCstart(cntxt, "wlr.catalog");
     661           0 :         if(msg)
     662             :                 return msg;
     663           0 :         cntxt->wlc_kind = WLC_CATALOG;
     664           0 :         p = newStmt(cntxt->wlc, "wlr","catalog");
     665           0 :         p = pushStr(cntxt->wlc, p, getVarConstant(mb, getArg(pci,1)).val.sval);
     666           0 :         return msg;
     667             : }
     668             : 
     669             : static str
     670           0 : WLCaction(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
     671             : {
     672             :         InstrPtr p;
     673             :         str msg = MAL_SUCCEED;
     674             : 
     675             :         (void) stk;
     676           0 :         msg = WLCstart(cntxt, "wlr.action");
     677           0 :         if(msg)
     678             :                 return msg;
     679           0 :         cntxt->wlc_kind = WLC_UPDATE;
     680           0 :         p = newStmt(cntxt->wlc, "wlr","action");
     681           0 :         p = pushStr(cntxt->wlc, p, getVarConstant(mb, getArg(pci,1)).val.sval);
     682           0 :         return msg;
     683             : }
     684             : 
     685             : /*
     686             :  * We actually don't need the catalog operations in the log.
     687             :  * It is sufficient to upgrade the replay block to WLR_CATALOG.
     688             :  */
     689             : static str
     690           0 : WLCgeneric(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
     691             : {
     692             :         InstrPtr p;
     693             :         int i, k,  tpe, varid;
     694             :         str msg = MAL_SUCCEED;
     695             : 
     696             :         (void) stk;
     697           0 :         msg = WLCstart(cntxt, "wlr.generic");
     698           0 :         if(msg)
     699             :                 return msg;
     700           0 :         cntxt->wlc_kind = WLC_IGNORE;
     701           0 :         p = newInstruction(cntxt->wlc, "wlr",getFunctionId(pci));
     702           0 :         k = newTmpVariable(mb,TYPE_any);
     703           0 :         if( k >= 0)
     704           0 :                 getArg(p,0) =  k;
     705           0 :         for( i = pci->retc; i< pci->argc; i++){
     706           0 :                 tpe =getArgType(mb, pci, i);
     707           0 :                 switch(tpe){
     708           0 :                 case TYPE_str:
     709           0 :                         k = defConstant(mb,TYPE_str,&getVarConstant(mb, getArg(pci, i)));
     710           0 :                         if( k >= 0)
     711           0 :                                 p = addArgument(cntxt->wlc, p, k);
     712             :                         break;
     713           0 :                 default:
     714           0 :                         varid = defConstant(cntxt->wlc, tpe, getArgReference(stk, pci, i));
     715           0 :                         if( varid >= 0)
     716           0 :                                 p = addArgument(cntxt->wlc, p, varid);
     717             :                 }
     718             :         }
     719           0 :         p->ticks = GDKms();
     720           0 :         pushInstruction(mb,p);
     721           0 :         cntxt->wlc_kind = WLC_CATALOG;
     722           0 :         return  msg;
     723             : }
     724             : 
     725             : #define bulk(TPE1, TPE2)\
     726             : {       TPE1 *p = (TPE1 *) bi.base;\
     727             :         TPE1 *q = (TPE1 *) bi.base + BUNlast(b);\
     728             :         int k=0; \
     729             :         for( ; p < q; p++, k++){\
     730             :                 if( k % 32 == 31){\
     731             :                         pci = newStmt(cntxt->wlc, "wlr",getFunctionId(pci));\
     732             :                         pci = pushStr(cntxt->wlc, pci, sch);\
     733             :                         pci = pushStr(cntxt->wlc, pci, tbl);\
     734             :                         pci = pushStr(cntxt->wlc, pci, col);\
     735             :                         pci->ticks = GDKms();\
     736             :                 }\
     737             :                 pci = push##TPE2(cntxt->wlc, pci ,*p);\
     738             : } }
     739             : 
     740             : #define updateBatch(TPE1,TPE2)\
     741             : {       TPE1 *x = (TPE1 *) bvali.base;\
     742             :         TPE1 *y = (TPE1 *) bvali.base + BUNlast(b);\
     743             :         int k=0; \
     744             :         for( ; x < y; x++, k++){\
     745             :                 p = newStmt(cntxt->wlc, "wlr","update");\
     746             :                 p = pushStr(cntxt->wlc, p, sch);\
     747             :                 p = pushStr(cntxt->wlc, p, tbl);\
     748             :                 p = pushStr(cntxt->wlc, p, col);\
     749             :                 p = pushOid(cntxt->wlc, p,  (ol? *ol++: o++));\
     750             :                 p = push##TPE2(cntxt->wlc, p ,*x);\
     751             : } }
     752             : 
     753             : static str
     754           0 : WLCdatashipping(Client cntxt, MalBlkPtr mb, InstrPtr pci, int bid)
     755             : {       BAT *b;
     756             :         BATiter bi;
     757             :         str sch, tbl, col;
     758             :         str msg = MAL_SUCCEED;
     759             :         (void) mb;
     760             : 
     761           0 :         b = BATdescriptor(bid);
     762           0 :         if (b == NULL) {
     763           0 :                 throw(MAL, "wlc.datashipping", SQLSTATE(HY002) RUNTIME_OBJECT_MISSING);
     764             :         }
     765             : 
     766             : // large BATs can also be re-created using the query.
     767             : // Copy into should always be expanded, because the source may not
     768             : // be accessible in the replica. TODO
     769             : 
     770           0 :         sch = GDKstrdup(getVarConstant(cntxt->wlc, getArg(pci,1)).val.sval);
     771           0 :         tbl = GDKstrdup(getVarConstant(cntxt->wlc, getArg(pci,2)).val.sval);
     772           0 :         col = GDKstrdup(getVarConstant(cntxt->wlc, getArg(pci,3)).val.sval);
     773           0 :         if(!sch || !tbl || !col) {
     774           0 :                 msg = createException(MAL, "wlc.datashipping", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     775           0 :                 goto finish;
     776             :         }
     777           0 :         if (cntxt->wlc_kind < WLC_UPDATE)
     778           0 :                 cntxt->wlc_kind = WLC_UPDATE;
     779           0 :         bi = bat_iterator(b);
     780           0 :         switch( ATOMstorage(b->ttype)){
     781           0 :         case TYPE_bit: bulk(bit,Bit); break;
     782           0 :         case TYPE_bte: bulk(bte,Bte); break;
     783           0 :         case TYPE_sht: bulk(sht,Sht); break;
     784           0 :         case TYPE_int: bulk(int,Int); break;
     785           0 :         case TYPE_lng: bulk(lng,Lng); break;
     786           0 :         case TYPE_flt: bulk(flt,Flt); break;
     787           0 :         case TYPE_dbl: bulk(dbl,Dbl); break;
     788             : #ifdef HAVE_HGE
     789           0 :         case TYPE_hge: bulk(hge,Hge); break;
     790             : #endif
     791           0 :         case TYPE_str:
     792             :                 {       BUN p,q;
     793             :                         int k=0;
     794           0 :                         BATloop(b,p,q){
     795           0 :                                 if( k % 32 == 31){
     796           0 :                                         pci = newStmt(cntxt->wlc, "wlr",getFunctionId(pci));
     797           0 :                                         pci = pushStr(cntxt->wlc, pci, sch);
     798           0 :                                         pci = pushStr(cntxt->wlc, pci, tbl);
     799           0 :                                         pci = pushStr(cntxt->wlc, pci, col);
     800             :                                 }
     801           0 :                                 k++;
     802           0 :                                 pci = pushStr(cntxt->wlc, pci ,(str) BUNtvar(bi,p));
     803             :                         }
     804             :                 }
     805             :                 break;
     806           0 :         default:
     807           0 :                 TRC_ERROR(MAL_WLC, "Non-supported type: %d\n", ATOMstorage(b->ttype));
     808           0 :                 cntxt->wlc_kind = WLC_CATALOG;
     809             :         }
     810           0 :         bat_iterator_end(&bi);
     811           0 : finish:
     812           0 :         BBPunfix(b->batCacheid);
     813           0 :         if (sch)
     814           0 :                 GDKfree(sch);
     815           0 :         if (tbl)
     816           0 :                 GDKfree(tbl);
     817           0 :         if (col)
     818           0 :                 GDKfree(col);
     819             :         return msg;
     820             : }
     821             : 
     822             : static str
     823           0 : WLCappend(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
     824             : {
     825             :         InstrPtr p;
     826             :         int tpe, varid;
     827             :         str msg = MAL_SUCCEED;
     828             : 
     829             :         (void) stk;
     830             :         (void) mb;
     831           0 :         msg = WLCstart(cntxt, "wlr.append");
     832           0 :         if(msg)
     833             :                 return msg;
     834           0 :         p = newStmt(cntxt->wlc, "wlr","append");
     835           0 :         p = pushStr(cntxt->wlc, p, getVarConstant(mb, getArg(pci,1)).val.sval);
     836           0 :         p = pushStr(cntxt->wlc, p, getVarConstant(mb, getArg(pci,2)).val.sval);
     837           0 :         p = pushStr(cntxt->wlc, p, getVarConstant(mb, getArg(pci,3)).val.sval);
     838             : 
     839             :         // extend the instructions with all values.
     840             :         // If this become too large we can always switch to a "catalog" mode
     841             :         // forcing re-execution instead
     842           0 :         tpe= getArgType(mb,pci,4);
     843           0 :         if (isaBatType(tpe) ){
     844             :                 // actually check the size of the BAT first, most have few elements
     845           0 :                 msg = WLCdatashipping(cntxt, mb, p, stk->stk[getArg(pci,4)].val.bval);
     846             :         } else {
     847             :                 ValRecord cst;
     848           0 :                 if (VALcopy(&cst, getArgReference(stk,pci,4)) != NULL){
     849           0 :                         varid = defConstant(cntxt->wlc, tpe, &cst);
     850           0 :                         if( varid >=0)
     851           0 :                                 p = pushArgument(cntxt->wlc, p, varid);
     852             :                 }
     853             :         }
     854           0 :         if( cntxt->wlc_kind < WLC_UPDATE)
     855           0 :                 cntxt->wlc_kind = WLC_UPDATE;
     856             : 
     857             :         return msg;
     858             : }
     859             : 
     860             : /* check for empty BATs first */
     861             : static str
     862           0 : WLCdelete(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
     863             : {       InstrPtr p;
     864             :         int tpe, k = 0;
     865           0 :         int bid =  stk->stk[getArg(pci,3)].val.bval;
     866             :         oid o=0, last, *ol;
     867             :         BAT *b;
     868             :         str msg = MAL_SUCCEED;
     869             : 
     870             :         (void) stk;
     871             :         (void) mb;
     872           0 :         b= BBPquickdesc(bid);
     873           0 :         if( BATcount(b) == 0)
     874             :                 return MAL_SUCCEED;
     875           0 :         if ((msg = WLCstart(cntxt, "wlr.delete")))
     876             :                 return msg;
     877           0 :         cntxt->wlc_kind = WLC_UPDATE;
     878           0 :         p = newStmt(cntxt->wlc, "wlr","delete");
     879           0 :         p = pushStr(cntxt->wlc, p, getVarConstant(mb, getArg(pci,1)).val.sval);
     880           0 :         p = pushStr(cntxt->wlc, p, getVarConstant(mb, getArg(pci,2)).val.sval);
     881             : 
     882           0 :         tpe= getArgType(mb,pci,3);
     883           0 :         if (isaBatType(tpe) ){
     884           0 :                 b= BATdescriptor(bid);
     885           0 :                 if (b == NULL)
     886           0 :                         throw(MAL, "wlc.delete", SQLSTATE(HY002) RUNTIME_OBJECT_MISSING);
     887           0 :                 o = b->tseqbase;
     888           0 :                 last = o + BATcount(b);
     889           0 :                 if( b->ttype == TYPE_void){
     890           0 :                         for( ; o < last; o++, k++){
     891           0 :                                 if( k % 32 == 31){
     892           0 :                                         p = newStmt(cntxt->wlc, "wlr","delete");
     893           0 :                                         p = pushStr(cntxt->wlc, p, getVarConstant(mb, getArg(pci,1)).val.sval);
     894           0 :                                         p = pushStr(cntxt->wlc, p, getVarConstant(mb, getArg(pci,2)).val.sval);
     895             :                                 }
     896           0 :                                 p = pushOid(cntxt->wlc,p, o);
     897             :                         }
     898             :                 } else {
     899           0 :                         BATiter bi = bat_iterator(b);
     900           0 :                         ol = (oid*) bi.base;
     901           0 :                         for( ; o < last; o++, k++, ol++){
     902           0 :                                 if( k % 32 == 31){
     903           0 :                                         p = newStmt(cntxt->wlc, "wlr","delete");
     904           0 :                                         p = pushStr(cntxt->wlc, p, getVarConstant(mb, getArg(pci,1)).val.sval);
     905           0 :                                         p = pushStr(cntxt->wlc, p, getVarConstant(mb, getArg(pci,2)).val.sval);
     906             :                                 }
     907           0 :                                 p = pushOid(cntxt->wlc,p, *ol);
     908             :                         }
     909           0 :                         bat_iterator_end(&bi);
     910             :                 }
     911           0 :                 BBPunfix(b->batCacheid);
     912             :         } else
     913           0 :                 throw(MAL,"wlc.delete","BAT expected");
     914           0 :         if( cntxt->wlc_kind < WLC_UPDATE)
     915           0 :                 cntxt->wlc_kind = WLC_UPDATE;
     916             : 
     917             :         return msg;
     918             : }
     919             : 
     920             : static str
     921           0 : WLCupdate(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
     922             : {       InstrPtr p;
     923             :         str sch,tbl,col, msg = MAL_SUCCEED;
     924             :         ValRecord cst;
     925             :         int tpe, varid;
     926             :         oid o = 0, *ol = 0;
     927             : 
     928           0 :         sch = *getArgReference_str(stk,pci,1);
     929           0 :         tbl = *getArgReference_str(stk,pci,2);
     930           0 :         col = *getArgReference_str(stk,pci,3);
     931           0 :         msg = WLCstart(cntxt, "wlr.update");
     932           0 :         if(msg)
     933             :                 return msg;
     934           0 :         cntxt->wlc_kind = WLC_UPDATE;
     935           0 :         tpe= getArgType(mb,pci,5);
     936           0 :         if (isaBatType(tpe) ){
     937             :                 BAT *b, *bval;
     938           0 :                 b= BATdescriptor(stk->stk[getArg(pci,4)].val.bval);
     939           0 :                 bval= BATdescriptor(stk->stk[getArg(pci,5)].val.bval);
     940           0 :                 if(b == NULL || bval == NULL) {
     941           0 :                         if(b)
     942           0 :                                 BBPunfix(b->batCacheid);
     943           0 :                         if(bval)
     944           0 :                                 BBPunfix(bval->batCacheid);
     945           0 :                         throw(MAL, "wlr.update", SQLSTATE(HY002) RUNTIME_OBJECT_MISSING);
     946             :                 }
     947           0 :                 BATiter bi = bat_iterator(b);
     948           0 :                 if( b->ttype == TYPE_void)
     949           0 :                         o = b->tseqbase;
     950             :                 else
     951           0 :                         ol = (oid*) bi.base;
     952           0 :                 BATiter bvali = bat_iterator(bval);
     953           0 :                 switch( ATOMstorage(bval->ttype)){
     954           0 :                 case TYPE_bit: updateBatch(bit,Bit); break;
     955           0 :                 case TYPE_bte: updateBatch(bte,Bte); break;
     956           0 :                 case TYPE_sht: updateBatch(sht,Sht); break;
     957           0 :                 case TYPE_int: updateBatch(int,Int); break;
     958           0 :                 case TYPE_lng: updateBatch(lng,Lng); break;
     959           0 :                 case TYPE_flt: updateBatch(flt,Flt); break;
     960           0 :                 case TYPE_dbl: updateBatch(dbl,Dbl); break;
     961             : #ifdef HAVE_HGE
     962           0 :                 case TYPE_hge: updateBatch(hge,Hge); break;
     963             : #endif
     964           0 :                 case TYPE_str:
     965             :                 {       int k=0;
     966             :                         BUN x,y;
     967           0 :                         BATloop(bval,x,y){
     968           0 :                                 p = newStmt(cntxt->wlc, "wlr","update");
     969           0 :                                 p = pushStr(cntxt->wlc, p, sch);
     970           0 :                                 p = pushStr(cntxt->wlc, p, tbl);
     971           0 :                                 p = pushStr(cntxt->wlc, p, col);
     972           0 :                                 p = pushOid(cntxt->wlc, p, (ol? *ol++ : o++));
     973           0 :                                 p = pushStr(cntxt->wlc, p , BUNtvar(bi,x));
     974             :                                 k++;
     975             :                         }
     976             :                 }
     977             :                 /* fall through */
     978             :                 default:
     979           0 :                         cntxt->wlc_kind = WLC_CATALOG;
     980             :                 }
     981           0 :                 bat_iterator_end(&bi);
     982           0 :                 bat_iterator_end(&bvali);
     983           0 :                 BBPunfix(b->batCacheid);
     984           0 :                 BBPunfix(bval->batCacheid);
     985             :         } else {
     986           0 :                 p = newStmt(cntxt->wlc, "wlr","update");
     987           0 :                 p = pushStr(cntxt->wlc, p, sch);
     988           0 :                 p = pushStr(cntxt->wlc, p, tbl);
     989           0 :                 p = pushStr(cntxt->wlc, p, col);
     990           0 :                 o = *getArgReference_oid(stk,pci,4);
     991           0 :                 p = pushOid(cntxt->wlc,p, o);
     992           0 :                 if (VALcopy(&cst, getArgReference(stk,pci,5)) != NULL){
     993           0 :                         varid = defConstant(cntxt->wlc, tpe, &cst);
     994           0 :                         if( varid >= 0)
     995           0 :                                 p = pushArgument(cntxt->wlc, p, varid);
     996             :                 }
     997             :         }
     998             : 
     999           0 :         if( cntxt->wlc_kind < WLC_UPDATE)
    1000           0 :                 cntxt->wlc_kind = WLC_UPDATE;
    1001             :         return msg;
    1002             : }
    1003             : 
    1004             : static str
    1005           0 : WLCclear_table(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
    1006             : {
    1007             :         InstrPtr p;
    1008             :         str msg = MAL_SUCCEED;
    1009             :         (void) stk;
    1010           0 :         msg = WLCstart(cntxt, "wlr.clear_table");
    1011           0 :         if(msg)
    1012             :                 return msg;
    1013           0 :         cntxt->wlc_kind = WLC_UPDATE;
    1014           0 :         p = newStmt(cntxt->wlc, "wlr","clear_table");
    1015           0 :         p = pushStr(cntxt->wlc, p, getVarConstant(mb, getArg(pci,1)).val.sval);
    1016           0 :         p = pushStr(cntxt->wlc, p, getVarConstant(mb, getArg(pci,2)).val.sval);
    1017           0 :         if( cntxt->wlc_kind < WLC_UPDATE)
    1018           0 :                 cntxt->wlc_kind = WLC_UPDATE;
    1019             : 
    1020             :         return msg;
    1021             : }
    1022             : 
    1023             : str
    1024       67468 : WLCcommit(int clientid)
    1025             : {
    1026       67468 :         if( mal_clients[clientid].wlc && mal_clients[clientid].wlc->stop > 1){
    1027           0 :                 newStmt(mal_clients[clientid].wlc,"wlr","commit");
    1028           0 :                 return WLCpreparewrite( &mal_clients[clientid]);
    1029             :         }
    1030             :         return MAL_SUCCEED;
    1031             : }
    1032             : 
    1033             : static str
    1034           0 : WLCcommitCmd(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
    1035             : {       str msg = MAL_SUCCEED;
    1036           0 :         msg = WLCstart(cntxt, "wlr.commit");
    1037           0 :         if(msg)
    1038             :                 return msg;
    1039             :         (void) mb;
    1040             :         (void) stk;
    1041             :         (void) pci;
    1042           0 :         cntxt->wlc_kind = WLC_UPDATE;
    1043           0 :         return WLCcommit(cntxt->idx);
    1044             : }
    1045             : 
    1046             : str
    1047       12772 : WLCrollback(int clientid)
    1048             : {
    1049       12772 :         if( mal_clients[clientid].wlc){
    1050           0 :                 newStmt(mal_clients[clientid].wlc,"wlr","rollback");
    1051           0 :                 return WLCpreparewrite( &mal_clients[clientid]);
    1052             :         }
    1053             :         return MAL_SUCCEED;
    1054             : }
    1055             : 
    1056             : static str
    1057           0 : WLCrollbackCmd(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
    1058             : {       str msg = MAL_SUCCEED;
    1059           0 :         msg = WLCstart(cntxt, "wlr.rollback");
    1060           0 :         if(msg)
    1061             :                 return msg;
    1062             :         (void) mb;
    1063             :         (void) stk;
    1064             :         (void) pci;
    1065           0 :         cntxt->wlc_kind = WLC_UPDATE;
    1066           0 :         return WLCrollback(cntxt->idx);
    1067             : }
    1068             : 
    1069             : #include "mel.h"
    1070             : mel_func wlc_init_funcs[] = {
    1071             :  pattern("wlc", "init", WLCinitCmd, false, "Test for running as master", noargs),
    1072             :  command("wlc", "epilogue", WLCepilogue, false, "release the resources held by the wlc module", args(1,1, arg("",void))),
    1073             :  pattern("wlc", "master", WLCmaster, false, "Activate the workload-capture-replay process", noargs),
    1074             :  pattern("wlc", "master", WLCmaster, false, "Activate the workload-capture-replay process. Use a different location for the logs.", args(0,1, arg("path",str))),
    1075             :  pattern("wlc", "stop", WLCstop, false, "Stop capturing the logs", noargs),
    1076             :  pattern("wlc", "flush", WLCflush, false, "Flush current log buffer", noargs),
    1077             :  pattern("wlc", "setbeat", WLCsetbeat, false, "Maximal delay for transaction log flushing", args(0,1, arg("duration",int))),
    1078             :  pattern("wlc", "getbeat", WLCgetbeat, false, "Maximal delay for transaction log flushing", args(1,2, arg("",str),arg("duration",int))),
    1079             :  pattern("wlc", "getclock", WLCgetclock, false, "Timestamp of last update transaction", args(1,1, arg("",str))),
    1080             :  pattern("wlc", "gettick", WLCgettick, false, "Transaction identifier of the last committed transaction", args(1,1, arg("",lng))),
    1081             :  pattern("wlc", "rollback", WLCrollbackCmd, false, "Mark the end of the work unit", noargs),
    1082             :  pattern("wlc", "commit", WLCcommitCmd, false, "Mark the end of the work unit", noargs),
    1083             :  pattern("wlc", "query", WLCquery, false, "Keep the queries for replay.", args(0,1, arg("q",str))),
    1084             :  pattern("wlc", "catalog", WLCcatalog, false, "Keep the catalog changing queries for replay. ", args(0,1, arg("q",str))),
    1085             :  pattern("wlc", "action", WLCaction, false, "Keep the database changing queries for replay. ", args(0,1, arg("q",str))),
    1086             :  pattern("wlc", "append", WLCappend, false, "Keep the insertions in the workload-capture-replay list", args(1,5, arg("",int),arg("sname",str),arg("tname",str),arg("cname",str),argany("ins",0))),
    1087             :  pattern("wlc", "update", WLCupdate, false, "Keep the update in the workload-capture-replay list", args(1,6, arg("",int),arg("sname",str),arg("tname",str),arg("cname",str),argany("tid",0),argany("val",0))),
    1088             :  pattern("wlc", "delete", WLCdelete, false, "Keep the deletions in the workload-capture-replay list", args(1,4, arg("",int),arg("sname",str),arg("tname",str),argany("b",0))),
    1089             :  pattern("wlc", "clear_table", WLCclear_table, false, "Keep the deletions in the workload-capture-replay list", args(1,3, arg("",int),arg("sname",str),arg("tname",str))),
    1090             :  pattern("wlc", "commit", WLCcommitCmd, false, "Commit the workload-capture-replay record", noargs),
    1091             :  pattern("wlc", "rollback", WLCcommitCmd, false, "Rollback the workload-capture-replay record", noargs),
    1092             :  pattern("wlc", "create_seq", WLCgeneric, false, "Catalog operation create_seq", args(0,3, arg("sname",str),arg("seqname",str),arg("action",int))),
    1093             :  pattern("wlc", "alter_seq", WLCgeneric, false, "Catalog operation alter_seq", args(0,3, arg("sname",str),arg("seqname",str),arg("val",lng))),
    1094             :  pattern("wlc", "alter_seq", WLCgeneric, false, "Catalog operation alter_seq", args(0,4, arg("sname",str),arg("seqname",str),arg("seq",ptr),batarg("val",lng))),
    1095             :  pattern("wlc", "drop_seq", WLCgeneric, false, "Catalog operation drop_seq", args(0,3, arg("sname",str),arg("nme",str),arg("action",int))),
    1096             :  pattern("wlc", "create_schema", WLCgeneric, false, "Catalog operation create_schema", args(0,3, arg("sname",str),arg("auth",str),arg("action",int))),
    1097             :  pattern("wlc", "drop_schema", WLCgeneric, false, "Catalog operation drop_schema", args(0,3, arg("sname",str),arg("ifexists",int),arg("action",int))),
    1098             :  pattern("wlc", "create_table", WLCgeneric, false, "Catalog operation create_table", args(0,3, arg("sname",str),arg("tname",str),arg("temp",int))),
    1099             :  pattern("wlc", "create_view", WLCgeneric, false, "Catalog operation create_view", args(0,4, arg("sname",str),arg("tname",str),arg("temp",int),arg("replace",int))),
    1100             :  pattern("wlc", "drop_table", WLCgeneric, false, "Catalog operation drop_table", args(0,4, arg("sname",str),arg("name",str),arg("action",int),arg("ifexists",int))),
    1101             :  pattern("wlc", "drop_view", WLCgeneric, false, "Catalog operation drop_view", args(0,4, arg("sname",str),arg("name",str),arg("action",int),arg("ifexists",int))),
    1102             :  pattern("wlc", "drop_constraint", WLCgeneric, false, "Catalog operation drop_constraint", args(0,5, arg("sname",str),arg("tname",str),arg("name",str),arg("action",int),arg("ifexists",int))),
    1103             :  pattern("wlc", "alter_table", WLCgeneric, false, "Catalog operation alter_table", args(0,3, arg("sname",str),arg("tname",str),arg("action",int))),
    1104             :  pattern("wlc", "create_type", WLCgeneric, false, "Catalog operation create_type", args(0,3, arg("sname",str),arg("nme",str),arg("impl",str))),
    1105             :  pattern("wlc", "drop_type", WLCgeneric, false, "Catalog operation drop_type", args(0,3, arg("sname",str),arg("nme",str),arg("action",int))),
    1106             :  pattern("wlc", "grant_roles", WLCgeneric, false, "Catalog operation grant_roles", args(0,4, arg("sname",str),arg("auth",str),arg("grantor",int),arg("admin",int))),
    1107             :  pattern("wlc", "revoke_roles", WLCgeneric, false, "Catalog operation revoke_roles", args(0,4, arg("sname",str),arg("auth",str),arg("grantor",int),arg("admin",int))),
    1108             :  pattern("wlc", "grant", WLCgeneric, false, "Catalog operation grant", args(0,7, arg("sname",str),arg("tbl",str),arg("grantee",str),arg("privs",int),arg("cname",str),arg("gr",int),arg("grantor",int))),
    1109             :  pattern("wlc", "revoke", WLCgeneric, false, "Catalog operation revoke", args(0,7, arg("sname",str),arg("tbl",str),arg("grantee",str),arg("privs",int),arg("cname",str),arg("grant",int),arg("grantor",int))),
    1110             :  pattern("wlc", "grant_function", WLCgeneric, false, "Catalog operation grant_function", args(0,6, arg("sname",str),arg("fcnid",int),arg("grantee",str),arg("privs",int),arg("grant",int),arg("grantor",int))),
    1111             :  pattern("wlc", "revoke_function", WLCgeneric, false, "Catalog operation revoke_function", args(0,6, arg("sname",str),arg("fcnid",int),arg("grantee",str),arg("privs",int),arg("grant",int),arg("grantor",int))),
    1112             :  pattern("wlc", "create_user", WLCgeneric, false, "Catalog operation create_user", args(0,5, arg("sname",str),arg("passwrd",str),arg("enc",int),arg("schema",str),arg("fullname",str))),
    1113             :  pattern("wlc", "drop_user", WLCgeneric, false, "Catalog operation drop_user", args(0,2, arg("sname",str),arg("action",int))),
    1114             :  pattern("wlc", "drop_user", WLCgeneric, false, "Catalog operation drop_user", args(0,3, arg("sname",str),arg("auth",str),arg("action",int))),
    1115             :  pattern("wlc", "alter_user", WLCgeneric, false, "Catalog operation alter_user", args(0,5, arg("sname",str),arg("passwrd",str),arg("enc",int),arg("schema",str),arg("oldpasswrd",str))),
    1116             :  pattern("wlc", "rename_user", WLCgeneric, false, "Catalog operation rename_user", args(0,3, arg("sname",str),arg("newnme",str),arg("action",int))),
    1117             :  pattern("wlc", "create_role", WLCgeneric, false, "Catalog operation create_role", args(0,3, arg("sname",str),arg("role",str),arg("grator",int))),
    1118             :  pattern("wlc", "drop_role", WLCgeneric, false, "Catalog operation drop_role", args(0,3, arg("auth",str),arg("role",str),arg("action",int))),
    1119             :  pattern("wlc", "drop_role", WLCgeneric, false, "Catalog operation drop_role", args(0,2, arg("role",str),arg("action",int))),
    1120             :  pattern("wlc", "drop_index", WLCgeneric, false, "Catalog operation drop_index", args(0,3, arg("sname",str),arg("iname",str),arg("action",int))),
    1121             :  pattern("wlc", "drop_function", WLCgeneric, false, "Catalog operation drop_function", args(0,5, arg("sname",str),arg("fname",str),arg("fid",int),arg("type",int),arg("action",int))),
    1122             :  pattern("wlc", "create_function", WLCgeneric, false, "Catalog operation create_function", args(0,3, arg("sname",str),arg("fname",str),arg("replace",int))),
    1123             :  pattern("wlc", "create_trigger", WLCgeneric, false, "Catalog operation create_trigger", args(0,11, arg("sname",str),arg("tname",str),arg("triggername",str),arg("time",int),arg("orientation",int),arg("event",int),arg("old",str),arg("new",str),arg("cond",str),arg("qry",str),arg("replace",int))),
    1124             :  pattern("wlc", "drop_trigger", WLCgeneric, false, "Catalog operation drop_trigger", args(0,3, arg("sname",str),arg("nme",str),arg("ifexists",int))),
    1125             :  pattern("wlc", "alter_add_table", WLCgeneric, false, "Catalog operation alter_add_table", args(0,5, arg("sname",str),arg("mtnme",str),arg("psnme",str),arg("ptnme",str),arg("action",int))),
    1126             :  pattern("wlc", "alter_del_table", WLCgeneric, false, "Catalog operation alter_del_table", args(0,5, arg("sname",str),arg("mtnme",str),arg("psnme",str),arg("ptnme",str),arg("action",int))),
    1127             :  pattern("wlc", "alter_set_table", WLCgeneric, false, "Catalog operation alter_set_table", args(0,3, arg("sname",str),arg("tnme",str),arg("access",int))),
    1128             :  pattern("wlc", "alter_add_range_partition", WLCgeneric, false, "Catalog operation alter_add_range_partition", args(0,8, arg("sname",str),arg("mtnme",str),arg("psnme",str),arg("ptnme",str),arg("min",str),arg("max",str),arg("nills",bit),arg("update",int))),
    1129             :  pattern("wlc", "comment_on", WLCgeneric, false, "Catalog operation comment_on", args(0,2, arg("objid",int),arg("remark",str))),
    1130             :  pattern("wlc", "rename_schema", WLCgeneric, false, "Catalog operation rename_schema", args(0,2, arg("sname",str),arg("newnme",str))),
    1131             :  pattern("wlc", "rename_table", WLCgeneric, false, "Catalog operation rename_table", args(0,4, arg("osname",str),arg("nsname",str),arg("otname",str),arg("ntname",str))),
    1132             :  pattern("wlc", "rename_column", WLCgeneric, false, "Catalog operation rename_column", args(0,4, arg("sname",str),arg("tname",str),arg("cname",str),arg("newnme",str))),
    1133             :  pattern("wlc", "transaction_release", WLCgeneric, false, "A transaction statement (type can be commit,release,rollback or start)", args(1,3, arg("",void),arg("chain",int),arg("name",str))),
    1134             :  pattern("wlc", "transaction_commit", WLCgeneric, false, "A transaction statement (type can be commit,release,rollback or start)", args(1,3, arg("",void),arg("chain",int),arg("name",str))),
    1135             :  pattern("wlc", "transaction_rollback", WLCgeneric, false, "A transaction statement (type can be commit,release,rollback or start)", args(1,3, arg("",void),arg("chain",int),arg("name",str))),
    1136             :  pattern("wlc", "transaction_begin", WLCgeneric, false, "A transaction statement (type can be commit,release,rollback or start)", args(1,3, arg("",void),arg("chain",int),arg("name",str))),
    1137             :  pattern("wlc", "transaction", WLCgeneric, true, "Start an autocommit transaction", noargs),
    1138             :  pattern("wlc", "alter_add_value_partition", WLCgeneric, false, "Catalog operation alter_add_value_partition", args(0,6, arg("sname",str),arg("mtnme",str),arg("psnme",str),arg("ptnme",str),arg("nills",bit),arg("update",int))),
    1139             :  pattern("wlc", "alter_add_value_partition", WLCgeneric, false, "Catalog operation alter_add_value_partition", args(0,7, arg("sname",str),arg("mtnme",str),arg("psnme",str),arg("ptnme",str),arg("nills",bit),arg("update",int),vararg("arg",str))),
    1140             :  { .imp=NULL }
    1141             : };
    1142             : #include "mal_import.h"
    1143             : #ifdef _MSC_VER
    1144             : #undef read
    1145             : #pragma section(".CRT$XCU",read)
    1146             : #endif
    1147         259 : LIB_STARTUP_FUNC(init_wlc_mal)
    1148         259 : { mal_module("wlc", NULL, wlc_init_funcs); }

Generated by: LCOV version 1.14