LCOV - code coverage report
Current view: top level - gdk - gdk_logger.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 1061 1488 71.3 %
Date: 2021-09-14 19:48:19 Functions: 67 72 93.1 %

          Line data    Source code
       1             : /*
       2             :  * This Source Code Form is subject to the terms of the Mozilla Public
       3             :  * License, v. 2.0.  If a copy of the MPL was not distributed with this
       4             :  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
       5             :  *
       6             :  * Copyright 1997 - July 2008 CWI, August 2008 - 2021 MonetDB B.V.
       7             :  */
       8             : 
       9             : #include "monetdb_config.h"
      10             : #include "gdk.h"
      11             : #include "gdk_private.h"
      12             : #include "gdk_logger.h"
      13             : #include "gdk_logger_internals.h"
      14             : #include "mutils.h"
      15             : #include <string.h>
      16             : 
      17             : static gdk_return logger_add_bat(logger *lg, BAT *b, log_id id);
      18             : static gdk_return logger_del_bat(logger *lg, log_bid bid);
      19             : /*
      20             :  * The logger uses a directory to store its log files. One master log
      21             :  * file stores information about the version of the logger and the
      22             :  * type mapping it uses. This file is a simple ascii file with the
      23             :  * following format:
      24             :  *  {6DIGIT-VERSION\n[id,type_name\n]*}
      25             :  * The transaction log files have a binary format.
      26             :  */
      27             : 
      28             : #define LOG_START       0
      29             : #define LOG_END         1
      30             : #define LOG_UPDATE_CONST        2
      31             : #define LOG_UPDATE_BULK 3
      32             : #define LOG_UPDATE      4
      33             : #define LOG_CREATE      5
      34             : #define LOG_DESTROY     6
      35             : #define LOG_SEQ         7
      36             : #define LOG_CLEAR       8
      37             : #define LOG_ROW         9 /* per row relative small log entry */
      38             : 
      39             : #ifdef NATIVE_WIN32
      40             : #define getfilepos _ftelli64
      41             : #else
      42             : #ifdef HAVE_FSEEKO
      43             : #define getfilepos ftello
      44             : #else
      45             : #define getfilepos ftell
      46             : #endif
      47             : #endif
      48             : 
      49             : #define BATSIZE 0
      50             : 
      51             : #define LOG_DISABLED(lg) ((lg)->debug&128 || (lg)->inmemory || (lg)->flushnow)
      52             : 
      53             : static const char *log_commands[] = {
      54             :         "LOG_START",
      55             :         "LOG_END",
      56             :         "LOG_UPDATE_CONST",
      57             :         "LOG_UPDATE_BULK",
      58             :         "LOG_UPDATE",
      59             :         "LOG_CREATE",
      60             :         "LOG_DESTROY",
      61             :         "LOG_SEQ",
      62             :         "LOG_CLEAR",
      63             :         "LOG_ROW",
      64             : };
      65             : 
      66             : typedef struct logaction {
      67             :         int type;               /* type of change */
      68             :         lng nr;
      69             :         int tt;
      70             :         lng id;
      71             :         lng offset;
      72             :         log_id cid;             /* id of object */
      73             :         BAT *b;                 /* temporary bat with changes */
      74             :         BAT *uid;               /* temporary bat with bun positions to update */
      75             : } logaction;
      76             : 
      77             : /* during the recover process a number of transactions could be active */
      78             : typedef struct trans {
      79             :         int tid;                /* transaction id */
      80             :         int sz;                 /* sz of the changes array */
      81             :         int nr;                 /* nr of changes */
      82             : 
      83             :         logaction *changes;
      84             : 
      85             :         struct trans *tr;
      86             : } trans;
      87             : 
      88             : typedef struct logformat_t {
      89             :         bte flag;
      90             :         int id;
      91             : } logformat;
      92             : 
      93             : typedef enum {LOG_OK, LOG_EOF, LOG_ERR} log_return;
      94             : 
      95             : static gdk_return bm_commit(logger *lg);
      96             : static gdk_return tr_grow(trans *tr);
      97             : 
      98             : static inline void
      99             : logger_lock(logger *lg)
     100             : {
     101       52767 :         MT_lock_set(&lg->lock);
     102             : }
     103             : 
     104             : static inline void
     105             : logger_unlock(logger *lg)
     106             : {
     107      331473 :         MT_lock_unset(&lg->lock);
     108       35424 : }
     109             : 
     110             : static bte
     111      382902 : find_type(logger *lg, int tpe)
     112             : {
     113      382902 :         BATiter cni = bat_iterator_nolock(lg->type_nr);
     114      382902 :         bte *res = (bte*)Tloc(lg->type_id, 0);
     115             :         BUN p;
     116             : 
     117             :         /* type should be there !*/
     118      382902 :         if (BAThash(lg->type_nr) == GDK_SUCCEED) {
     119      382902 :                 MT_rwlock_rdlock(&cni.b->thashlock);
     120      382902 :                 HASHloop_int(cni, cni.b->thash, p, &tpe) {
     121      382735 :                         MT_rwlock_rdunlock(&cni.b->thashlock);
     122      382735 :                         return res[p];
     123             :                 }
     124         167 :                 MT_rwlock_rdunlock(&cni.b->thashlock);
     125             :         }
     126             :         return -1;
     127             : }
     128             : 
     129             : static int
     130       97583 : find_type_nr(logger *lg, bte tpe)
     131             : {
     132       97583 :         BATiter cni = bat_iterator_nolock(lg->type_id);
     133       97583 :         int *res = (int*)Tloc(lg->type_nr, 0);
     134             :         BUN p;
     135             : 
     136             :         /* type should be there !*/
     137       97583 :         if (BAThash(lg->type_id) == GDK_SUCCEED) {
     138       97583 :                 MT_rwlock_rdlock(&cni.b->thashlock);
     139       97583 :                 HASHloop_bte(cni, cni.b->thash, p, &tpe) {
     140       97583 :                         MT_rwlock_rdunlock(&cni.b->thashlock);
     141       97583 :                         return res[p];
     142             :                 }
     143           0 :                 MT_rwlock_rdunlock(&cni.b->thashlock);
     144             :         }
     145             :         return -1;
     146             : }
     147             : 
     148             : static BUN
     149       97198 : log_find(BAT *b, BAT *d, int val)
     150             : {
     151             :         BUN p;
     152             : 
     153       97198 :         assert(b->ttype == TYPE_int);
     154       97198 :         assert(d->ttype == TYPE_oid);
     155       97198 :         if (BAThash(b) == GDK_SUCCEED) {
     156       97198 :                 BATiter cni = bat_iterator_nolock(b);
     157       97198 :                 MT_rwlock_rdlock(&cni.b->thashlock);
     158      151834 :                 HASHloop_int(cni, cni.b->thash, p, &val) {
     159       28571 :                         oid pos = p;
     160       28571 :                         if (BUNfnd(d, &pos) == BUN_NONE) {
     161       28571 :                                 MT_rwlock_rdunlock(&cni.b->thashlock);
     162       28571 :                                 return p;
     163             :                         }
     164             :                 }
     165       68627 :                 MT_rwlock_rdunlock(&cni.b->thashlock);
     166             :         } else {                /* unlikely: BAThash failed */
     167             :                 BUN q;
     168           0 :                 int *t = (int *) Tloc(b, 0);
     169             : 
     170           0 :                 for (p = 0, q = BUNlast(b); p < q; p++) {
     171           0 :                         if (t[p] == val) {
     172           0 :                                 oid pos = p;
     173           0 :                                 if (BUNfnd(d, &pos) == BUN_NONE)
     174           0 :                                         return p;
     175             :                         }
     176             :                 }
     177             :         }
     178             :         return BUN_NONE;
     179             : }
     180             : 
     181             : static log_bid
     182      202619 : internal_find_bat(logger *lg, log_id id)
     183             : {
     184      202619 :         BATiter cni = bat_iterator_nolock(lg->catalog_id);
     185             :         BUN p;
     186             : 
     187      202619 :         if (BAThash(lg->catalog_id) == GDK_SUCCEED) {
     188      202619 :                 MT_rwlock_rdlock(&cni.b->thashlock);
     189      331343 :                 HASHloop_int(cni, cni.b->thash, p, &id) {
     190      135238 :                         oid pos = p;
     191      135238 :                         if (BUNfnd(lg->dcatalog, &pos) == BUN_NONE) {
     192      128218 :                                 MT_rwlock_rdunlock(&cni.b->thashlock);
     193      128218 :                                 return *(log_bid *) Tloc(lg->catalog_bid, p);
     194             :                         }
     195             :                 }
     196       74401 :                 MT_rwlock_rdunlock(&cni.b->thashlock);
     197             :         }
     198             :         return 0;
     199             : }
     200             : 
     201             : static void
     202             : logbat_destroy(BAT *b)
     203             : {
     204       17979 :         if (b)
     205      103859 :                 BBPunfix(b->batCacheid);
     206        5082 : }
     207             : 
     208             : static BAT *
     209       18589 : logbat_new(int tt, BUN size, role_t role)
     210             : {
     211       18589 :         BAT *nb = COLnew(0, tt, size, role);
     212             : 
     213       18589 :         if (nb) {
     214       18589 :                 BBP_pid(nb->batCacheid) = 0;
     215       18589 :                 if (role == PERSISTENT)
     216       12593 :                         BATmode(nb, false);
     217             :         } else {
     218           0 :                 TRC_CRITICAL(GDK, "creating new BAT[void:%s]#" BUNFMT " failed\n", ATOMname(tt), size);
     219             :         }
     220       18589 :         return nb;
     221             : }
     222             : 
     223             : static int
     224      142196 : log_read_format(logger *l, logformat *data)
     225             : {
     226      142196 :         assert(!l->inmemory);
     227      273176 :         return mnstr_read(l->input_log, &data->flag, 1, 1) == 1 &&
     228      130980 :                 mnstr_readInt(l->input_log, &data->id) == 1;
     229             : }
     230             : 
     231             : static gdk_return
     232      141153 : log_write_format(logger *lg, logformat *data)
     233             : {
     234      141153 :         assert(data->id || data->flag);
     235      141153 :         assert(!lg->inmemory);
     236      282306 :         if (mnstr_write(lg->output_log, &data->flag, 1, 1) == 1 &&
     237      141153 :             mnstr_writeInt(lg->output_log, data->id))
     238             :                 return GDK_SUCCEED;
     239           0 :         TRC_CRITICAL(GDK, "write failed\n");
     240           0 :         return GDK_FAIL;
     241             : }
     242             : 
     243             : static log_return
     244           0 : log_read_clear(logger *lg, trans *tr, log_id id)
     245             : {
     246           0 :         if (lg->debug & 1)
     247           0 :                 fprintf(stderr, "#logger found log_read_clear %d\n", id);
     248           0 :         if (tr_grow(tr) != GDK_SUCCEED)
     249             :                 return LOG_ERR;
     250           0 :         tr->changes[tr->nr].type = LOG_CLEAR;
     251           0 :         tr->changes[tr->nr].cid = id;
     252           0 :         tr->nr++;
     253           0 :         return LOG_OK;
     254             : }
     255             : 
     256             : static gdk_return
     257           0 : la_bat_clear(logger *lg, logaction *la)
     258             : {
     259           0 :         log_bid bid = internal_find_bat(lg, la->cid);
     260             :         BAT *b;
     261             : 
     262           0 :         if (lg->debug & 1)
     263           0 :                 fprintf(stderr, "#la_bat_clear %d\n", la->cid);
     264             : 
     265           0 :         b = BATdescriptor(bid);
     266           0 :         if (b) {
     267           0 :                 restrict_t access = (restrict_t) b->batRestricted;
     268           0 :                 b->batRestricted = BAT_WRITE;
     269             :                 /* during startup this is fine */
     270           0 :                 BATclear(b, true);
     271           0 :                 b->batRestricted = access;
     272             :                 logbat_destroy(b);
     273             :         }
     274           0 :         return GDK_SUCCEED;
     275             : }
     276             : 
     277             : static log_return
     278        2519 : log_read_seq(logger *lg, logformat *l)
     279             : {
     280        2519 :         int seq = l->id;
     281             :         lng val;
     282             :         BUN p;
     283             : 
     284        2519 :         assert(!lg->inmemory);
     285        2519 :         if (!mnstr_readLng(lg->input_log, &val)) {
     286           0 :                 TRC_CRITICAL(GDK, "read failed\n");
     287           0 :                 return LOG_EOF;
     288             :         }
     289             : 
     290        2519 :         if ((p = log_find(lg->seqs_id, lg->dseqs, seq)) != BUN_NONE &&
     291        2518 :             p >= lg->seqs_id->batInserted) {
     292        1097 :                 assert(lg->seqs_val->hseqbase == 0);
     293        1097 :                 if (BUNreplace(lg->seqs_val, p, &val, false) != GDK_SUCCEED)
     294           0 :                         return LOG_ERR;
     295             :         } else {
     296        1422 :                 if (p != BUN_NONE) {
     297        1421 :                         oid pos = p;
     298        1421 :                         if (BUNappend(lg->dseqs, &pos, false) != GDK_SUCCEED)
     299           0 :                                 return LOG_ERR;
     300             :                 }
     301        2844 :                 if (BUNappend(lg->seqs_id, &seq, false) != GDK_SUCCEED ||
     302        1422 :                     BUNappend(lg->seqs_val, &val, false) != GDK_SUCCEED)
     303           0 :                         return LOG_ERR;
     304             :         }
     305             :         return LOG_OK;
     306             : }
     307             : 
     308             : #if 0
     309             : static gdk_return
     310             : log_write_id(logger *lg, int id)
     311             : {
     312             :         assert(!lg->inmemory);
     313             :         assert(id >= 0);
     314             :         if (mnstr_writeInt(lg->output_log, id))
     315             :                 return GDK_SUCCEED;
     316             :         TRC_CRITICAL(GDK, "write failed\n");
     317             :         return GDK_FAIL;
     318             : }
     319             : 
     320             : static int
     321             : log_read_id(logger *lg, log_id *id)
     322             : {
     323             :         assert(!lg->inmemory);
     324             :         if (mnstr_readInt(lg->input_log, id) != 1) {
     325             :                 TRC_CRITICAL(GDK, "read failed\n");
     326             :                 return LOG_EOF;
     327             :         }
     328             :         return LOG_OK;
     329             : }
     330             : #endif
     331             : 
     332             : static log_return
     333       12607 : string_reader(logger *lg, BAT *b, lng nr)
     334             : {
     335             :         size_t sz = 0;
     336       12607 :         lng SZ = 0;
     337             :         log_return res = LOG_OK;
     338             : 
     339       25264 :         for (; nr && res == LOG_OK; ) {
     340       12657 :                 if (mnstr_readLng(lg->input_log, &SZ) != 1)
     341           0 :                         return LOG_EOF;
     342       12657 :                 sz = (size_t)SZ;
     343       12657 :                 char *buf = lg->buf;
     344       12657 :                 if (lg->bufsize < sz) {
     345           0 :                         if (!(buf = GDKrealloc(lg->buf, sz)))
     346             :                                 return LOG_ERR;
     347           0 :                         lg->buf = buf;
     348           0 :                         lg->bufsize = sz;
     349             :                 }
     350             : 
     351       12657 :                 if (mnstr_read(lg->input_log, buf, sz, 1) != 1)
     352             :                         return LOG_EOF;
     353             :                 /* handle strings */
     354             :                 char *t = buf;
     355             :                 /* chunked */
     356             : #define CHUNK_SIZE 1024
     357             :                 char *strings[CHUNK_SIZE];
     358             :                 int cur = 0;
     359             : 
     360       55621 :                 for(; nr>0 && res == LOG_OK && t < (buf+sz); nr--) {
     361       42964 :                         strings[cur++] = t;
     362       42964 :                         if (cur == CHUNK_SIZE && b && BUNappendmulti(b, strings, cur, true) != GDK_SUCCEED)
     363             :                                 res = LOG_ERR;
     364       42964 :                         if (cur == CHUNK_SIZE)
     365             :                                 cur = 0;
     366             :                         /* find next */
     367     5123892 :                         while(*t)
     368     5080928 :                                 t++;
     369       42964 :                         t++;
     370             :                 }
     371       12657 :                 if (cur && b && BUNappendmulti(b, strings, cur, true) != GDK_SUCCEED)
     372             :                         res = LOG_ERR;
     373             :         }
     374             :         return res;
     375             : }
     376             : 
     377             : static log_return
     378       94570 : log_read_updates(logger *lg, trans *tr, logformat *l, log_id id, lng offset)
     379             : {
     380             :         log_return res = LOG_OK;
     381             :         lng nr, pnr;
     382       94570 :         bte type_id = -1;
     383             :         int tpe;
     384             : 
     385       94570 :         assert(!lg->inmemory);
     386       94570 :         if (lg->debug & 1)
     387           0 :                 fprintf(stderr, "#logger found log_read_updates %d %s\n", id, l->flag == LOG_UPDATE ? "update" : "update_buld");
     388             : 
     389      189140 :         if (!mnstr_readLng(lg->input_log, &nr) ||
     390       94570 :             mnstr_read(lg->input_log, &type_id, 1, 1) != 1)
     391           0 :                 return LOG_ERR;
     392             : 
     393       94570 :         pnr = nr;
     394       94570 :         tpe = find_type_nr(lg, type_id);
     395       94570 :         if (tpe >= 0) {
     396             :                 BAT *uid = NULL;
     397             :                 BAT *r = NULL;
     398       94570 :                 void *(*rt) (ptr, size_t *, stream *, size_t) = BATatoms[tpe].atomRead;
     399             : 
     400       94570 :                 assert(nr <= (lng) BUN_MAX);
     401       94570 :                 if (!lg->flushing && l->flag == LOG_UPDATE) {
     402           8 :                         uid = COLnew(0, TYPE_oid, (BUN)nr, PERSISTENT);
     403           8 :                         if (uid == NULL) {
     404             :                                 return LOG_ERR;
     405             :                         }
     406             :                 }
     407       94570 :                 if (!lg->flushing) {
     408        2537 :                         r = COLnew(0, tpe, (BUN) nr, PERSISTENT);
     409        2537 :                         if (r == NULL) {
     410           0 :                                 if (uid)
     411           0 :                                         BBPreclaim(uid);
     412           0 :                                 return LOG_ERR;
     413             :                         }
     414             :                 }
     415             : 
     416       94570 :                 if (l->flag == LOG_UPDATE_CONST) {
     417       49626 :                         if (!mnstr_readLng(lg->input_log, &offset)) {
     418           0 :                                 if (r)
     419           0 :                                         BBPreclaim(r);
     420           0 :                                 return LOG_ERR;
     421             :                         }
     422       49626 :                         size_t tlen = lg->bufsize;
     423       49626 :                         void *t = rt(lg->buf, &tlen, lg->input_log, 1);
     424       49626 :                         if (t == NULL) {
     425             :                                 res = LOG_ERR;
     426             :                         } else {
     427       49626 :                                 lg->buf = t;
     428       49626 :                                 lg->bufsize = tlen;
     429      113791 :                                 for(BUN p = 0; p<(BUN) nr; p++) {
     430       64165 :                                         if (r && BUNappend(r, t, true) != GDK_SUCCEED)
     431             :                                                 res = LOG_ERR;
     432             :                                 }
     433             :                         }
     434       44944 :                 } else if (l->flag == LOG_UPDATE_BULK) {
     435       44919 :                         if (!mnstr_readLng(lg->input_log, &offset)) {
     436           0 :                                 if (r)
     437           0 :                                         BBPreclaim(r);
     438           0 :                                 return LOG_ERR;
     439             :                         }
     440       44919 :                         if (tpe == TYPE_msk) {
     441           0 :                                 if (r) {
     442           0 :                                         if (mnstr_readIntArray(lg->input_log, Tloc(r, 0), (size_t) ((nr + 31) / 32)))
     443           0 :                                                 BATsetcount(r, (BUN) nr);
     444             :                                         else
     445             :                                                 res = LOG_ERR;
     446             :                                 } else {
     447           0 :                                         size_t tlen = lg->bufsize/sizeof(int);
     448           0 :                                         size_t cnt = 0, snr = (size_t)nr;
     449           0 :                                         snr = (snr+31)/32;
     450           0 :                                         assert(tlen);
     451           0 :                                         for (; res == LOG_OK && snr > 0; snr-=cnt) {
     452           0 :                                                 cnt = snr>tlen?tlen:snr;
     453           0 :                                                 if (!mnstr_readIntArray(lg->input_log, lg->buf, cnt))
     454             :                                                         res = LOG_ERR;
     455             :                                         }
     456             :                                 }
     457             :                         } else {
     458       44919 :                                 if (!ATOMvarsized(tpe)) {
     459       31775 :                                         size_t cnt = 0, snr = (size_t)nr;
     460       31775 :                                         size_t tlen = lg->bufsize/ATOMsize(tpe), ntlen = lg->bufsize;
     461       31775 :                                         assert(tlen);
     462             :                                         /* read in chunks of max
     463             :                                          * BUFSIZE/width rows */
     464       76215 :                                         for (; res == LOG_OK && snr > 0; snr-=cnt) {
     465       44440 :                                                 cnt = snr>tlen?tlen:snr;
     466       44440 :                                                 void *t = rt(lg->buf, &ntlen, lg->input_log, cnt);
     467             : 
     468       44440 :                                                 if (t == NULL) {
     469             :                                                         res = LOG_EOF;
     470             :                                                         break;
     471             :                                                 }
     472       44440 :                                                 assert(t == lg->buf);
     473       44440 :                                                 if (r && BUNappendmulti(r, t, cnt, true) != GDK_SUCCEED)
     474             :                                                         res = LOG_ERR;
     475             :                                         }
     476       13144 :                                 } else if (tpe == TYPE_str) {
     477             :                                         /* efficient string */
     478       12602 :                                         res = string_reader(lg, r, nr);
     479             :                                 } else {
     480        1137 :                                         for (; res == LOG_OK && nr > 0; nr--) {
     481         595 :                                                 size_t tlen = lg->bufsize;
     482         595 :                                                 void *t = rt(lg->buf, &tlen, lg->input_log, 1);
     483             : 
     484         595 :                                                 if (t == NULL) {
     485             :                                                         /* see if failure was due to
     486             :                                                         * malloc or something less
     487             :                                                         * serious (in the current
     488             :                                                         * context) */
     489           0 :                                                         if (strstr(GDKerrbuf, "alloc") == NULL)
     490             :                                                                 res = LOG_EOF;
     491             :                                                         else
     492             :                                                                 res = LOG_ERR;
     493             :                                                 } else {
     494         595 :                                                         lg->buf = t;
     495         595 :                                                         lg->bufsize = tlen;
     496         595 :                                                         if (r && BUNappend(r, t, true) != GDK_SUCCEED)
     497             :                                                                 res = LOG_ERR;
     498             :                                                 }
     499             :                                         }
     500             :                                 }
     501             :                         }
     502             :                 } else {
     503          25 :                         void *(*rh) (ptr, size_t *, stream *, size_t) = BATatoms[TYPE_oid].atomRead;
     504          25 :                         void *hv = ATOMnil(TYPE_oid);
     505             : 
     506          25 :                         if (hv == NULL)
     507             :                                 res = LOG_ERR;
     508         106 :                         for (; res == LOG_OK && nr > 0; nr--) {
     509          81 :                                 size_t hlen = sizeof(oid);
     510          81 :                                 void *h = rh(hv, &hlen, lg->input_log, 1);
     511          81 :                                 assert(hlen == sizeof(oid));
     512          81 :                                 assert(h == hv);
     513          81 :                                 if ((uid && BUNappend(uid, h, true) != GDK_SUCCEED))
     514             :                                         res = LOG_ERR;
     515             :                         }
     516          25 :                         nr = pnr;
     517          25 :                         if (tpe == TYPE_msk) {
     518           0 :                                 if (r) {
     519           0 :                                         if (mnstr_readIntArray(lg->input_log, Tloc(r, 0), (size_t) ((nr + 31) / 32)))
     520           0 :                                                 BATsetcount(r, (BUN) nr);
     521             :                                         else
     522             :                                                 res = LOG_ERR;
     523             :                                 } else {
     524           0 :                                         for (lng i = 0; i < nr; i += 32) {
     525             :                                                 int v;
     526           0 :                                                 switch (mnstr_readInt(lg->input_log, &v)) {
     527           0 :                                                 case 1:
     528           0 :                                                         continue;
     529             :                                                 case 0:
     530             :                                                         res = LOG_EOF;
     531             :                                                         break;
     532           0 :                                                 default:
     533             :                                                         res = LOG_ERR;
     534           0 :                                                         break;
     535             :                                                 }
     536           0 :                                                 break;
     537             :                                         }
     538             :                                 }
     539          25 :                         } else if (tpe == TYPE_str) {
     540             :                                 /* efficient string */
     541           5 :                                 res = string_reader(lg, r, nr);
     542             :                         } else {
     543          90 :                                 for (; res == LOG_OK && nr > 0; nr--) {
     544          70 :                                         size_t tlen = lg->bufsize;
     545          70 :                                         void *t = rt(lg->buf, &tlen, lg->input_log, 1);
     546             : 
     547          70 :                                         if (t == NULL) {
     548           0 :                                                 if (strstr(GDKerrbuf, "malloc") == NULL)
     549             :                                                         res = LOG_EOF;
     550             :                                                 else
     551             :                                                         res = LOG_ERR;
     552             :                                         } else {
     553          70 :                                                 lg->buf = t;
     554          70 :                                                 lg->bufsize = tlen;
     555          70 :                                                 if ((r && BUNappend(r, t, true) != GDK_SUCCEED))
     556             :                                                         res = LOG_ERR;
     557             :                                         }
     558             :                                 }
     559             :                         }
     560          25 :                         GDKfree(hv);
     561             :                 }
     562             : 
     563       94570 :                 if (res == LOG_OK) {
     564       94570 :                         if (tr_grow(tr) == GDK_SUCCEED) {
     565       94570 :                                 tr->changes[tr->nr].type =
     566       94570 :                                         l->flag==LOG_UPDATE_CONST?LOG_UPDATE_BULK:l->flag;
     567       94570 :                                 tr->changes[tr->nr].nr = pnr;
     568       94570 :                                 tr->changes[tr->nr].tt = tpe;
     569       94570 :                                 tr->changes[tr->nr].cid = id;
     570       94570 :                                 tr->changes[tr->nr].offset = offset;
     571       94570 :                                 tr->changes[tr->nr].b = r;
     572       94570 :                                 tr->changes[tr->nr].uid = uid;
     573       94570 :                                 tr->nr++;
     574             :                         } else {
     575             :                                 res = LOG_ERR;
     576             :                         }
     577             :                 }
     578       94570 :                 if (res == LOG_ERR) {
     579           0 :                         if (r)
     580           0 :                                 BBPreclaim(r);
     581           0 :                         if (uid)
     582           0 :                                 BBPreclaim(uid);
     583             :                 }
     584             :         } else {
     585             :                 /* bat missing ERROR or ignore ? currently error. */
     586             :                 res = LOG_ERR;
     587             :         }
     588             :         return res;
     589             : }
     590             : 
     591             : 
     592             : static gdk_return
     593      242944 : la_bat_update_count(logger *lg, log_id id, lng cnt)
     594             : {
     595      242944 :         BATiter cni = bat_iterator_nolock(lg->catalog_id);
     596             :         BUN p;
     597             : 
     598      242944 :         if (BAThash(lg->catalog_id) == GDK_SUCCEED) {
     599      242944 :                 MT_rwlock_rdlock(&cni.b->thashlock);
     600      629543 :                 HASHloop_int(cni, cni.b->thash, p, &id) {
     601      243355 :                         lng ocnt = *(lng*) Tloc(lg->catalog_cnt, p);
     602      243355 :                         assert(lg->catalog_cnt->hseqbase == 0);
     603      243355 :                         if (ocnt < cnt && BUNreplace(lg->catalog_cnt, p, &cnt, false) != GDK_SUCCEED) {
     604           0 :                                 MT_rwlock_rdunlock(&cni.b->thashlock);
     605           0 :                                 return GDK_FAIL;
     606             :                         }
     607             :                 }
     608      242944 :                 MT_rwlock_rdunlock(&cni.b->thashlock);
     609             :         }
     610             :         return GDK_SUCCEED;
     611             : 
     612             : }
     613             : 
     614             : static gdk_return
     615       94570 : la_bat_updates(logger *lg, logaction *la)
     616             : {
     617       94570 :         log_bid bid = internal_find_bat(lg, la->cid);
     618             :         BAT *b = NULL;
     619             : 
     620       94570 :         if (bid == 0)
     621             :                 return GDK_SUCCEED; /* ignore bats no longer in the catalog */
     622             : 
     623       87784 :         if (!lg->flushing) {
     624        2537 :                 b = BATdescriptor(bid);
     625        2537 :                 if (b == NULL)
     626             :                         return GDK_FAIL;
     627             :         }
     628       87784 :         if (la->type == LOG_UPDATE_BULK) {
     629             :                 BUN cnt = 0;
     630             : 
     631       87767 :                 if (!lg->flushing) {
     632        2529 :                         cnt = BATcount(b);
     633        2529 :                         int is_msk = (b->ttype == TYPE_msk);
     634             :                         /* handle offset 0 ie clear */
     635             :                         if (/* DISABLES CODE */ (0) && la->offset == 0 && cnt)
     636             :                                 BATclear(b, true);
     637             :                         /* handle offset */
     638        2529 :                         if (cnt <= (BUN)la->offset) {
     639         714 :                                 msk t = 1;
     640         714 :                                 if (cnt < (BUN)la->offset) { /* insert nils */
     641           0 :                                         const void *tv = (is_msk)?&t:ATOMnilptr(b->ttype);
     642           0 :                                         lng i, d = la->offset - BATcount(b);
     643           0 :                                         for(i=0;i<d;i++) {
     644           0 :                                                 if (BUNappend(b, tv, true) != GDK_SUCCEED) {
     645             :                                                         logbat_destroy(b);
     646           0 :                                                         return GDK_FAIL;
     647             :                                                 }
     648             :                                         }
     649             :                                 }
     650         714 :                                 if (BATcount(b) == (BUN)la->offset && BATappend(b, la->b, NULL, true) != GDK_SUCCEED) {
     651             :                                         logbat_destroy(b);
     652           0 :                                         return GDK_FAIL;
     653             :                                 }
     654             :                         } else {
     655        1815 :                                 BATiter vi = bat_iterator(la->b);
     656             :                                 BUN p, q;
     657             : 
     658        7401 :                                 for (p=0, q = (BUN)la->offset; p<(BUN)la->nr; p++, q++) {
     659        5586 :                                         const void *t = BUNtail(vi, p);
     660             : 
     661        5586 :                                         if (q < cnt) {
     662        5586 :                                                 if (BUNreplace(b, q, t, true) != GDK_SUCCEED) {
     663             :                                                         logbat_destroy(b);
     664           0 :                                                         bat_iterator_end(&vi);
     665           0 :                                                         return GDK_FAIL;
     666             :                                                 }
     667             :                                         } else {
     668           0 :                                                 if (BUNappend(b, t, true) != GDK_SUCCEED) {
     669             :                                                         logbat_destroy(b);
     670           0 :                                                         bat_iterator_end(&vi);
     671           0 :                                                         return GDK_FAIL;
     672             :                                                 }
     673             :                                         }
     674             :                                 }
     675        1815 :                                 bat_iterator_end(&vi);
     676             :                         }
     677             :                 }
     678       87767 :                 cnt = (BUN)(la->offset + la->nr);
     679       87767 :                 if (la_bat_update_count(lg, la->cid, cnt) != GDK_SUCCEED) {
     680           0 :                         if (b)
     681             :                                 logbat_destroy(b);
     682           0 :                         return GDK_FAIL;
     683             :                 }
     684          17 :         } else if (!lg->flushing && la->type == LOG_UPDATE) {
     685           8 :                 BATiter vi = bat_iterator(la->b);
     686             :                 BUN p, q;
     687             : 
     688          32 :                 BATloop(la->b, p, q) {
     689          24 :                         oid h = BUNtoid(la->uid, p);
     690          24 :                         const void *t = BUNtail(vi, p);
     691             : 
     692          24 :                         if (BUNreplace(b, h, t, true) != GDK_SUCCEED) {
     693             :                                 logbat_destroy(b);
     694           0 :                                 bat_iterator_end(&vi);
     695           0 :                                 return GDK_FAIL;
     696             :                         }
     697             :                 }
     698           8 :                 bat_iterator_end(&vi);
     699             :         }
     700       87784 :         if (b)
     701             :                 logbat_destroy(b);
     702             :         return GDK_SUCCEED;
     703             : }
     704             : 
     705             : static log_return
     706        9064 : log_read_destroy(logger *lg, trans *tr, log_id id)
     707             : {
     708             :         (void) lg;
     709        9064 :         assert(!lg->inmemory);
     710        9064 :         if (tr_grow(tr) == GDK_SUCCEED) {
     711        9064 :                 tr->changes[tr->nr].type = LOG_DESTROY;
     712        9064 :                 tr->changes[tr->nr].cid = id;
     713        9064 :                 tr->nr++;
     714             :         }
     715        9064 :         return LOG_OK;
     716             : }
     717             : 
     718             : static gdk_return
     719          36 : la_bat_destroy(logger *lg, logaction *la)
     720             : {
     721          36 :         log_bid bid = internal_find_bat(lg, la->cid);
     722             : 
     723          36 :         if (bid && logger_del_bat(lg, bid) != GDK_SUCCEED)
     724           0 :                 return GDK_FAIL;
     725             :         return GDK_SUCCEED;
     726             : }
     727             : 
     728             : static log_return
     729        3013 : log_read_create(logger *lg, trans *tr, log_id id)
     730             : {
     731             :         bte tt;
     732             :         int tpe;
     733             : 
     734        3013 :         assert(!lg->inmemory);
     735        3013 :         if (lg->debug & 1)
     736           0 :                 fprintf(stderr, "#log_read_create %d\n", id);
     737             : 
     738        3013 :         if (mnstr_read(lg->input_log, &tt, 1, 1) != 1)
     739             :                 return LOG_ERR;
     740             : 
     741        3013 :         tpe = find_type_nr(lg, tt);
     742             :         /* read create */
     743        3013 :         if (tr_grow(tr) == GDK_SUCCEED) {
     744        3013 :                 tr->changes[tr->nr].type = LOG_CREATE;
     745        3013 :                 tr->changes[tr->nr].tt = tpe;
     746        3013 :                 tr->changes[tr->nr].cid = id;
     747        3013 :                 tr->nr++;
     748             :         }
     749             : 
     750             :         return LOG_OK;
     751             : }
     752             : 
     753             : static gdk_return
     754         232 : la_bat_create(logger *lg, logaction *la)
     755             : {
     756             :         BAT *b;
     757             : 
     758             :         /* formerly head column type, should be void */
     759         232 :         if ((b = COLnew(0, la->tt, BATSIZE, PERSISTENT)) == NULL)
     760             :                 return GDK_FAIL;
     761             : 
     762         232 :         if (la->tt < 0)
     763           0 :                 BATtseqbase(b, 0);
     764             : 
     765         464 :         if ((b = BATsetaccess(b, BAT_READ)) == NULL ||
     766         232 :             logger_add_bat(lg, b, la->cid) != GDK_SUCCEED) {
     767             :                 logbat_destroy(b);
     768           0 :                 return GDK_FAIL;
     769             :         }
     770             :         logbat_destroy(b);
     771         232 :         return GDK_SUCCEED;
     772             : }
     773             : 
     774             : static gdk_return
     775         195 : logger_write_new_types(logger *lg, FILE *fp)
     776             : {
     777         195 :         bte id = 0;
     778             : 
     779             :         /* write types and insert into bats */
     780             :         /* first the fixed sized types */
     781        6237 :         for (int i=0;i<GDKatomcnt; i++) {
     782        6042 :                 if (ATOMvarsized(i))
     783        1558 :                         continue;
     784        8968 :                 if (BUNappend(lg->type_id, &id, false) != GDK_SUCCEED ||
     785        8968 :                     BUNappend(lg->type_nme, BATatoms[i].name, false) != GDK_SUCCEED ||
     786        8968 :                     BUNappend(lg->type_nr, &i, false) != GDK_SUCCEED ||
     787        4484 :                     fprintf(fp, "%d,%s\n", id, BATatoms[i].name) < 0) {
     788           0 :                         return GDK_FAIL;
     789             :                 }
     790        4484 :                 id++;
     791             :         }
     792             :         /* second the var sized types */
     793         195 :         id=-127; /* start after nil */
     794        6237 :         for (int i=0;i<GDKatomcnt; i++) {
     795        6042 :                 if (!ATOMvarsized(i))
     796        4484 :                         continue;
     797        3116 :                 if (BUNappend(lg->type_id, &id, false) != GDK_SUCCEED ||
     798        3116 :                     BUNappend(lg->type_nme, BATatoms[i].name, false) != GDK_SUCCEED ||
     799        3116 :                     BUNappend(lg->type_nr, &i, false) != GDK_SUCCEED ||
     800        1558 :                     fprintf(fp, "%d,%s\n", id, BATatoms[i].name) < 0) {
     801           0 :                         return GDK_FAIL;
     802             :                 }
     803        1558 :                 id++;
     804             :         }
     805         195 :         return GDK_SUCCEED;
     806             : }
     807             : 
     808             : #define TR_SIZE         1024
     809             : 
     810             : static trans *
     811       10907 : tr_create(trans *tr, int tid)
     812             : {
     813       10907 :         trans *ntr = GDKmalloc(sizeof(trans));
     814             : 
     815       10907 :         if (ntr == NULL)
     816             :                 return NULL;
     817       10907 :         ntr->tid = tid;
     818       10907 :         ntr->sz = TR_SIZE;
     819       10907 :         ntr->nr = 0;
     820       10907 :         ntr->changes = GDKmalloc(sizeof(logaction) * TR_SIZE);
     821       10907 :         if (ntr->changes == NULL) {
     822           0 :                 GDKfree(ntr);
     823           0 :                 return NULL;
     824             :         }
     825       10907 :         ntr->tr = tr;
     826       10907 :         return ntr;
     827             : }
     828             : 
     829             : static gdk_return
     830      106647 : la_apply(logger *lg, logaction *c)
     831             : {
     832             :         gdk_return ret = GDK_SUCCEED;
     833             : 
     834      106647 :         switch (c->type) {
     835       94570 :         case LOG_UPDATE_BULK:
     836             :         case LOG_UPDATE:
     837       94570 :                 ret = la_bat_updates(lg, c);
     838       94570 :                 break;
     839        3013 :         case LOG_CREATE:
     840        3013 :                 if (!lg->flushing)
     841         232 :                         ret = la_bat_create(lg, c);
     842             :                 break;
     843        9064 :         case LOG_DESTROY:
     844        9064 :                 if (!lg->flushing)
     845          36 :                         ret = la_bat_destroy(lg, c);
     846             :                 break;
     847           0 :         case LOG_CLEAR:
     848           0 :                 if (!lg->flushing)
     849           0 :                         ret = la_bat_clear(lg, c);
     850             :                 break;
     851             :         default:
     852           0 :                 assert(0);
     853             :         }
     854      106647 :         return ret;
     855             : }
     856             : 
     857             : static void
     858      106647 : la_destroy(logaction *c)
     859             : {
     860      106647 :         if (c->b && (c->type == LOG_UPDATE || c->type == LOG_UPDATE_BULK))
     861             :                 logbat_destroy(c->b);
     862      106647 :         if (c->uid && c->type == LOG_UPDATE)
     863             :                 logbat_destroy(c->uid);
     864      106647 : }
     865             : 
     866             : static gdk_return
     867      106647 : tr_grow(trans *tr)
     868             : {
     869      106647 :         if (tr->nr == tr->sz) {
     870             :                 logaction *changes;
     871           8 :                 tr->sz <<= 1;
     872           8 :                 changes = GDKrealloc(tr->changes, tr->sz * sizeof(logaction));
     873           8 :                 if (changes == NULL)
     874             :                         return GDK_FAIL;
     875           8 :                 tr->changes = changes;
     876             :         }
     877             :         /* cleanup the next */
     878      106647 :         tr->changes[tr->nr].b = NULL;
     879      106647 :         return GDK_SUCCEED;
     880             : }
     881             : 
     882             : static trans *
     883       10907 : tr_destroy(trans *tr)
     884             : {
     885       10907 :         trans *r = tr->tr;
     886             : 
     887       10907 :         GDKfree(tr->changes);
     888       10907 :         GDKfree(tr);
     889       10907 :         return r;
     890             : }
     891             : 
     892             : static trans *
     893           0 : tr_abort_(logger *lg, trans *tr, int s)
     894             : {
     895             :         int i;
     896             : 
     897           0 :         if (lg->debug & 1)
     898           0 :                 fprintf(stderr, "#tr_abort\n");
     899             : 
     900           0 :         for (i = s; i < tr->nr; i++)
     901           0 :                 la_destroy(&tr->changes[i]);
     902           0 :         return tr_destroy(tr);
     903             : }
     904             : 
     905             : static trans *
     906             : tr_abort(logger *lg, trans *tr)
     907             : {
     908           0 :         return tr_abort_(lg, tr, 0);
     909             : }
     910             : 
     911             : static trans *
     912       10907 : tr_commit(logger *lg, trans *tr)
     913             : {
     914             :         int i;
     915             : 
     916       10907 :         if (lg->debug & 1)
     917           0 :                 fprintf(stderr, "#tr_commit\n");
     918             : 
     919      117554 :         for (i = 0; i < tr->nr; i++) {
     920      106647 :                 if (la_apply(lg, &tr->changes[i]) != GDK_SUCCEED) {
     921             :                         do {
     922           0 :                                 tr = tr_abort_(lg, tr, i);
     923           0 :                         } while (tr != NULL);
     924             :                         return (trans *) -1;
     925             :                 }
     926      106647 :                 la_destroy(&tr->changes[i]);
     927             :         }
     928       10907 :         lg->saved_tid = tr->tid;
     929       10907 :         return tr_destroy(tr);
     930             : }
     931             : 
     932             : static gdk_return
     933          70 : logger_read_types_file(logger *lg, FILE *fp)
     934             : {
     935          70 :         int id = 0;
     936             :         char atom_name[IDLENGTH];
     937             : 
     938             :         /* scanf should use IDLENGTH somehow */
     939        2202 :         while(fscanf(fp, "%d,%63s\n", &id, atom_name) == 2) {
     940        2132 :                 int i = ATOMindex(atom_name);
     941             : 
     942        2132 :                 if (id > 255 || i < 0) {
     943           0 :                         GDKerror("unknown type in log file '%s'\n", atom_name);
     944           0 :                         return GDK_FAIL;
     945             :                 }
     946        2132 :                 bte lid = (bte)id;
     947        4264 :                 if (BUNappend(lg->type_id, &lid, false) != GDK_SUCCEED ||
     948        4264 :                     BUNappend(lg->type_nme, atom_name, false) != GDK_SUCCEED ||
     949        2132 :                     BUNappend(lg->type_nr, &i, false) != GDK_SUCCEED) {
     950           0 :                         return GDK_FAIL;
     951             :                 }
     952             :         }
     953             :         return GDK_SUCCEED;
     954             : }
     955             : 
     956             : 
     957             : gdk_return
     958         195 : logger_create_types_file(logger *lg, const char *filename)
     959             : {
     960             :         FILE *fp;
     961             : 
     962         195 :         if ((fp = MT_fopen(filename, "w")) == NULL) {
     963           0 :                 GDKerror("cannot create log file %s\n", filename);
     964           0 :                 return GDK_FAIL;
     965             :         }
     966         195 :         if (fprintf(fp, "%06d\n\n", lg->version) < 0) {
     967           0 :                 fclose(fp);
     968             :                 MT_remove(filename);
     969           0 :                 GDKerror("writing log file %s failed", filename);
     970           0 :                 return GDK_FAIL;
     971             :         }
     972             : 
     973         195 :         if (logger_write_new_types(lg, fp) != GDK_SUCCEED) {
     974           0 :                 fclose(fp);
     975             :                 MT_remove(filename);
     976           0 :                 GDKerror("writing log file %s failed", filename);
     977           0 :                 return GDK_FAIL;
     978             :         }
     979         195 :         if (fflush(fp) < 0 || (!(GDKdebug & NOSYNCMASK)
     980             : #if defined(_MSC_VER)
     981             :                      && _commit(_fileno(fp)) < 0
     982             : #elif defined(HAVE_FDATASYNC)
     983           0 :                      && fdatasync(fileno(fp)) < 0
     984             : #elif defined(HAVE_FSYNC)
     985             :                      && fsync(fileno(fp)) < 0
     986             : #endif
     987             :             )) {
     988             :                 MT_remove(filename);
     989           0 :                 GDKerror("flushing log file %s failed", filename);
     990           0 :                 return GDK_FAIL;
     991             :         }
     992         195 :         if (fclose(fp) < 0) {
     993             :                 MT_remove(filename);
     994           0 :                 GDKerror("closing log file %s failed", filename);
     995           0 :                 return GDK_FAIL;
     996             :         }
     997             :         return GDK_SUCCEED;
     998             : }
     999             : 
    1000             : static gdk_return
    1001       12049 : logger_open_output(logger *lg)
    1002             : {
    1003       12049 :         logged_range *new_range = (logged_range*)GDKmalloc(sizeof(logged_range));
    1004             : 
    1005       12049 :         if (!new_range) {
    1006           0 :                 TRC_CRITICAL(GDK, "allocation failure\n");
    1007           0 :                 return GDK_FAIL;
    1008             :         }
    1009             : 
    1010       12049 :         lg->end = 0;
    1011       12049 :         if (!LOG_DISABLED(lg)) {
    1012             :                 char id[32];
    1013             :                 char *filename;
    1014             : 
    1015       12048 :                 if (snprintf(id, sizeof(id), LLFMT, lg->id) >= (int) sizeof(id)) {
    1016           0 :                         TRC_CRITICAL(GDK, "filename is too large\n");
    1017           0 :                         GDKfree(new_range);
    1018           0 :                         return GDK_FAIL;
    1019             :                 }
    1020       12048 :                 if (!(filename = GDKfilepath(BBPselectfarm(PERSISTENT, 0, offheap), lg->dir, LOGFILE, id))) {
    1021           0 :                         TRC_CRITICAL(GDK, "allocation failure\n");
    1022           0 :                         GDKfree(new_range);
    1023           0 :                         return GDK_FAIL;
    1024             :                 }
    1025             : 
    1026       12048 :                 lg->output_log = open_wstream(filename);
    1027       12048 :                 if (lg->output_log) {
    1028       12048 :                         short byteorder = 1234;
    1029       12048 :                         mnstr_write(lg->output_log, &byteorder, sizeof(byteorder), 1);
    1030             :                 }
    1031       12048 :                 lg->end = 0;
    1032             : 
    1033       12048 :                 if (lg->output_log == NULL || mnstr_errnr(lg->output_log)) {
    1034           0 :                         TRC_CRITICAL(GDK, "creating %s failed: %s\n", filename, mnstr_peek_error(NULL));
    1035           0 :                         GDKfree(new_range);
    1036           0 :                         GDKfree(filename);
    1037           0 :                         return GDK_FAIL;
    1038             :                 }
    1039       12048 :                 GDKfree(filename);
    1040             :         }
    1041       12049 :         new_range->id = lg->id;
    1042       12049 :         new_range->first_tid = lg->tid;
    1043       12049 :         new_range->last_tid = lg->tid;
    1044       12049 :         new_range->last_ts = 0;
    1045       12049 :         new_range->next = NULL;
    1046       12049 :         if (lg->current)
    1047       11783 :                 lg->current->next = new_range;
    1048       12049 :         lg->current = new_range;
    1049       12049 :         if (!lg->pending)
    1050         266 :                 lg->pending = new_range;
    1051             :         return GDK_SUCCEED;
    1052             : }
    1053             : 
    1054             : static inline void
    1055       11564 : logger_close_input(logger *lg)
    1056             : {
    1057       11564 :         if (!lg->inmemory)
    1058       11563 :                 close_stream(lg->input_log);
    1059       11564 :         lg->input_log = NULL;
    1060       11564 : }
    1061             : 
    1062             : static inline void
    1063       12048 : logger_close_output(logger *lg)
    1064             : {
    1065       12048 :         if (!LOG_DISABLED(lg))
    1066       12047 :                 close_stream(lg->output_log);
    1067       12048 :         lg->output_log = NULL;
    1068       12048 : }
    1069             : 
    1070             : static gdk_return
    1071       11299 : logger_open_input(logger *lg, char *filename, bool *filemissing)
    1072             : {
    1073       11299 :         lg->input_log = open_rstream(filename);
    1074             : 
    1075             :         /* if the file doesn't exist, there is nothing to be read back */
    1076       11299 :         if (lg->input_log == NULL || mnstr_errnr(lg->input_log)) {
    1077          78 :                 logger_close_input(lg);
    1078          78 :                 *filemissing = true;
    1079          78 :                 return GDK_SUCCEED;
    1080             :         }
    1081             :         short byteorder;
    1082       11221 :         switch (mnstr_read(lg->input_log, &byteorder, sizeof(byteorder), 1)) {
    1083           0 :         case -1:
    1084           0 :                 logger_close_input(lg);
    1085           0 :                 return GDK_FAIL;
    1086           5 :         case 0:
    1087             :                 /* empty file is ok */
    1088           5 :                 logger_close_input(lg);
    1089           5 :                 return GDK_SUCCEED;
    1090       11216 :         case 1:
    1091             :                 /* if not empty, must start with correct byte order mark */
    1092       11216 :                 if (byteorder != 1234) {
    1093           0 :                         TRC_CRITICAL(GDK, "incorrect byte order word in file %s\n", filename);
    1094           0 :                         logger_close_input(lg);
    1095           0 :                         return GDK_FAIL;
    1096             :                 }
    1097             :                 break;
    1098             :         }
    1099             :         return GDK_SUCCEED;
    1100             : }
    1101             : 
    1102             : static log_return
    1103       11216 : logger_read_transaction(logger *lg)
    1104             : {
    1105             :         logformat l;
    1106             :         trans *tr = NULL;
    1107             :         log_return err = LOG_OK;
    1108             :         int ok = 1;
    1109       11216 :         int dbg = GDKdebug;
    1110             : 
    1111       11216 :         if (!lg->flushing)
    1112          92 :                 GDKdebug &= ~(CHECKMASK|PROPMASK);
    1113             : 
    1114      142196 :         while (err == LOG_OK && (ok=log_read_format(lg, &l))) {
    1115      130980 :                 if (l.flag == 0 && l.id == 0) {
    1116             :                         err = LOG_EOF;
    1117             :                         break;
    1118             :                 }
    1119             : 
    1120      130980 :                 if (lg->debug & 1) {
    1121           0 :                         fprintf(stderr, "#logger_readlog: ");
    1122           0 :                         if (l.flag > 0 &&
    1123             :                             l.flag < (bte) (sizeof(log_commands) / sizeof(log_commands[0])))
    1124           0 :                                 fprintf(stderr, "%s", log_commands[(int) l.flag]);
    1125             :                         else
    1126           0 :                                 fprintf(stderr, "%d", l.flag);
    1127           0 :                         fprintf(stderr, " %d\n", l.id);
    1128             :                 }
    1129             :                 /* the functions we call here can succeed (LOG_OK),
    1130             :                  * but they can also fail for two different reasons:
    1131             :                  * they can run out of input (LOG_EOF -- this is not
    1132             :                  * serious, we just abort the remaining transactions),
    1133             :                  * or some malloc or BAT update fails (LOG_ERR -- this
    1134             :                  * is serious, we must abort the complete process);
    1135             :                  * the latter failure causes the current function to
    1136             :                  * return GDK_FAIL */
    1137      130980 :                 switch (l.flag) {
    1138       10907 :                 case LOG_START:
    1139       10907 :                         if (l.id > lg->tid)
    1140          65 :                                 lg->tid = l.id;
    1141       10907 :                         if ((tr = tr_create(tr, l.id)) == NULL) {
    1142             :                                 err = LOG_ERR;
    1143             :                                 break;
    1144             :                         }
    1145       10907 :                         if (lg->debug & 1)
    1146           0 :                                 fprintf(stderr, "#logger tstart %d\n", tr->tid);
    1147             :                         break;
    1148       10907 :                 case LOG_END:
    1149       10907 :                         if (tr == NULL)
    1150             :                                 err = LOG_EOF;
    1151       10907 :                         else if (tr->tid != l.id)    /* abort record */
    1152             :                                 tr = tr_abort(lg, tr);
    1153             :                         else
    1154       10907 :                                 tr = tr_commit(lg, tr);
    1155             :                         break;
    1156        2519 :                 case LOG_SEQ:
    1157        2519 :                         err = log_read_seq(lg, &l);
    1158        2519 :                         break;
    1159       94570 :                 case LOG_UPDATE_CONST:
    1160             :                 case LOG_UPDATE_BULK:
    1161             :                 case LOG_UPDATE:
    1162       94570 :                         if (tr == NULL)
    1163             :                                 err = LOG_EOF;
    1164             :                         else
    1165       94570 :                                 err = log_read_updates(lg, tr, &l, l.id, 0);
    1166             :                         break;
    1167        3013 :                 case LOG_CREATE:
    1168        3013 :                         if (tr == NULL)
    1169             :                                 err = LOG_EOF;
    1170             :                         else
    1171        3013 :                                 err = log_read_create(lg, tr, l.id);
    1172             :                         break;
    1173        9064 :                 case LOG_DESTROY:
    1174        9064 :                         if (tr == NULL)
    1175             :                                 err = LOG_EOF;
    1176             :                         else
    1177        9064 :                                 err = log_read_destroy(lg, tr, l.id);
    1178             :                         break;
    1179           0 :                 case LOG_CLEAR:
    1180           0 :                         if (tr == NULL)
    1181             :                                 err = LOG_EOF;
    1182             :                         else
    1183           0 :                                 err = log_read_clear(lg, tr, l.id);
    1184             :                         break;
    1185             :                 default:
    1186             :                         err = LOG_ERR;
    1187             :                 }
    1188      130980 :                 if (tr == (trans *) -1) {
    1189             :                         err = LOG_ERR;
    1190             :                         tr = NULL;
    1191             :                         break;
    1192             :                 }
    1193             :         }
    1194       11216 :         while (tr)
    1195             :                 tr = tr_abort(lg, tr);
    1196       11216 :         if (!lg->flushing)
    1197          92 :                 GDKdebug = dbg;
    1198       11216 :         if (!ok)
    1199       11216 :                 return LOG_EOF;
    1200             :         return err;
    1201             : }
    1202             : 
    1203             : static gdk_return
    1204         175 : logger_readlog(logger *lg, char *filename, bool *filemissing)
    1205             : {
    1206             :         log_return err = LOG_OK;
    1207             :         time_t t0, t1;
    1208             :         struct stat sb;
    1209             : 
    1210         175 :         assert(!lg->inmemory);
    1211             : 
    1212         175 :         if (lg->debug & 1) {
    1213           0 :                 fprintf(stderr, "#logger_readlog opening %s\n", filename);
    1214             :         }
    1215             : 
    1216         175 :         gdk_return res = logger_open_input(lg, filename, filemissing);
    1217         175 :         if (!lg->input_log)
    1218             :                 return res;
    1219             :         int fd;
    1220         184 :         if ((fd = getFileNo(lg->input_log)) < 0 || fstat(fd, &sb) < 0) {
    1221           0 :                 if (lg->debug & 1) {
    1222           0 :                         fprintf(stderr, "!ERROR: logger_readlog: fstat on opened file %s failed\n", filename);
    1223             :                 }
    1224           0 :                 logger_close_input(lg);
    1225             :                 /* If the file could be opened, but fstat fails,
    1226             :                  * something weird is going on */
    1227           0 :                 return GDK_FAIL;
    1228             :         }
    1229          92 :         t0 = time(NULL);
    1230          92 :         if (lg->debug & 1) {
    1231           0 :                 printf("# Start reading the write-ahead log '%s'\n", filename);
    1232           0 :                 fflush(stdout);
    1233             :         }
    1234         184 :         while (err != LOG_EOF && err != LOG_ERR) {
    1235          92 :                 t1 = time(NULL);
    1236          92 :                 if (t1 - t0 > 10) {
    1237             :                         lng fpos;
    1238             :                         t0 = t1;
    1239             :                         /* not more than once every 10 seconds */
    1240           0 :                         fpos = (lng) getfilepos(getFile(lg->input_log));
    1241           0 :                         if (lg->debug & 1 && fpos >= 0) {
    1242           0 :                                 printf("# still reading write-ahead log \"%s\" (%d%% done)\n", filename, (int) ((fpos * 100 + 50) / sb.st_size));
    1243           0 :                                 fflush(stdout);
    1244             :                         }
    1245             :                 }
    1246          92 :                 err = logger_read_transaction(lg);
    1247             :         }
    1248          92 :         logger_close_input(lg);
    1249          92 :         lg->input_log = NULL;
    1250             : 
    1251             :         /* remaining transactions are not committed, ie abort */
    1252          92 :         if (lg->debug & 1) {
    1253           0 :                 printf("# Finished reading the write-ahead log '%s'\n", filename);
    1254           0 :                 fflush(stdout);
    1255             :         }
    1256             :         /* we cannot distinguish errors from incomplete transactions
    1257             :          * (even if we would log aborts in the logs). So we simply
    1258             :          * abort and move to the next log file */
    1259             :         //return err == LOG_ERR ? GDK_FAIL : GDK_SUCCEED;
    1260             :         return GDK_SUCCEED;
    1261             : }
    1262             : 
    1263             : /*
    1264             :  * The log files are incrementally numbered, starting from 2. They are
    1265             :  * processed in the same sequence.
    1266             :  */
    1267             : static gdk_return
    1268          78 : logger_readlogs(logger *lg, char *filename)
    1269             : {
    1270             :         gdk_return res = GDK_SUCCEED;
    1271             : 
    1272          78 :         assert(!lg->inmemory);
    1273          78 :         if (lg->debug & 1)
    1274           0 :                 fprintf(stderr, "#logger_readlogs logger id is " LLFMT " last logger id is " LLFMT "\n", lg->id, lg->saved_id);
    1275             : 
    1276             :         char log_filename[FILENAME_MAX];
    1277          78 :         if (lg->saved_id >= lg->id) {
    1278          78 :                 bool filemissing = false;
    1279             : 
    1280          78 :                 lg->id = lg->saved_id+1;
    1281         253 :                 while (res == GDK_SUCCEED && !filemissing) {
    1282         175 :                         if (snprintf(log_filename, sizeof(log_filename), "%s." LLFMT, filename, lg->id) >= FILENAME_MAX) {
    1283           0 :                                 GDKerror("Logger filename path is too large\n");
    1284           0 :                                 return GDK_FAIL;
    1285             :                         }
    1286         175 :                         res = logger_readlog(lg, log_filename, &filemissing);
    1287         175 :                         if (!filemissing) {
    1288          97 :                                 lg->saved_id++;
    1289          97 :                                 lg->id++;
    1290             :                         }
    1291             :                 }
    1292             :         }
    1293             :         return res;
    1294             : }
    1295             : 
    1296             : static gdk_return
    1297       15317 : logger_commit(logger *lg)
    1298             : {
    1299       15317 :         if (lg->debug & 1)
    1300           0 :                 fprintf(stderr, "#logger_commit\n");
    1301             : 
    1302       15317 :         return bm_commit(lg);
    1303             : }
    1304             : 
    1305             : static gdk_return
    1306          78 : check_version(logger *lg, FILE *fp, const char *fn, const char *logdir, const char *filename)
    1307             : {
    1308          78 :         int version = 0;
    1309             : 
    1310          78 :         assert(!lg->inmemory);
    1311          78 :         if (fscanf(fp, "%6d", &version) != 1) {
    1312           0 :                 GDKerror("Could not read the version number from the file '%s/log'.\n", lg->dir);
    1313           0 :                 fclose(fp);
    1314           0 :                 return GDK_FAIL;
    1315             :         }
    1316          78 :         if (version < 52300) {       /* first CATALOG_VERSION for "new" log format */
    1317           8 :                 lg->catalog_bid = logbat_new(TYPE_int, BATSIZE, PERSISTENT);
    1318           8 :                 lg->catalog_id = logbat_new(TYPE_int, BATSIZE, PERSISTENT);
    1319           8 :                 lg->dcatalog = logbat_new(TYPE_oid, BATSIZE, PERSISTENT);
    1320           8 :                 if (lg->catalog_bid == NULL || lg->catalog_id == NULL || lg->dcatalog == NULL) {
    1321           0 :                         GDKerror("cannot create catalog bats");
    1322           0 :                         fclose(fp);
    1323           0 :                         return GDK_FAIL;
    1324             :                 }
    1325             :                 /* old_logger_load always closes fp */
    1326           8 :                 if (old_logger_load(lg, fn, logdir, fp, version, filename) != GDK_SUCCEED) {
    1327             :                         //loads drop no longer needed catalog, snapshots bats
    1328             :                         //convert catalog_oid -> catalog_id (lng->int)
    1329           0 :                         GDKerror("Incompatible database version %06d, "
    1330             :                                  "this server supports version %06d.\n%s",
    1331             :                                  version, lg->version,
    1332             :                                  version < lg->version ? "Maybe you need to upgrade to an intermediate release first.\n" : "");
    1333           0 :                         return GDK_FAIL;
    1334             :                 }
    1335             :                 return GDK_SUCCEED;
    1336          70 :         } else if (version != lg->version) {
    1337           0 :                 if (lg->prefuncp == NULL ||
    1338           0 :                     (*lg->prefuncp)(lg->funcdata, version, lg->version) != GDK_SUCCEED) {
    1339           0 :                         GDKerror("Incompatible database version %06d, "
    1340             :                                  "this server supports version %06d.\n%s",
    1341             :                                  version, lg->version,
    1342             :                                  version < lg->version ? "Maybe you need to upgrade to an intermediate release first.\n" : "");
    1343           0 :                         fclose(fp);
    1344           0 :                         return GDK_FAIL;
    1345             :                 }
    1346             :         } else {
    1347          70 :                 lg->postfuncp = NULL;         /* don't call */
    1348             :         }
    1349         140 :         if (fgetc(fp) != '\n' ||         /* skip \n */
    1350          70 :             fgetc(fp) != '\n') {         /* skip \n */
    1351           0 :                 GDKerror("Badly formatted log file");
    1352           0 :                 fclose(fp);
    1353           0 :                 return GDK_FAIL;
    1354             :         }
    1355          70 :         if (logger_read_types_file(lg, fp) != GDK_SUCCEED) {
    1356           0 :                 fclose(fp);
    1357           0 :                 return GDK_FAIL;
    1358             :         }
    1359          70 :         fclose(fp);
    1360          70 :         return GDK_SUCCEED;
    1361             : }
    1362             : 
    1363             : static BAT *
    1364        2221 : bm_tids(BAT *b, BAT *d)
    1365             : {
    1366        2221 :         BUN sz = BATcount(b);
    1367        2221 :         BAT *tids = BATdense(0, 0, sz);
    1368             : 
    1369        2221 :         if (tids == NULL)
    1370             :                 return NULL;
    1371             : 
    1372        2221 :         if (BATcount(d)) {
    1373        2221 :                 BAT *diff = BATdiff(tids, d, NULL, NULL, false, false, BUN_NONE);
    1374             :                 logbat_destroy(tids);
    1375             :                 tids = diff;
    1376             :         }
    1377             :         return tids;
    1378             : }
    1379             : 
    1380             : 
    1381             : static gdk_return
    1382       11441 : logger_switch_bat(BAT *old, BAT *new, const char *fn, const char *name)
    1383             : {
    1384             :         char bak[IDLENGTH];
    1385             : 
    1386       11441 :         if (BATmode(old, true) != GDK_SUCCEED) {
    1387           0 :                 GDKerror("cannot convert old %s to transient", name);
    1388           0 :                 return GDK_FAIL;
    1389             :         }
    1390       11441 :         if (strconcat_len(bak, sizeof(bak), fn, "_", name, NULL) >= sizeof(bak)) {
    1391           0 :                 GDKerror("name %s_%s too long\n", fn, name);
    1392           0 :                 return GDK_FAIL;
    1393             :         }
    1394       22882 :         if (BBPrename(old->batCacheid, NULL) != 0 ||
    1395       11441 :             BBPrename(new->batCacheid, bak) != 0) {
    1396           0 :                 GDKerror("rename (%s) failed\n", bak);
    1397           0 :                 return GDK_FAIL;
    1398             :         }
    1399       11441 :         BBPretain(new->batCacheid);
    1400       11441 :         return GDK_SUCCEED;
    1401             : }
    1402             : 
    1403             : static gdk_return
    1404         266 : bm_get_counts(logger *lg)
    1405             : {
    1406             :         BUN p, q, deleted = 0;
    1407         266 :         const log_bid *bids = (const log_bid *) Tloc(lg->catalog_bid, 0);
    1408             : 
    1409       20376 :         BATloop(lg->catalog_bid, p, q) {
    1410       20110 :                 oid pos = p;
    1411       20110 :                 lng cnt = 0;
    1412       20110 :                 lng lid = lng_nil;
    1413             : 
    1414       20110 :                 if (BUNfnd(lg->dcatalog, &pos) == BUN_NONE) {
    1415       20110 :                         BAT *b = BBPquickdesc(bids[p]);
    1416       20110 :                         cnt = BATcount(b);
    1417             :                 } else {
    1418           0 :                         deleted++;
    1419           0 :                         lid = 1;
    1420             :                 }
    1421       20110 :                 if (BUNappend(lg->catalog_cnt, &cnt, false) != GDK_SUCCEED)
    1422           0 :                         return GDK_FAIL;
    1423       20110 :                 if (BUNappend(lg->catalog_lid, &lid, false) != GDK_SUCCEED)
    1424             :                         return GDK_FAIL;
    1425             :         }
    1426         266 :         lg->deleted = deleted;
    1427         266 :         lg->cnt = BATcount(lg->catalog_bid);
    1428         266 :         return GDK_SUCCEED;
    1429             : }
    1430             : 
    1431             : static int
    1432       11441 : subcommit_list_add(int next, bat *n, BUN *sizes, bat bid, BUN sz)
    1433             : {
    1434     3325800 :         for(int i=0; i<next; i++) {
    1435     3314359 :                 if (n[i] == bid) {
    1436           0 :                         sizes[i] = sz;
    1437           0 :                         return next;
    1438             :                 }
    1439             :         }
    1440       11441 :         n[next] = bid;
    1441       11441 :         sizes[next++] = sz;
    1442       11441 :         return next;
    1443             : }
    1444             : 
    1445             : static int
    1446        2333 : cleanup_and_swap(logger *lg, int *r, const log_bid *bids, lng *lids, lng *cnts, BAT *catalog_bid, BAT *catalog_id, BAT *dcatalog, int cleanup)
    1447             : {
    1448             :         BAT *nbids, *noids, *ncnts, *nlids, *ndels;
    1449             :         BUN p, q;
    1450             :         int err = 0, rcnt = 0;
    1451             : 
    1452        2333 :         oid *poss = Tloc(dcatalog, 0);
    1453       45883 :         BATloop(dcatalog, p, q) {
    1454       43550 :                 oid pos = poss[p];
    1455             : 
    1456       43550 :                 if (lids[pos] == lng_nil || lids[pos] > lg->saved_tid)
    1457       25571 :                         continue;
    1458             : 
    1459       17979 :                 if (lids[pos] >= 0) {
    1460       17979 :                         lids[pos] = -1; /* mark as transient */
    1461       17979 :                         r[rcnt++] = bids[pos];
    1462             : 
    1463             :                         BAT *lb;
    1464             : 
    1465       35958 :                         if ((lb = BATdescriptor(bids[pos])) == NULL ||
    1466       17979 :                                 BATmode(lb, true/*transient*/) != GDK_SUCCEED) {
    1467           0 :                                 TRC_WARNING(GDK, "Failed to set bat(%d) transient\n", bids[pos]);
    1468             :                         }
    1469             :                         logbat_destroy(lb);
    1470             :                 }
    1471             :         }
    1472        2333 :         BUN ocnt = BATcount(catalog_bid);
    1473        2333 :         nbids = logbat_new(TYPE_int, ocnt-cleanup, PERSISTENT);
    1474        2333 :         noids = logbat_new(TYPE_int, ocnt-cleanup, PERSISTENT);
    1475        2333 :         ncnts = logbat_new(TYPE_lng, ocnt-cleanup, TRANSIENT);
    1476        2333 :         nlids = logbat_new(TYPE_lng, ocnt-cleanup, TRANSIENT);
    1477        2333 :         ndels = logbat_new(TYPE_oid, BATcount(dcatalog)-cleanup, PERSISTENT);
    1478             : 
    1479        2333 :         if (nbids == NULL || noids == NULL || ncnts == NULL || nlids == NULL || ndels == NULL) {
    1480             :                 logbat_destroy(nbids);
    1481             :                 logbat_destroy(noids);
    1482             :                 logbat_destroy(ncnts);
    1483             :                 logbat_destroy(nlids);
    1484             :                 logbat_destroy(ndels);
    1485           0 :                 return 0;
    1486             :         }
    1487             : 
    1488        2333 :         int *oids = (int*)Tloc(catalog_id, 0);
    1489        2333 :         q = BUNlast(catalog_bid);
    1490      641054 :         for(p = 0; p<q && !err; p++) {
    1491      638721 :                 bat col = bids[p];
    1492      638721 :                 int nid = oids[p];
    1493      638721 :                 lng lid = lids[p];
    1494      638721 :                 lng cnt = cnts[p];
    1495      638721 :                 oid pos = p;
    1496             : 
    1497             :                 /* only project out the deleted with lid == -1
    1498             :                  * update dcatalog */
    1499      638721 :                 if (lid == -1)
    1500       17979 :                         continue; /* remove */
    1501             : 
    1502     1241484 :                 if (BUNappend(nbids, &col, false) != GDK_SUCCEED ||
    1503     1241484 :                     BUNappend(noids, &nid, false) != GDK_SUCCEED ||
    1504     1241484 :                     BUNappend(nlids, &lid, false) != GDK_SUCCEED ||
    1505      620742 :                     BUNappend(ncnts, &cnt, false) != GDK_SUCCEED)
    1506             :                         err=1;
    1507      620742 :                 pos = (oid)(BATcount(nbids)-1);
    1508      620742 :                 if (lid != lng_nil && BUNappend(ndels, &pos, false) != GDK_SUCCEED)
    1509             :                         err=1;
    1510             :         }
    1511             : 
    1512        2333 :         if (err) {
    1513             :                 logbat_destroy(nbids);
    1514             :                 logbat_destroy(noids);
    1515             :                 logbat_destroy(ndels);
    1516             :                 logbat_destroy(ncnts);
    1517             :                 logbat_destroy(nlids);
    1518           0 :                 return 0;
    1519             :         }
    1520             :         /* point of no return */
    1521        4666 :         if (logger_switch_bat(catalog_bid, nbids, lg->fn, "catalog_bid") != GDK_SUCCEED ||
    1522        4666 :             logger_switch_bat(catalog_id, noids, lg->fn, "catalog_id") != GDK_SUCCEED ||
    1523        2333 :             logger_switch_bat(dcatalog, ndels, lg->fn, "dcatalog") != GDK_SUCCEED) {
    1524             :                 logbat_destroy(nbids);
    1525             :                 logbat_destroy(noids);
    1526             :                 logbat_destroy(ndels);
    1527             :                 logbat_destroy(ncnts);
    1528             :                 logbat_destroy(nlids);
    1529           0 :                 return -1;
    1530             :         }
    1531        2333 :         r[rcnt++] = lg->catalog_bid->batCacheid;
    1532        2333 :         r[rcnt++] = lg->catalog_id->batCacheid;
    1533        2333 :         r[rcnt++] = lg->dcatalog->batCacheid;
    1534             : 
    1535             :         logbat_destroy(lg->catalog_bid);
    1536        2333 :         logbat_destroy(lg->catalog_id);
    1537        2333 :         logbat_destroy(lg->dcatalog);
    1538             : 
    1539        2333 :         lg->catalog_bid = nbids;
    1540        2333 :         lg->catalog_id = noids;
    1541        2333 :         lg->dcatalog = ndels;
    1542             : 
    1543        2333 :         BBPunfix(lg->catalog_cnt->batCacheid);
    1544        2333 :         BBPunfix(lg->catalog_lid->batCacheid);
    1545             : 
    1546        2333 :         lg->catalog_cnt = ncnts;
    1547        2333 :         lg->catalog_lid = nlids;
    1548        2333 :         lg->cnt = BATcount(lg->catalog_bid);
    1549        2333 :         lg->deleted -= cleanup;
    1550        2333 :         assert(lg->deleted == BATcount(lg->dcatalog));
    1551             :         return rcnt;
    1552             : }
    1553             : 
    1554             : static gdk_return
    1555       15693 : bm_subcommit(logger *lg)
    1556             : {
    1557             :         BUN p, q;
    1558             :         logger_lock(lg);
    1559       15693 :         BAT *catalog_bid = lg->catalog_bid;
    1560       15693 :         BAT *catalog_id = lg->catalog_id;
    1561       15693 :         BAT *dcatalog = lg->dcatalog;
    1562       15693 :         BUN nn = 13 + BATcount(catalog_bid);
    1563       15693 :         bat *n = GDKmalloc(sizeof(bat) * nn);
    1564       15693 :         bat *r = GDKmalloc(sizeof(bat) * nn);
    1565       15693 :         BUN *sizes = GDKmalloc(sizeof(BUN) * nn);
    1566             :         int i = 0, rcnt = 0;
    1567             :         gdk_return res;
    1568             :         const log_bid *bids;
    1569             :         lng *cnts = NULL, *lids = NULL;
    1570             :         int cleanup = 0;
    1571             :         lng t0 = 0;
    1572             : 
    1573       15693 :         if (n == NULL || r == NULL || sizes == NULL) {
    1574           0 :                 GDKfree(n);
    1575           0 :                 GDKfree(r);
    1576           0 :                 GDKfree(sizes);
    1577             :                 logger_unlock(lg);
    1578           0 :                 return GDK_FAIL;
    1579             :         }
    1580             : 
    1581       15693 :         sizes[i] = 0;
    1582       15693 :         n[i++] = 0;             /* n[0] is not used */
    1583       15693 :         bids = (const log_bid *) Tloc(catalog_bid, 0);
    1584       15693 :         if (lg->catalog_cnt)
    1585       15505 :                 cnts = (lng *) Tloc(lg->catalog_cnt, 0);
    1586       15693 :         if (lg->catalog_lid)
    1587       15505 :                 lids = (lng *) Tloc(lg->catalog_lid, 0);
    1588     4329744 :         BATloop(catalog_bid, p, q) {
    1589     4314051 :                 bat col = bids[p];
    1590             : 
    1591     4314051 :                 if (lids && lids[p] != lng_nil && lids[p] <= lg->saved_tid)
    1592       17979 :                         cleanup++;
    1593     4314051 :                 if (lg->debug & 1)
    1594           0 :                         fprintf(stderr, "#commit new %s (%d)\n", BBP_logical(col), col);
    1595     4314051 :                 assert(col);
    1596     4314051 :                 sizes[i] = cnts?(BUN)cnts[p]:0;
    1597     4314051 :                 n[i++] = col;
    1598             :         }
    1599             :         /* now commit catalog, so it's also up to date on disk */
    1600       15693 :         sizes[i] = lg->cnt;
    1601       15693 :         n[i++] = catalog_bid->batCacheid;
    1602       15693 :         sizes[i] = lg->cnt;
    1603       15693 :         n[i++] = catalog_id->batCacheid;
    1604       15693 :         sizes[i] = BATcount(dcatalog);
    1605       15693 :         n[i++] = dcatalog->batCacheid;
    1606             : 
    1607       15693 :         if (cleanup && (rcnt=cleanup_and_swap(lg, r, bids, lids, cnts, catalog_bid, catalog_id, dcatalog, cleanup)) < 0) {
    1608           0 :                 GDKfree(n);
    1609           0 :                 GDKfree(r);
    1610           0 :                 GDKfree(sizes);
    1611             :                 logger_unlock(lg);
    1612           0 :                 return GDK_FAIL;
    1613             :         }
    1614       15693 :         if (dcatalog != lg->dcatalog) {
    1615        2333 :                 i = subcommit_list_add(i, n, sizes, lg->catalog_bid->batCacheid, BATcount(lg->catalog_bid));
    1616        2333 :                 i = subcommit_list_add(i, n, sizes, lg->catalog_id->batCacheid, BATcount(lg->catalog_bid));
    1617        2333 :                 i = subcommit_list_add(i, n, sizes, lg->dcatalog->batCacheid, BATcount(lg->dcatalog));
    1618             :         }
    1619       15693 :         if (lg->seqs_id) {
    1620       15505 :                 sizes[i] = BATcount(lg->seqs_id);
    1621       15505 :                 n[i++] = lg->seqs_id->batCacheid;
    1622       15505 :                 sizes[i] = BATcount(lg->seqs_id);
    1623       15505 :                 n[i++] = lg->seqs_val->batCacheid;
    1624             :         }
    1625       15693 :         if (!cleanup && lg->seqs_id && BATcount(lg->dseqs) > (BATcount(lg->seqs_id)/2)) {
    1626             :                 BAT *tids, *ids, *vals;
    1627             : 
    1628        2221 :                 tids = bm_tids(lg->seqs_id, lg->dseqs);
    1629        2221 :                 if (tids == NULL) {
    1630           0 :                         GDKfree(n);
    1631           0 :                         GDKfree(r);
    1632           0 :                         GDKfree(sizes);
    1633             :                         logger_unlock(lg);
    1634           0 :                         return GDK_FAIL;
    1635             :                 }
    1636        2221 :                 ids = logbat_new(TYPE_int, BATcount(tids), PERSISTENT);
    1637        2221 :                 vals = logbat_new(TYPE_lng, BATcount(tids), PERSISTENT);
    1638             : 
    1639        2221 :                 if (ids == NULL || vals == NULL) {
    1640             :                         logbat_destroy(tids);
    1641             :                         logbat_destroy(ids);
    1642             :                         logbat_destroy(vals);
    1643           0 :                         GDKfree(n);
    1644           0 :                         GDKfree(r);
    1645           0 :                         GDKfree(sizes);
    1646             :                         logger_unlock(lg);
    1647           0 :                         return GDK_FAIL;
    1648             :                 }
    1649             : 
    1650        4442 :                 if (BATappend(ids, lg->seqs_id, tids, true) != GDK_SUCCEED ||
    1651        2221 :                     BATappend(vals, lg->seqs_val, tids, true) != GDK_SUCCEED) {
    1652             :                         logbat_destroy(tids);
    1653             :                         logbat_destroy(ids);
    1654             :                         logbat_destroy(vals);
    1655           0 :                         GDKfree(n);
    1656           0 :                         GDKfree(r);
    1657           0 :                         GDKfree(sizes);
    1658             :                         logger_unlock(lg);
    1659           0 :                         return GDK_FAIL;
    1660             :                 }
    1661             :                 logbat_destroy(tids);
    1662        2221 :                 BATclear(lg->dseqs, true);
    1663             : 
    1664        4442 :                 if (logger_switch_bat(lg->seqs_id, ids, lg->fn, "seqs_id") != GDK_SUCCEED ||
    1665        2221 :                     logger_switch_bat(lg->seqs_val, vals, lg->fn, "seqs_val") != GDK_SUCCEED) {
    1666             :                         logbat_destroy(ids);
    1667             :                         logbat_destroy(vals);
    1668           0 :                         GDKfree(n);
    1669           0 :                         GDKfree(r);
    1670           0 :                         GDKfree(sizes);
    1671             :                         logger_unlock(lg);
    1672           0 :                         return GDK_FAIL;
    1673             :                 }
    1674        2221 :                 i = subcommit_list_add(i, n, sizes, ids->batCacheid, BATcount(ids));
    1675        2221 :                 i = subcommit_list_add(i, n, sizes, vals->batCacheid, BATcount(ids));
    1676             : 
    1677        2221 :                 r[rcnt++] = lg->seqs_id->batCacheid;
    1678        2221 :                 r[rcnt++] = lg->seqs_val->batCacheid;
    1679             : 
    1680             :                 logbat_destroy(lg->seqs_id);
    1681        2221 :                 logbat_destroy(lg->seqs_val);
    1682             : 
    1683        2221 :                 lg->seqs_id = ids;
    1684        2221 :                 lg->seqs_val = vals;
    1685             :         }
    1686       15693 :         if (lg->seqs_id) {
    1687       15505 :                 sizes[i] = BATcount(lg->dseqs);
    1688       15505 :                 n[i++] = lg->dseqs->batCacheid;
    1689             :         }
    1690             : 
    1691       15693 :         assert((BUN) i <= nn);
    1692             :         logger_unlock(lg);
    1693       15693 :         if (lg->debug & 1)
    1694           0 :                 t0 = GDKusec();
    1695       15881 :         res = TMsubcommit_list(n, cnts?sizes:NULL, i, lg->saved_id, lg->saved_tid);
    1696       15693 :         if (lg->debug & 1)
    1697           0 :                 fprintf(stderr, "#subcommit " LLFMT "usec\n", GDKusec() - t0);
    1698       15693 :         if (res == GDK_SUCCEED) { /* now cleanup */
    1699       45113 :                 for(i=0;i<rcnt; i++) {
    1700       29420 :                         if (lg->debug & 1) {
    1701           0 :                                 fprintf(stderr, "release %d\n", r[i]);
    1702           0 :                                 if (BBP_lrefs(r[i]) != 2)
    1703           0 :                                         fprintf(stderr, "release %d %d\n", r[i], BBP_lrefs(r[i]));
    1704             :                         }
    1705       29420 :                         BBPrelease(r[i]);
    1706             :                 }
    1707             :         }
    1708       15693 :         GDKfree(n);
    1709       15693 :         GDKfree(r);
    1710       15693 :         GDKfree(sizes);
    1711       15693 :         if (res != GDK_SUCCEED)
    1712           0 :                 TRC_CRITICAL(GDK, "commit failed\n");
    1713             :         return res;
    1714             : }
    1715             : 
    1716             : static gdk_return
    1717         265 : logger_filename(logger *lg, char bak[FILENAME_MAX], char filename[FILENAME_MAX])
    1718             : {
    1719             :         str filenamestr = NULL;
    1720             : 
    1721         265 :         if ((filenamestr = GDKfilepath(0, lg->dir, LOGFILE, NULL)) == NULL)
    1722             :                 return GDK_FAIL;
    1723         265 :         size_t len = strcpy_len(filename, filenamestr, FILENAME_MAX);
    1724         265 :         GDKfree(filenamestr);
    1725         265 :         if (len >= FILENAME_MAX) {
    1726           0 :                 GDKerror("Logger filename path is too large\n");
    1727           0 :                 return GDK_FAIL;
    1728             :         }
    1729         265 :         if (bak) {
    1730         265 :                 len = strconcat_len(bak, FILENAME_MAX, filename, ".bak", NULL);
    1731         265 :                 if (len >= FILENAME_MAX) {
    1732           0 :                         GDKerror("Logger filename path is too large\n");
    1733           0 :                         return GDK_FAIL;
    1734             :                 }
    1735             :         }
    1736             :         return GDK_SUCCEED;
    1737             : }
    1738             : 
    1739             : static gdk_return
    1740       11221 : logger_cleanup(logger *lg, lng id)
    1741             : {
    1742             :         char log_id[FILENAME_MAX];
    1743             : 
    1744       11221 :         if (snprintf(log_id, sizeof(log_id), LLFMT, id) >= FILENAME_MAX) {
    1745           0 :                 GDKerror("log_id filename is too large\n");
    1746           0 :                 return GDK_FAIL;
    1747             :         }
    1748       11221 :         if (GDKunlink(0, lg->dir, LOGFILE, log_id) != GDK_SUCCEED) {
    1749           0 :                 TRC_WARNING(GDK, "#logger_cleanup: failed to remove old WAL %s.%s\n", LOGFILE, log_id);
    1750           0 :                 GDKclrerr();
    1751             :         }
    1752             :         return GDK_SUCCEED;
    1753             : }
    1754             : 
    1755             : /* Load data from the logger logdir
    1756             :  * Initialize new directories and catalog files if none are present,
    1757             :  * unless running in read-only mode
    1758             :  * Load data and persist it in the BATs */
    1759             : static gdk_return
    1760         266 : logger_load(int debug, const char *fn, const char *logdir, logger *lg, char filename[FILENAME_MAX])
    1761             : {
    1762             :         FILE *fp = NULL;
    1763             :         char bak[FILENAME_MAX];
    1764             :         bat catalog_bid, catalog_id, dcatalog;
    1765             :         bool needcommit = false;
    1766         266 :         int dbg = GDKdebug;
    1767             :         bool readlogs = false;
    1768             : 
    1769             :         /* refactor */
    1770         266 :         if (!LOG_DISABLED(lg)) {
    1771         265 :                 if (logger_filename(lg, bak, filename) != GDK_SUCCEED)
    1772           0 :                         goto error;
    1773             :         }
    1774             : 
    1775         266 :         lg->catalog_bid = NULL;
    1776         266 :         lg->catalog_id = NULL;
    1777         266 :         lg->catalog_cnt = NULL;
    1778         266 :         lg->catalog_lid = NULL;
    1779         266 :         lg->dcatalog = NULL;
    1780             : 
    1781         266 :         lg->seqs_id = NULL;
    1782         266 :         lg->seqs_val = NULL;
    1783         266 :         lg->dseqs = NULL;
    1784             : 
    1785         266 :         lg->type_id = NULL;
    1786         266 :         lg->type_nme = NULL;
    1787         266 :         lg->type_nr = NULL;
    1788             : 
    1789         266 :         if (!LOG_DISABLED(lg)) {
    1790             :                 /* try to open logfile backup, or failing that, the file
    1791             :                  * itself. we need to know whether this file exists when
    1792             :                  * checking the database consistency later on */
    1793         265 :                 if ((fp = MT_fopen(bak, "r")) != NULL) {
    1794           0 :                         fclose(fp);
    1795             :                         fp = NULL;
    1796           0 :                         if (GDKunlink(0, lg->dir, LOGFILE, NULL) != GDK_SUCCEED ||
    1797           0 :                             GDKmove(0, lg->dir, LOGFILE, "bak", lg->dir, LOGFILE, NULL, true) != GDK_SUCCEED)
    1798           0 :                                 goto error;
    1799         265 :                 } else if (errno != ENOENT) {
    1800           0 :                         GDKsyserror("open %s failed", bak);
    1801           0 :                         goto error;
    1802             :                 }
    1803             :                 fp = MT_fopen(filename, "r");
    1804         265 :                 if (fp == NULL && errno != ENOENT) {
    1805           0 :                         GDKsyserror("open %s failed", filename);
    1806           0 :                         goto error;
    1807             :                 }
    1808             :         }
    1809             : 
    1810         266 :         strconcat_len(bak, sizeof(bak), fn, "_catalog_bid", NULL);
    1811         266 :         catalog_bid = BBPindex(bak);
    1812             : 
    1813             :         /* create transient bats for type mapping, to be read from disk */
    1814         266 :         lg->type_id = logbat_new(TYPE_bte, BATSIZE, TRANSIENT);
    1815         266 :         lg->type_nme = logbat_new(TYPE_str, BATSIZE, TRANSIENT);
    1816         266 :         lg->type_nr = logbat_new(TYPE_int, BATSIZE, TRANSIENT);
    1817             : 
    1818         266 :         if (lg->type_id == NULL || lg->type_nme == NULL || lg->type_nr == NULL) {
    1819           0 :                 if (fp)
    1820           0 :                         fclose(fp);
    1821             :                 fp = NULL;
    1822           0 :                 GDKerror("cannot create type bats");
    1823           0 :                 goto error;
    1824             :         }
    1825             : 
    1826             :         /* this is intentional - if catalog_bid is 0, force it to find
    1827             :          * the persistent catalog */
    1828         266 :         if (catalog_bid == 0) {
    1829             :                 /* catalog does not exist, so the log file also
    1830             :                  * shouldn't exist */
    1831         188 :                 if (fp != NULL) {
    1832           0 :                         GDKerror("there is no logger catalog, "
    1833             :                                  "but there is a log file.\n");
    1834           0 :                         goto error;
    1835             :                 }
    1836             : 
    1837         188 :                 lg->catalog_bid = logbat_new(TYPE_int, BATSIZE, PERSISTENT);
    1838         188 :                 lg->catalog_id = logbat_new(TYPE_int, BATSIZE, PERSISTENT);
    1839         188 :                 lg->dcatalog = logbat_new(TYPE_oid, BATSIZE, PERSISTENT);
    1840             : 
    1841         188 :                 if (lg->catalog_bid == NULL || lg->catalog_id == NULL || lg->dcatalog == NULL) {
    1842           0 :                         GDKerror("cannot create catalog bats");
    1843           0 :                         goto error;
    1844             :                 }
    1845         188 :                 if (debug & 1)
    1846           0 :                         fprintf(stderr, "#create %s catalog\n", fn);
    1847             : 
    1848             :                 /* give the catalog bats names so we can find them
    1849             :                  * next time */
    1850         188 :                 strconcat_len(bak, sizeof(bak), fn, "_catalog_bid", NULL);
    1851         188 :                 if (BBPrename(lg->catalog_bid->batCacheid, bak) < 0) {
    1852           0 :                         goto error;
    1853             :                 }
    1854             : 
    1855         188 :                 strconcat_len(bak, sizeof(bak), fn, "_catalog_id", NULL);
    1856         188 :                 if (BBPrename(lg->catalog_id->batCacheid, bak) < 0) {
    1857           0 :                         goto error;
    1858             :                 }
    1859             : 
    1860         188 :                 strconcat_len(bak, sizeof(bak), fn, "_dcatalog", NULL);
    1861         188 :                 if (BBPrename(lg->dcatalog->batCacheid, bak) < 0) {
    1862           0 :                         goto error;
    1863             :                 }
    1864             : 
    1865         188 :                 if (!LOG_DISABLED(lg)) {
    1866         187 :                         if (GDKcreatedir(filename) != GDK_SUCCEED) {
    1867           0 :                                 GDKerror("cannot create directory for log file %s\n", filename);
    1868           0 :                                 goto error;
    1869             :                         }
    1870         187 :                         if (logger_create_types_file(lg, filename) != GDK_SUCCEED)
    1871           0 :                                 goto error;
    1872             :                 }
    1873             : 
    1874         188 :                 BBPretain(lg->catalog_bid->batCacheid);
    1875         188 :                 BBPretain(lg->catalog_id->batCacheid);
    1876         188 :                 BBPretain(lg->dcatalog->batCacheid);
    1877             : 
    1878         188 :                 if (bm_subcommit(lg) != GDK_SUCCEED) {
    1879             :                         /* cannot commit catalog, so remove log */
    1880             :                         MT_remove(filename);
    1881           0 :                         BBPrelease(lg->catalog_bid->batCacheid);
    1882           0 :                         BBPrelease(lg->catalog_id->batCacheid);
    1883           0 :                         BBPrelease(lg->dcatalog->batCacheid);
    1884           0 :                         goto error;
    1885             :                 }
    1886             :         } else {
    1887             :                 /* find the persistent catalog. As non persistent bats
    1888             :                  * require a logical reference we also add a logical
    1889             :                  * reference for the persistent bats */
    1890             :                 BUN p, q;
    1891             :                 BAT *b, *o, *d;
    1892             : 
    1893          78 :                 assert(!lg->inmemory);
    1894             : 
    1895             :                 /* the catalog exists, and so should the log file */
    1896          78 :                 if (fp == NULL && !LOG_DISABLED(lg)) {
    1897           0 :                         GDKerror("There is a logger catalog, but no log file.\n");
    1898           0 :                         goto error;
    1899             :                 }
    1900          78 :                 if (fp != NULL) {
    1901             :                         /* check_version always closes fp */
    1902          78 :                         if (check_version(lg, fp, fn, logdir, filename) != GDK_SUCCEED) {
    1903             :                                 fp = NULL;
    1904           0 :                                 goto error;
    1905             :                         }
    1906             :                         readlogs = true;
    1907             :                         fp = NULL;
    1908             :                 }
    1909             : 
    1910          78 :                 if (lg->catalog_bid == NULL && lg->catalog_id == NULL && lg->dcatalog == NULL) {
    1911          70 :                         b = BATdescriptor(catalog_bid);
    1912          70 :                         if (b == NULL) {
    1913           0 :                                 GDKerror("inconsistent database, catalog does not exist");
    1914           0 :                                 goto error;
    1915             :                         }
    1916             : 
    1917          70 :                         strconcat_len(bak, sizeof(bak), fn, "_catalog_id", NULL);
    1918          70 :                         catalog_id = BBPindex(bak);
    1919          70 :                         o = BATdescriptor(catalog_id);
    1920          70 :                         if (o == NULL) {
    1921           0 :                                 BBPunfix(b->batCacheid);
    1922           0 :                                 GDKerror("inconsistent database, catalog_id does not exist");
    1923           0 :                                 goto error;
    1924             :                         }
    1925             : 
    1926          70 :                         strconcat_len(bak, sizeof(bak), fn, "_dcatalog", NULL);
    1927          70 :                         dcatalog = BBPindex(bak);
    1928          70 :                         d = BATdescriptor(dcatalog);
    1929          70 :                         if (d == NULL) {
    1930           0 :                                 GDKerror("cannot create dcatalog bat");
    1931           0 :                                 BBPunfix(b->batCacheid);
    1932           0 :                                 BBPunfix(o->batCacheid);
    1933           0 :                                 goto error;
    1934             :                         }
    1935             : 
    1936          70 :                         lg->catalog_bid = b;
    1937          70 :                         lg->catalog_id = o;
    1938          70 :                         lg->dcatalog = d;
    1939          70 :                         const log_bid *bids = (const log_bid *) Tloc(lg->catalog_bid, 0);
    1940       17890 :                         BATloop(lg->catalog_bid, p, q) {
    1941       17820 :                                 bat bid = bids[p];
    1942       17820 :                                 oid pos = p;
    1943             : 
    1944       17820 :                                 if (BBPretain(bid) == 0 && /* any bid in the catalog_bid, needs one logical ref */
    1945           0 :                                     BUNfnd(lg->dcatalog, &pos) == BUN_NONE &&
    1946           0 :                                     BUNappend(lg->dcatalog, &pos, false) != GDK_SUCCEED)
    1947           0 :                                         goto error;
    1948             :                         }
    1949             :                 }
    1950          78 :                 BBPretain(lg->catalog_bid->batCacheid);
    1951          78 :                 BBPretain(lg->catalog_id->batCacheid);
    1952          78 :                 BBPretain(lg->dcatalog->batCacheid);
    1953             :         }
    1954         266 :         lg->catalog_cnt = logbat_new(TYPE_lng, 1, TRANSIENT);
    1955         266 :         if (lg->catalog_cnt == NULL) {
    1956           0 :                 GDKerror("failed to create catalog_cnt bat");
    1957           0 :                 goto error;
    1958             :         }
    1959         266 :         lg->catalog_lid = logbat_new(TYPE_lng, 1, TRANSIENT);
    1960         266 :         if (lg->catalog_lid == NULL) {
    1961           0 :                 GDKerror("failed to create catalog_lid bat");
    1962           0 :                 goto error;
    1963             :         }
    1964         266 :         if (bm_get_counts(lg) != GDK_SUCCEED)
    1965           0 :                 goto error;
    1966             : 
    1967         266 :         strconcat_len(bak, sizeof(bak), fn, "_seqs_id", NULL);
    1968         266 :         if (BBPindex(bak)) {
    1969          78 :                 lg->seqs_id = BATdescriptor(BBPindex(bak));
    1970          78 :                 strconcat_len(bak, sizeof(bak), fn, "_seqs_val", NULL);
    1971          78 :                 lg->seqs_val = BATdescriptor(BBPindex(bak));
    1972          78 :                 strconcat_len(bak, sizeof(bak), fn, "_dseqs", NULL);
    1973          78 :                 lg->dseqs = BATdescriptor(BBPindex(bak));
    1974             :         } else {
    1975         188 :                 lg->seqs_id = logbat_new(TYPE_int, 1, PERSISTENT);
    1976         188 :                 lg->seqs_val = logbat_new(TYPE_lng, 1, PERSISTENT);
    1977         188 :                 lg->dseqs = logbat_new(TYPE_oid, 1, PERSISTENT);
    1978         188 :                 if (lg->seqs_id == NULL ||
    1979         188 :                     lg->seqs_val == NULL ||
    1980             :                     lg->dseqs == NULL) {
    1981           0 :                         GDKerror("Logger_new: cannot create seqs bats");
    1982           0 :                         goto error;
    1983             :                 }
    1984             : 
    1985         188 :                 strconcat_len(bak, sizeof(bak), fn, "_seqs_id", NULL);
    1986         188 :                 if (BBPrename(lg->seqs_id->batCacheid, bak) < 0) {
    1987           0 :                         goto error;
    1988             :                 }
    1989             : 
    1990         188 :                 strconcat_len(bak, sizeof(bak), fn, "_seqs_val", NULL);
    1991         188 :                 if (BBPrename(lg->seqs_val->batCacheid, bak) < 0) {
    1992           0 :                         goto error;
    1993             :                 }
    1994             : 
    1995         188 :                 strconcat_len(bak, sizeof(bak), fn, "_dseqs", NULL);
    1996         188 :                 if (BBPrename(lg->dseqs->batCacheid, bak) < 0) {
    1997           0 :                         goto error;
    1998             :                 }
    1999             :                 needcommit = true;
    2000             :         }
    2001         266 :         dbg = GDKdebug;
    2002         266 :         GDKdebug &= ~(CHECKMASK|PROPMASK);
    2003         266 :         if (needcommit && bm_commit(lg) != GDK_SUCCEED) {
    2004           0 :                 GDKerror("Logger_new: commit failed");
    2005           0 :                 goto error;
    2006             :         }
    2007         266 :         GDKdebug = dbg;
    2008             : 
    2009         266 :         if (readlogs) {
    2010          78 :                 ulng log_id = lg->saved_id+1;
    2011          78 :                 if (logger_readlogs(lg, filename) != GDK_SUCCEED) {
    2012           0 :                         goto error;
    2013             :                 }
    2014          78 :                 if (lg->postfuncp && (*lg->postfuncp)(lg->funcdata, lg) != GDK_SUCCEED)
    2015           0 :                         goto error;
    2016          78 :                 dbg = GDKdebug;
    2017          78 :                 GDKdebug &= ~(CHECKMASK|PROPMASK);
    2018          78 :                 if (logger_commit(lg) != GDK_SUCCEED) {
    2019           0 :                         goto error;
    2020             :                 }
    2021          78 :                 GDKdebug = dbg;
    2022         175 :                 for( ; log_id <= lg->saved_id; log_id++)
    2023          97 :                         (void)logger_cleanup(lg, log_id);  /* ignore error of removing file */
    2024             :         } else {
    2025         188 :                 lg->id = lg->saved_id+1;
    2026             :         }
    2027             :         return GDK_SUCCEED;
    2028           0 :   error:
    2029           0 :         if (fp)
    2030           0 :                 fclose(fp);
    2031           0 :         logbat_destroy(lg->catalog_bid);
    2032           0 :         logbat_destroy(lg->catalog_id);
    2033           0 :         logbat_destroy(lg->dcatalog);
    2034           0 :         logbat_destroy(lg->seqs_id);
    2035           0 :         logbat_destroy(lg->seqs_val);
    2036           0 :         logbat_destroy(lg->dseqs);
    2037           0 :         logbat_destroy(lg->type_id);
    2038           0 :         logbat_destroy(lg->type_nme);
    2039           0 :         logbat_destroy(lg->type_nr);
    2040           0 :         GDKfree(lg->fn);
    2041           0 :         GDKfree(lg->dir);
    2042           0 :         GDKfree(lg->local_dir);
    2043           0 :         GDKfree(lg->buf);
    2044           0 :         GDKfree(lg);
    2045           0 :         GDKdebug = dbg;
    2046           0 :         return GDK_FAIL;
    2047             : }
    2048             : 
    2049             : /* Initialize a new logger
    2050             :  * It will load any data in the logdir and persist it in the BATs*/
    2051             : static logger *
    2052         266 : logger_new(int debug, const char *fn, const char *logdir, int version, preversionfix_fptr prefuncp, postversionfix_fptr postfuncp, void *funcdata)
    2053             : {
    2054             :         logger *lg;
    2055             :         char filename[FILENAME_MAX];
    2056             : 
    2057         266 :         if (!GDKinmemory(0) && MT_path_absolute(logdir)) {
    2058           0 :                 TRC_CRITICAL(GDK, "logdir must be relative path\n");
    2059           0 :                 return NULL;
    2060             :         }
    2061             : 
    2062         266 :         lg = GDKmalloc(sizeof(struct logger));
    2063         266 :         if (lg == NULL) {
    2064           0 :                 TRC_CRITICAL(GDK, "allocating logger structure failed\n");
    2065           0 :                 return NULL;
    2066             :         }
    2067             : 
    2068         266 :         *lg = (logger) {
    2069         266 :                 .inmemory = GDKinmemory(0),
    2070             :                 .debug = debug,
    2071             :                 .version = version,
    2072             :                 .prefuncp = prefuncp,
    2073             :                 .postfuncp = postfuncp,
    2074             :                 .funcdata = funcdata,
    2075             : 
    2076             :                 .id = 0,
    2077         266 :                 .saved_id = getBBPlogno(),              /* get saved log numer from bbp */
    2078         266 :                 .saved_tid = (int)getBBPtransid(),      /* get saved transaction id from bbp */
    2079             :         };
    2080         266 :         MT_lock_init(&lg->lock, fn);
    2081             : 
    2082             :         /* probably open file and check version first, then call call old logger code */
    2083         266 :         if (snprintf(filename, sizeof(filename), "%s%c%s%c", logdir, DIR_SEP, fn, DIR_SEP) >= FILENAME_MAX) {
    2084           0 :                 TRC_CRITICAL(GDK, "filename is too large\n");
    2085           0 :                 GDKfree(lg);
    2086           0 :                 return NULL;
    2087             :         }
    2088         266 :         lg->fn = GDKstrdup(fn);
    2089         266 :         lg->dir = GDKstrdup(filename);
    2090         266 :         lg->bufsize = 64*1024;
    2091         266 :         lg->buf = GDKmalloc(lg->bufsize);
    2092         266 :         if (lg->fn == NULL || lg->dir == NULL || lg->buf == NULL) {
    2093           0 :                 TRC_CRITICAL(GDK, "strdup failed\n");
    2094           0 :                 GDKfree(lg->fn);
    2095           0 :                 GDKfree(lg->dir);
    2096           0 :                 GDKfree(lg->buf);
    2097           0 :                 GDKfree(lg);
    2098           0 :                 return NULL;
    2099             :         }
    2100         266 :         if (lg->debug & 1) {
    2101           0 :                 fprintf(stderr, "#logger_new dir set to %s\n", lg->dir);
    2102             :         }
    2103             : 
    2104         266 :         if (logger_load(debug, fn, logdir, lg, filename) == GDK_SUCCEED) {
    2105         266 :                 return lg;
    2106             :         }
    2107             :         return NULL;
    2108             : }
    2109             : 
    2110             : void
    2111         265 : logger_destroy(logger *lg)
    2112             : {
    2113        1189 :         for (logged_range *p = lg->pending; p; ){
    2114         924 :                 logged_range *n = p->next;
    2115         924 :                 GDKfree(p);
    2116             :                 p = n;
    2117             :         }
    2118         265 :         if (LOG_DISABLED(lg)) {
    2119           1 :                 lg->saved_id = lg->id;
    2120           1 :                 lg->saved_tid = lg->tid;
    2121           1 :                 logger_commit(lg);
    2122             :         }
    2123         265 :         if (lg->catalog_bid) {
    2124             :                 logger_lock(lg);
    2125             :                 BUN p, q;
    2126         265 :                 BAT *b = lg->catalog_bid;
    2127             : 
    2128             :                 /* free resources */
    2129         265 :                 const log_bid *bids = (const log_bid *) Tloc(b, 0);
    2130       70385 :                 BATloop(b, p, q) {
    2131       70120 :                         bat bid = bids[p];
    2132             : 
    2133       70120 :                         BBPrelease(bid);
    2134             :                 }
    2135             : 
    2136         265 :                 BBPrelease(lg->catalog_bid->batCacheid);
    2137         265 :                 BBPrelease(lg->catalog_id->batCacheid);
    2138         265 :                 BBPrelease(lg->dcatalog->batCacheid);
    2139         265 :                 logbat_destroy(lg->catalog_bid);
    2140         265 :                 logbat_destroy(lg->catalog_id);
    2141         265 :                 logbat_destroy(lg->dcatalog);
    2142             : 
    2143         265 :                 logbat_destroy(lg->catalog_cnt);
    2144         265 :                 logbat_destroy(lg->catalog_lid);
    2145             :                 logger_unlock(lg);
    2146             :         }
    2147         265 :         GDKfree(lg->fn);
    2148         265 :         GDKfree(lg->dir);
    2149         265 :         GDKfree(lg->buf);
    2150         265 :         logger_close_input(lg);
    2151         265 :         logger_close_output(lg);
    2152         265 :         GDKfree(lg);
    2153         265 : }
    2154             : 
    2155             : /* Create a new logger */
    2156             : logger *
    2157         266 : logger_create(int debug, const char *fn, const char *logdir, int version, preversionfix_fptr prefuncp, postversionfix_fptr postfuncp, void *funcdata)
    2158             : {
    2159             :         logger *lg;
    2160         266 :         lg = logger_new(debug, fn, logdir, version, prefuncp, postfuncp, funcdata);
    2161         266 :         if (lg == NULL)
    2162             :                 return NULL;
    2163         266 :         if (lg->debug & 1) {
    2164           0 :                 printf("# Started processing logs %s/%s version %d\n",fn,logdir,version);
    2165           0 :                 fflush(stdout);
    2166             :         }
    2167         266 :         if (lg->debug & 1) {
    2168           0 :                 printf("# Finished processing logs %s/%s\n",fn,logdir);
    2169           0 :                 fflush(stdout);
    2170             :         }
    2171         266 :         if (GDKsetenv("recovery", "finished") != GDK_SUCCEED) {
    2172           0 :                 logger_destroy(lg);
    2173           0 :                 return NULL;
    2174             :         }
    2175         266 :         if (logger_open_output(lg) != GDK_SUCCEED) {
    2176           0 :                 logger_destroy(lg);
    2177           0 :                 return NULL;
    2178             :         }
    2179             :         return lg;
    2180             : }
    2181             : 
    2182             : static ulng
    2183             : logger_next_logfile(logger *lg, ulng ts)
    2184             : {
    2185       11124 :         if (!lg->pending || !lg->pending->next)
    2186             :                 return 0;
    2187       11124 :         if (lg->pending->last_ts < ts)
    2188       11124 :                 return lg->pending->id;
    2189             :         return 0;
    2190             : }
    2191             : 
    2192             : static void
    2193       11124 : logger_cleanup_range(logger *lg)
    2194             : {
    2195       11124 :         logged_range *p = lg->pending;
    2196       11124 :         if (p) {
    2197       11124 :                 lg->pending = p->next;
    2198       11124 :                 GDKfree(p);
    2199             :         }
    2200       11124 : }
    2201             : 
    2202             : gdk_return
    2203           0 : logger_activate(logger *lg)
    2204             : {
    2205           0 :         if (lg->end > 0 && lg->saved_id+1 == lg->id) {
    2206           0 :                 lg->id++;
    2207           0 :                 logger_close_output(lg);
    2208             :                 /* start new file */
    2209           0 :                 if (logger_open_output(lg) != GDK_SUCCEED)
    2210           0 :                         return GDK_FAIL;
    2211             :         }
    2212             :         return GDK_SUCCEED;
    2213             : }
    2214             : 
    2215             : gdk_return
    2216       11124 : logger_flush(logger *lg, ulng ts)
    2217             : {
    2218             :         ulng lid = logger_next_logfile(lg, ts);
    2219       11124 :         if (LOG_DISABLED(lg)) {
    2220           0 :                 lg->saved_id = lid;
    2221           0 :                 lg->saved_tid = lg->tid;
    2222           0 :                 if (lid)
    2223           0 :                         logger_cleanup_range(lg);
    2224           0 :                 if (logger_commit(lg) != GDK_SUCCEED)
    2225           0 :                         TRC_ERROR(GDK, "failed to commit");
    2226           0 :                 return GDK_SUCCEED;
    2227             :         }
    2228       11124 :         if (lg->saved_id >= lid)
    2229             :                 return GDK_SUCCEED;
    2230       11124 :         if (lg->saved_id+1 >= lg->id) /* logger should first release the file */
    2231             :                 return GDK_SUCCEED;
    2232             :         log_return res = LOG_OK;
    2233       22248 :         while(lg->saved_id < lid && res == LOG_OK) {
    2234       11124 :                 if (lg->saved_id >= lg->id)
    2235             :                         break;
    2236       11124 :                 if (!lg->input_log) {
    2237             :                         char *filename;
    2238             :                         char id[32];
    2239       11124 :                         if (snprintf(id, sizeof(id), LLFMT, lg->saved_id+1) >= (int) sizeof(id)) {
    2240           0 :                                 TRC_CRITICAL(GDK, "log_id filename is too large\n");
    2241           0 :                                 return GDK_FAIL;
    2242             :                         }
    2243       11124 :                         if (!(filename = GDKfilepath(BBPselectfarm(PERSISTENT, 0, offheap), lg->dir, LOGFILE, id)))
    2244             :                                 return GDK_FAIL;
    2245       11124 :                         if (strlen(filename) >= FILENAME_MAX) {
    2246           0 :                                 GDKerror("Logger filename path is too large\n");
    2247           0 :                                 GDKfree(filename);
    2248           0 :                                 return GDK_FAIL;
    2249             :                         }
    2250             : 
    2251       11124 :                         bool filemissing = false;
    2252       11124 :                         if (logger_open_input(lg, filename, &filemissing) != GDK_SUCCEED) {
    2253           0 :                                 GDKfree(filename);
    2254           0 :                                 return GDK_FAIL;
    2255             :                         }
    2256       11124 :                         GDKfree(filename);
    2257             :                 }
    2258             :                 /* we read the full file because skipping is impossible with current log format */
    2259             :                 logger_lock(lg);
    2260       11124 :                 lg->flushing = 1;
    2261       11124 :                 res = logger_read_transaction(lg);
    2262       11124 :                 lg->flushing = 0;
    2263             :                 logger_unlock(lg);
    2264       11124 :                 if (res == LOG_EOF) {
    2265       11124 :                         logger_close_input(lg);
    2266             :                         res = LOG_OK;
    2267             :                 }
    2268           0 :                 if (res != LOG_ERR) {
    2269       11124 :                         lg->saved_id++;
    2270       11124 :                         if (logger_commit(lg) != GDK_SUCCEED) {
    2271           0 :                                 TRC_ERROR(GDK, "failed to commit");
    2272             :                                 res = LOG_ERR;
    2273             :                         }
    2274             : 
    2275             :                         /* remove old log file */
    2276       11124 :                         if (res != LOG_ERR) {
    2277       11124 :                                 if (logger_cleanup(lg, lg->saved_id) != GDK_SUCCEED)
    2278             :                                         res = LOG_ERR;
    2279             :                         }
    2280             :                 }
    2281             :         }
    2282       11124 :         if (lid && res == LOG_OK)
    2283       11124 :                 logger_cleanup_range(lg);
    2284       11124 :         return res == LOG_ERR ? GDK_FAIL : GDK_SUCCEED;
    2285             : }
    2286             : 
    2287             : /* Clean-up write-ahead log files already persisted in the BATs, leaving only the most recent one.
    2288             :  * Only the bak- files are deleted for the preserved WAL files.
    2289             :  */
    2290             : lng
    2291       14218 : logger_changes(logger *lg)
    2292             : {
    2293       14218 :         return (lg->id - lg->saved_id - 1);
    2294             : }
    2295             : 
    2296             : int
    2297         375 : logger_sequence(logger *lg, int seq, lng *id)
    2298             : {
    2299             :         logger_lock(lg);
    2300         375 :         BUN p = log_find(lg->seqs_id, lg->dseqs, seq);
    2301             : 
    2302         375 :         if (p != BUN_NONE) {
    2303         283 :                 *id = *(lng *) Tloc(lg->seqs_val, p);
    2304             : 
    2305             :                 logger_unlock(lg);
    2306         283 :                 return 1;
    2307             :         }
    2308             :         logger_unlock(lg);
    2309          92 :         return 0;
    2310             : }
    2311             : 
    2312             : gdk_return
    2313       94793 : log_constant(logger *lg, int type, ptr val, log_id id, lng offset, lng cnt)
    2314             : {
    2315       94793 :         bte tpe = find_type(lg, type);
    2316             :         gdk_return ok = GDK_SUCCEED;
    2317             :         logformat l;
    2318             :         lng nr;
    2319             :         int is_row = 0;
    2320             : 
    2321       94793 :         if (lg->row_insert_nrcols != 0) {
    2322           0 :                 lg->row_insert_nrcols--;
    2323             :                 is_row = 1;
    2324             :         }
    2325       94793 :         l.flag = LOG_UPDATE_CONST;
    2326       94793 :         l.id = id;
    2327             :         nr = cnt;
    2328             : 
    2329       94793 :         if (LOG_DISABLED(lg) || !nr) {
    2330             :                 /* logging is switched off */
    2331       41891 :                 if (nr) {
    2332             :                         logger_lock(lg);
    2333       35159 :                         ok = la_bat_update_count(lg, id, offset+cnt);
    2334             :                         logger_unlock(lg);
    2335             :                 }
    2336       41891 :                 return ok;
    2337             :         }
    2338             : 
    2339       52902 :         gdk_return (*wt) (const void *, stream *, size_t) = BATatoms[type].atomWrite;
    2340             : 
    2341       52902 :         if (is_row)
    2342           0 :                 l.flag = tpe;
    2343       52902 :         if (log_write_format(lg, &l) != GDK_SUCCEED ||
    2344       52902 :             (!is_row && !mnstr_writeLng(lg->output_log, nr)) ||
    2345       52902 :             (!is_row && mnstr_write(lg->output_log, &tpe, 1, 1) != 1) ||
    2346       52902 :             (!is_row && !mnstr_writeLng(lg->output_log, offset))) {
    2347             :                 ok = GDK_FAIL;
    2348           0 :                 goto bailout;
    2349             :         }
    2350             : 
    2351       52902 :         ok = wt(val, lg->output_log, 1);
    2352             : 
    2353       52902 :         if (lg->debug & 1)
    2354           0 :                 fprintf(stderr, "#Logged %d " LLFMT " inserts\n", id, nr);
    2355             : 
    2356       52902 :   bailout:
    2357       52902 :         if (ok != GDK_SUCCEED) {
    2358           0 :                 const char *err = mnstr_peek_error(lg->output_log);
    2359           0 :                 TRC_CRITICAL(GDK, "write failed%s%s\n", err ? ": " : "", err ? err : "");
    2360             :         }
    2361             :         return ok;
    2362             : }
    2363             : 
    2364             : static gdk_return
    2365       13511 : string_writer(logger *lg, BAT *b, lng offset, lng nr)
    2366             : {
    2367       13511 :         size_t bufsz = lg->bufsize, resize = 0;
    2368       13511 :         BUN end = (BUN)(offset + nr);
    2369       13511 :         char *buf = lg->buf;
    2370             :         gdk_return res = GDK_SUCCEED;
    2371             : 
    2372       13511 :         if (!buf)
    2373             :                 return GDK_FAIL;
    2374       13511 :         BATiter bi = bat_iterator(b);
    2375       13511 :         BUN p = (BUN)offset;
    2376       27073 :         for ( ; p < end; ) {
    2377             :                 size_t sz = 0;
    2378       13562 :                 if (resize) {
    2379           1 :                         if (!(buf = GDKrealloc(lg->buf, resize))) {
    2380             :                                 res = GDK_FAIL;
    2381             :                                 break;
    2382             :                         }
    2383           1 :                         lg->buf = buf;
    2384           1 :                         lg->bufsize = bufsz = resize;
    2385             :                         resize = 0;
    2386             :                 }
    2387             :                 char *dst = buf;
    2388       57477 :                 for(; p < end && sz < bufsz; p++) {
    2389       43966 :                         char *s = BUNtail(bi, p);
    2390       43966 :                         size_t len = strlen(s)+1;
    2391       43966 :                         if ((sz+len) > bufsz) {
    2392          51 :                                 if (len > bufsz)
    2393           1 :                                         resize = len+bufsz;
    2394             :                                 break;
    2395             :                         } else {
    2396       43915 :                                 memcpy(dst, s, len);
    2397       43915 :                                 dst += len;
    2398             :                                 sz += len;
    2399             :                         }
    2400             :                 }
    2401       13562 :                 if (sz && (!mnstr_writeLng(lg->output_log, (lng) sz) || mnstr_write(lg->output_log, buf, sz, 1) != 1)) {
    2402             :                         res = GDK_FAIL;
    2403             :                         break;
    2404             :                 }
    2405             :         }
    2406       13511 :         bat_iterator_end(&bi);
    2407       13511 :         return res;
    2408             : }
    2409             : 
    2410             : static gdk_return
    2411      218695 : internal_log_bat(logger *lg, BAT *b, log_id id, lng offset, lng cnt, int sliced)
    2412             : {
    2413      218695 :         bte tpe = find_type(lg, b->ttype);
    2414             :         gdk_return ok = GDK_SUCCEED;
    2415             :         logformat l;
    2416             :         BUN p;
    2417             :         lng nr;
    2418             :         int is_row = 0;
    2419             : 
    2420      218695 :         if (lg->row_insert_nrcols != 0) {
    2421           0 :                 lg->row_insert_nrcols--;
    2422             :                 is_row = 1;
    2423             :         }
    2424      218695 :         l.flag = LOG_UPDATE_BULK;
    2425      218695 :         l.id = id;
    2426             :         nr = cnt;
    2427             : 
    2428      218695 :         if (LOG_DISABLED(lg) || !nr) {
    2429             :                 /* logging is switched off */
    2430      170867 :                 lg->end += nr;
    2431      170867 :                 if (nr)
    2432      120018 :                         return la_bat_update_count(lg, id, offset+cnt);
    2433             :                 return GDK_SUCCEED;
    2434             :         }
    2435             : 
    2436       47828 :         gdk_return (*wt) (const void *, stream *, size_t) = BATatoms[b->ttype].atomWrite;
    2437             : 
    2438       47828 :         if (is_row)
    2439           0 :                 l.flag = tpe;
    2440       47828 :         if (log_write_format(lg, &l) != GDK_SUCCEED ||
    2441       47828 :             (!is_row && !mnstr_writeLng(lg->output_log, nr)) ||
    2442       47828 :             (!is_row && mnstr_write(lg->output_log, &tpe, 1, 1) != 1) ||
    2443       47828 :             (!is_row && !mnstr_writeLng(lg->output_log, offset))) {
    2444             :                 ok = GDK_FAIL;
    2445           0 :                 goto bailout;
    2446             :         }
    2447             : 
    2448             :         /* if offset is just for the log, but BAT is already sliced, reset offset */
    2449       47828 :         if (sliced)
    2450             :                 offset = 0;
    2451       47828 :         if (b->ttype == TYPE_msk) {
    2452           0 :                 BATiter bi = bat_iterator(b);
    2453           0 :                 if (offset % 32 == 0) {
    2454           0 :                         if (!mnstr_writeIntArray(lg->output_log, (int *) ((char *) bi.base + offset / 32), (size_t) ((nr + 31) / 32)))
    2455             :                                 ok = GDK_FAIL;
    2456             :                 } else {
    2457           0 :                         for (lng i = 0; i < nr; i += 32) {
    2458             :                                 uint32_t v = 0;
    2459           0 :                                 for (int j = 0; j < 32 && i + j < nr; j++)
    2460           0 :                                         v |= (uint32_t) Tmskval(&bi, (BUN) (offset + i + j)) << j;
    2461           0 :                                 if (!mnstr_writeInt(lg->output_log, (int) v)) {
    2462             :                                         ok = GDK_FAIL;
    2463             :                                         break;
    2464             :                                 }
    2465             :                         }
    2466             :                 }
    2467           0 :                 bat_iterator_end(&bi);
    2468       81518 :         } else if (b->ttype < TYPE_str && !isVIEW(b)) {
    2469       33690 :                 BATiter bi = bat_iterator(b);
    2470       33690 :                 const void *t = BUNtail(bi, (BUN)offset);
    2471             : 
    2472       33690 :                 ok = wt(t, lg->output_log, (size_t)nr);
    2473       33690 :                 bat_iterator_end(&bi);
    2474       14138 :         } else if (b->ttype == TYPE_str) {
    2475             :                 /* efficient string writes */
    2476       13506 :                 ok = string_writer(lg, b, offset, nr);
    2477             :         } else {
    2478         632 :                 BATiter bi = bat_iterator(b);
    2479         632 :                 BUN end = (BUN)(offset+nr);
    2480        1339 :                 for (p = (BUN)offset; p < end && ok == GDK_SUCCEED; p++) {
    2481         707 :                         const void *t = BUNtail(bi, p);
    2482             : 
    2483         707 :                         ok = wt(t, lg->output_log, 1);
    2484             :                 }
    2485         632 :                 bat_iterator_end(&bi);
    2486             :         }
    2487             : 
    2488       47828 :         if (lg->debug & 1)
    2489           0 :                 fprintf(stderr, "#Logged %d " LLFMT " inserts\n", id, nr);
    2490             : 
    2491       47828 :   bailout:
    2492       47828 :         if (ok != GDK_SUCCEED) {
    2493           0 :                 const char *err = mnstr_peek_error(lg->output_log);
    2494           0 :                 TRC_CRITICAL(GDK, "write failed%s%s\n", err ? ": " : "", err ? err : "");
    2495             :         }
    2496             :         return ok;
    2497             : }
    2498             : 
    2499             : /*
    2500             :  * Changes made to the BAT descriptor should be stored in the log
    2501             :  * files.  Actually, we need to save the descriptor file, perhaps we
    2502             :  * should simply introduce a versioning scheme.
    2503             :  */
    2504             : gdk_return
    2505       67969 : log_bat_persists(logger *lg, BAT *b, log_id id)
    2506             : {
    2507             :         logger_lock(lg);
    2508       67969 :         bte ta = find_type(lg, b->ttype);
    2509             :         logformat l;
    2510             : 
    2511       67969 :         if (logger_add_bat(lg, b, id) != GDK_SUCCEED) {
    2512             :                 logger_unlock(lg);
    2513           0 :                 return GDK_FAIL;
    2514             :         }
    2515             : 
    2516       67969 :         l.flag = LOG_CREATE;
    2517       67969 :         l.id = id;
    2518       67969 :         if (!LOG_DISABLED(lg)) {
    2519        6270 :                 if (log_write_format(lg, &l) != GDK_SUCCEED ||
    2520        3135 :                     mnstr_write(lg->output_log, &ta, 1, 1) != 1) {
    2521             :                         logger_unlock(lg);
    2522           0 :                         return GDK_FAIL;
    2523             :                 }
    2524             :         }
    2525       67969 :         lg->end++;
    2526       67969 :         if (lg->debug & 1)
    2527           0 :                 fprintf(stderr, "#persists id (%d) bat (%d)\n", id, b->batCacheid);
    2528       67969 :         gdk_return r = internal_log_bat(lg, b, id, 0, BATcount(b), 0);
    2529             :         logger_unlock(lg);
    2530       67969 :         return r;
    2531             : }
    2532             : 
    2533             : gdk_return
    2534       19266 : log_bat_transient(logger *lg, log_id id)
    2535             : {
    2536             :         logger_lock(lg);
    2537       19266 :         log_bid bid = internal_find_bat(lg, id);
    2538             :         logformat l;
    2539             : 
    2540       19266 :         l.flag = LOG_DESTROY;
    2541       19266 :         l.id = id;
    2542             : 
    2543       19266 :         if (!LOG_DISABLED(lg)) {
    2544       10924 :                 if (log_write_format(lg, &l) != GDK_SUCCEED) {
    2545           0 :                         TRC_CRITICAL(GDK, "write failed\n");
    2546             :                         logger_unlock(lg);
    2547           0 :                         return GDK_FAIL;
    2548             :                 }
    2549             :         }
    2550       19266 :         lg->end++;
    2551       19266 :         if (lg->debug & 1)
    2552           0 :                 fprintf(stderr, "#Logged destroyed bat (%d) %d\n", id,
    2553             :                                 bid);
    2554       19266 :         lg->end += BATcount(BBPquickdesc(bid));
    2555       19266 :         gdk_return r =  logger_del_bat(lg, bid);
    2556             :         logger_unlock(lg);
    2557       19266 :         return r;
    2558             : }
    2559             : 
    2560             : gdk_return
    2561      149314 : log_bat(logger *lg, BAT *b, log_id id, lng offset, lng cnt)
    2562             : {
    2563             :         logger_lock(lg);
    2564      149314 :         gdk_return r = internal_log_bat(lg, b, id, offset, cnt, 0);
    2565             :         logger_unlock(lg);
    2566      149314 :         return r;
    2567             : }
    2568             : 
    2569             : gdk_return
    2570        1445 : log_delta(logger *lg, BAT *uid, BAT *uval, log_id id)
    2571             : {
    2572             :         logger_lock(lg);
    2573        1445 :         bte tpe = find_type(lg, uval->ttype);
    2574             :         gdk_return ok = GDK_SUCCEED;
    2575             :         logformat l;
    2576             :         BUN p;
    2577             :         lng nr;
    2578             : 
    2579        1445 :         if (BATtdense(uid)) {
    2580        1412 :                 ok = internal_log_bat(lg, uval, id, uid->tseqbase, BATcount(uval), 1);
    2581             :                 logger_unlock(lg);
    2582        1412 :                 return ok;
    2583             :         }
    2584             : 
    2585          33 :         assert(uid->ttype == TYPE_oid || uid->ttype == TYPE_void);
    2586             : 
    2587          33 :         l.flag = LOG_UPDATE;
    2588          33 :         l.id = id;
    2589          33 :         nr = (BUNlast(uval));
    2590          33 :         assert(nr);
    2591             : 
    2592          33 :         lg->end += nr;
    2593          33 :         if (LOG_DISABLED(lg)) {
    2594             :                 /* logging is switched off */
    2595             :                 logger_unlock(lg);
    2596          12 :                 return GDK_SUCCEED;
    2597             :         }
    2598             : 
    2599          21 :         BATiter vi = bat_iterator(uval);
    2600          21 :         gdk_return (*wh) (const void *, stream *, size_t) = BATatoms[TYPE_oid].atomWrite;
    2601          21 :         gdk_return (*wt) (const void *, stream *, size_t) = BATatoms[uval->ttype].atomWrite;
    2602             : 
    2603          42 :         if (log_write_format(lg, &l) != GDK_SUCCEED ||
    2604          42 :             !mnstr_writeLng(lg->output_log, nr) ||
    2605          21 :              mnstr_write(lg->output_log, &tpe, 1, 1) != 1){
    2606             :                 ok = GDK_FAIL;
    2607           0 :                 goto bailout;
    2608             :         }
    2609          90 :         for (p = 0; p < BUNlast(uid) && ok == GDK_SUCCEED; p++) {
    2610          69 :                 const oid id = BUNtoid(uid, p);
    2611             : 
    2612          69 :                 ok = wh(&id, lg->output_log, 1);
    2613             :         }
    2614          21 :         if (uval->ttype == TYPE_msk) {
    2615           0 :                 if (!mnstr_writeIntArray(lg->output_log, vi.base, (BUNlast(uval) + 31) / 32))
    2616             :                         ok = GDK_FAIL;
    2617          21 :         } else if (uval->ttype == TYPE_str) {
    2618             :                 /* efficient string writes */
    2619           5 :                 ok = string_writer(lg, uval, 0, nr);
    2620             :         } else {
    2621          74 :                 for (p = 0; p < BUNlast(uid) && ok == GDK_SUCCEED; p++) {
    2622          58 :                         const void *val = BUNtail(vi, p);
    2623             : 
    2624          58 :                         ok = wt(val, lg->output_log, 1);
    2625             :                 }
    2626             :         }
    2627             : 
    2628          21 :         if (lg->debug & 1)
    2629           0 :                 fprintf(stderr, "#Logged %d " LLFMT " inserts\n", id, nr);
    2630             : 
    2631          21 :   bailout:
    2632          21 :         bat_iterator_end(&vi);
    2633          21 :         if (ok != GDK_SUCCEED) {
    2634           0 :                 const char *err = mnstr_peek_error(lg->output_log);
    2635           0 :                 TRC_CRITICAL(GDK, "write failed%s%s\n", err ? ": " : "", err ? err : "");
    2636             :         }
    2637             :         logger_unlock(lg);
    2638          21 :         return ok;
    2639             : }
    2640             : 
    2641             : 
    2642             : gdk_return
    2643           0 : log_bat_clear(logger *lg, int id)
    2644             : {
    2645             :         logformat l;
    2646             : 
    2647           0 :         lg->end++;
    2648           0 :         if (LOG_DISABLED(lg)) {
    2649             :                 logger_lock(lg);
    2650           0 :                 gdk_return res = la_bat_update_count(lg, id, 0);
    2651             :                 logger_unlock(lg);
    2652           0 :                 return res;
    2653             :         }
    2654             : 
    2655           0 :         l.flag = LOG_CLEAR;
    2656           0 :         l.id = id;
    2657             : 
    2658           0 :         if (lg->debug & 1)
    2659           0 :                 fprintf(stderr, "#Logged clear %d\n", id);
    2660           0 :         return log_write_format(lg, &l);
    2661             : }
    2662             : 
    2663             : #define DBLKSZ          8192
    2664             : #define SEGSZ           (64*DBLKSZ)
    2665             : 
    2666             : #define LOG_MINI        (LL_CONSTANT(2)*1024)
    2667             : #define LOG_LARGE       (LL_CONSTANT(2)*1024*1024*1024)
    2668             : 
    2669             : static gdk_return
    2670       11810 : new_logfile(logger *lg)
    2671             : {
    2672       11810 :         lng log_large = (GDKdebug & FORCEMITOMASK)?LOG_MINI:LOG_LARGE;
    2673       11810 :         assert(!LOG_DISABLED(lg));
    2674             :         lng p;
    2675       11810 :         p = (lng) getfilepos(getFile(lg->output_log));
    2676       11810 :         if (p == -1)
    2677             :                 return GDK_FAIL;
    2678       11810 :         if (p > log_large || (lg->end*1024) > log_large) {
    2679        7669 :                 lg->id++;
    2680        7669 :                 logger_close_output(lg);
    2681        7669 :                 return logger_open_output(lg);
    2682             :         }
    2683             :         return GDK_SUCCEED;
    2684             : }
    2685             : 
    2686             : gdk_return
    2687       15927 : log_tend(logger *lg)
    2688             : {
    2689             :         logformat l;
    2690             : 
    2691       15927 :         if (lg->debug & 1)
    2692           0 :                 fprintf(stderr, "#log_tend %d\n", lg->tid);
    2693             : 
    2694       15927 :         l.flag = LOG_END;
    2695       15927 :         l.id = lg->tid;
    2696       15927 :         if (lg->flushnow) {
    2697        4114 :                 lg->flushnow = 0;
    2698        4114 :                 return logger_commit(lg);
    2699             :         }
    2700             : 
    2701       11813 :         lg->end++;
    2702       11813 :         if (LOG_DISABLED(lg)) {
    2703             :                 return GDK_SUCCEED;
    2704             :         }
    2705             : 
    2706       23620 :         if (log_write_format(lg, &l) != GDK_SUCCEED ||
    2707       11810 :             mnstr_flush(lg->output_log, MNSTR_FLUSH_DATA) ||
    2708       23620 :             (!(GDKdebug & NOSYNCMASK) && mnstr_fsync(lg->output_log)) ||
    2709       11810 :             new_logfile(lg) != GDK_SUCCEED) {
    2710           0 :                 TRC_CRITICAL(GDK, "write failed\n");
    2711           0 :                 return GDK_FAIL;
    2712             :         }
    2713             :         return GDK_SUCCEED;
    2714             : }
    2715             : 
    2716             : gdk_return
    2717       15927 : log_tdone(logger *lg, ulng commit_ts)
    2718             : {
    2719       15927 :         if (lg->debug & 1)
    2720           0 :                 fprintf(stderr, "#log_tdone %d\n", lg->tid);
    2721             : 
    2722       15927 :         if (lg->current) {
    2723       15927 :                 lg->current->last_tid = lg->tid;
    2724       15927 :                 lg->current->last_ts = commit_ts;
    2725             :         }
    2726       15927 :         return GDK_SUCCEED;
    2727             : }
    2728             : 
    2729             : static gdk_return
    2730        6219 : log_sequence_(logger *lg, int seq, lng val, int flush)
    2731             : {
    2732             :         logformat l;
    2733             : 
    2734        6219 :         if (LOG_DISABLED(lg))
    2735             :                 return GDK_SUCCEED;
    2736        2723 :         l.flag = LOG_SEQ;
    2737        2723 :         l.id = seq;
    2738             : 
    2739        2723 :         if (lg->debug & 1)
    2740           0 :                 fprintf(stderr, "#log_sequence_ (%d," LLFMT ")\n", seq, val);
    2741             : 
    2742        5446 :         if (log_write_format(lg, &l) != GDK_SUCCEED ||
    2743        5446 :             !mnstr_writeLng(lg->output_log, val) ||
    2744        2723 :             (flush && mnstr_flush(lg->output_log, MNSTR_FLUSH_DATA)) ||
    2745        2723 :             (flush && !(GDKdebug & NOSYNCMASK) && mnstr_fsync(lg->output_log))) {
    2746           0 :                 TRC_CRITICAL(GDK, "write failed\n");
    2747           0 :                 return GDK_FAIL;
    2748             :         }
    2749             :         return GDK_SUCCEED;
    2750             : }
    2751             : 
    2752             : /* a transaction in it self */
    2753             : gdk_return
    2754        6219 : log_sequence(logger *lg, int seq, lng val)
    2755             : {
    2756             :         BUN p;
    2757             : 
    2758        6219 :         if (lg->debug & 1)
    2759           0 :                 fprintf(stderr, "#log_sequence (%d," LLFMT ")\n", seq, val);
    2760             : 
    2761             :         logger_lock(lg);
    2762        6219 :         if ((p = log_find(lg->seqs_id, lg->dseqs, seq)) != BUN_NONE &&
    2763        5884 :             p >= lg->seqs_id->batInserted) {
    2764        1721 :                 assert(lg->seqs_val->hseqbase == 0);
    2765        1721 :                 if (BUNreplace(lg->seqs_val, p, &val, false) != GDK_SUCCEED) {
    2766             :                         logger_unlock(lg);
    2767           0 :                         return GDK_FAIL;
    2768             :                 }
    2769             :         } else {
    2770        4498 :                 if (p != BUN_NONE) {
    2771        4163 :                         oid pos = p;
    2772        4163 :                         if (BUNappend(lg->dseqs, &pos, false) != GDK_SUCCEED) {
    2773             :                                 logger_unlock(lg);
    2774           0 :                                 return GDK_FAIL;
    2775             :                         }
    2776             :                 }
    2777        8996 :                 if (BUNappend(lg->seqs_id, &seq, false) != GDK_SUCCEED ||
    2778        4498 :                     BUNappend(lg->seqs_val, &val, false) != GDK_SUCCEED) {
    2779             :                         logger_unlock(lg);
    2780           0 :                         return GDK_FAIL;
    2781             :                 }
    2782             :         }
    2783        6219 :         gdk_return r = log_sequence_(lg, seq, val, 1);
    2784             :         logger_unlock(lg);
    2785        6219 :         return r;
    2786             : }
    2787             : 
    2788             : static gdk_return
    2789       15505 : bm_commit(logger *lg)
    2790             : {
    2791             :         BUN p;
    2792             :         logger_lock(lg);
    2793       15505 :         BAT *b = lg->catalog_bid;
    2794             :         const log_bid *bids;
    2795             : 
    2796       15505 :         bids = (log_bid *) Tloc(b, 0);
    2797       83417 :         for (p = b->batInserted; p < BUNlast(b); p++) {
    2798       67912 :                 log_bid bid = bids[p];
    2799             :                 BAT *lb;
    2800             : 
    2801       67912 :                 assert(bid);
    2802      135824 :                 if ((lb = BATdescriptor(bid)) == NULL ||
    2803       67912 :                     BATmode(lb, false) != GDK_SUCCEED) {
    2804           0 :                         TRC_WARNING(GDK, "Failed to set bat (%d%s) persistent\n", bid, !lb?" gone":"");
    2805             :                         logbat_destroy(lb);
    2806             :                         logger_unlock(lg);
    2807           0 :                         return GDK_FAIL;
    2808             :                 }
    2809             : 
    2810       67912 :                 assert(lb->batRestricted != BAT_WRITE);
    2811             :                 logbat_destroy(lb);
    2812             : 
    2813       67912 :                 if (lg->debug & 1)
    2814           0 :                         fprintf(stderr, "#bm_commit: create %d (%d)\n",
    2815           0 :                                 bid, BBP_lrefs(bid));
    2816             :         }
    2817             :         logger_unlock(lg);
    2818       15505 :         return bm_subcommit(lg);
    2819             : }
    2820             : 
    2821             : static gdk_return
    2822       68201 : logger_add_bat(logger *lg, BAT *b, log_id id)
    2823             : {
    2824       68201 :         log_bid bid = internal_find_bat(lg, id);
    2825       68201 :         lng cnt = 0;
    2826       68201 :         lng lid = lng_nil;
    2827             : 
    2828       68201 :         assert(b->batRestricted != BAT_WRITE ||
    2829             :                b == lg->catalog_bid ||
    2830             :                b == lg->catalog_id ||
    2831             :                b == lg->dcatalog ||
    2832             :                b == lg->seqs_id ||
    2833             :                b == lg->seqs_val ||
    2834             :                b == lg->dseqs);
    2835       68201 :         assert(b->batRole == PERSISTENT);
    2836       68201 :         if (bid) {
    2837         586 :                 if (bid != b->batCacheid) {
    2838         584 :                         if (logger_del_bat(lg, bid) != GDK_SUCCEED)
    2839             :                                 return GDK_FAIL;
    2840             :                 } else {
    2841             :                         return GDK_SUCCEED;
    2842             :                 }
    2843             :         }
    2844       68199 :         bid = b->batCacheid;
    2845       68199 :         if (lg->debug & 1)
    2846           0 :                 fprintf(stderr, "#create %d\n", id);
    2847       68199 :         assert(log_find(lg->catalog_bid, lg->dcatalog, bid) == BUN_NONE);
    2848      136398 :         if (BUNappend(lg->catalog_bid, &bid, false) != GDK_SUCCEED ||
    2849      136398 :             BUNappend(lg->catalog_id, &id, false) != GDK_SUCCEED ||
    2850      136398 :             BUNappend(lg->catalog_cnt, &cnt, false) != GDK_SUCCEED ||
    2851       68199 :             BUNappend(lg->catalog_lid, &lid, false) != GDK_SUCCEED)
    2852           0 :                 return GDK_FAIL;
    2853       68199 :         lg->cnt++;
    2854       68199 :         BBPretain(bid);
    2855       68199 :         return GDK_SUCCEED;
    2856             : }
    2857             : 
    2858             : static gdk_return
    2859       19886 : logger_del_bat(logger *lg, log_bid bid)
    2860             : {
    2861       19886 :         BUN p = log_find(lg->catalog_bid, lg->dcatalog, bid);
    2862             :         oid pos;
    2863       19886 :         lng lid = lg->tid;
    2864             : 
    2865       19886 :         assert(p != BUN_NONE);
    2866             :         if (p == BUN_NONE) {
    2867             :                 GDKerror("cannot find BAT\n");
    2868             :                 return GDK_FAIL;
    2869             :         }
    2870             : 
    2871       19886 :         pos = (oid) p;
    2872       19886 :         assert(lg->catalog_lid->hseqbase == 0);
    2873       19886 :         if (BUNreplace(lg->catalog_lid, p, &lid, false) != GDK_SUCCEED)
    2874             :                 return GDK_FAIL;
    2875       19886 :         if (BUNappend(lg->dcatalog, &pos, false) == GDK_SUCCEED) {
    2876       19886 :                 lg->deleted++;
    2877       19886 :                 return GDK_SUCCEED;
    2878             :         }
    2879             :         return GDK_FAIL;
    2880             : }
    2881             : 
    2882             : log_bid
    2883       20546 : logger_find_bat(logger *lg, log_id id)
    2884             : {
    2885             :         logger_lock(lg);
    2886       20546 :         log_bid bid = internal_find_bat(lg, id);
    2887             :         logger_unlock(lg);
    2888       20546 :         return bid;
    2889             : }
    2890             : 
    2891             : 
    2892             : gdk_return
    2893       15927 : log_tstart(logger *lg, bool flushnow)
    2894             : {
    2895             :         logformat l;
    2896             : 
    2897       15927 :         if (flushnow) {
    2898        4114 :                 lg->id++;
    2899        4114 :                 logger_close_output(lg);
    2900             :                 /* start new file */
    2901        4114 :                 if (logger_open_output(lg) != GDK_SUCCEED)
    2902             :                         return GDK_FAIL;
    2903       14465 :                 while (lg->saved_id+1 < lg->id)
    2904       10351 :                         logger_flush(lg, (1ULL<<63));
    2905        4114 :                 lg->flushnow = flushnow;
    2906             :         }
    2907             : 
    2908       15927 :         lg->end++;
    2909       15927 :         if (LOG_DISABLED(lg)) {
    2910             :                 return GDK_SUCCEED;
    2911             :         }
    2912             : 
    2913       11810 :         l.flag = LOG_START;
    2914       11810 :         l.id = ++lg->tid;
    2915             : 
    2916       11810 :         if (lg->debug & 1)
    2917           0 :                 fprintf(stderr, "#log_tstart %d\n", lg->tid);
    2918       11810 :         if (log_write_format(lg, &l) != GDK_SUCCEED)
    2919           0 :                 return GDK_FAIL;
    2920             :         return GDK_SUCCEED;
    2921             : }

Generated by: LCOV version 1.14