LCOV - code coverage report
Current view: top level - gdk - gdk_logger.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 1074 1492 72.0 %
Date: 2021-10-13 02:24:04 Functions: 68 72 94.4 %

          Line data    Source code
       1             : /*
       2             :  * This Source Code Form is subject to the terms of the Mozilla Public
       3             :  * License, v. 2.0.  If a copy of the MPL was not distributed with this
       4             :  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
       5             :  *
       6             :  * Copyright 1997 - July 2008 CWI, August 2008 - 2021 MonetDB B.V.
       7             :  */
       8             : 
       9             : #include "monetdb_config.h"
      10             : #include "gdk.h"
      11             : #include "gdk_private.h"
      12             : #include "gdk_logger.h"
      13             : #include "gdk_logger_internals.h"
      14             : #include "mutils.h"
      15             : #include <string.h>
      16             : 
      17             : static gdk_return logger_add_bat(logger *lg, BAT *b, log_id id);
      18             : static gdk_return logger_del_bat(logger *lg, log_bid bid);
      19             : /*
      20             :  * The logger uses a directory to store its log files. One master log
      21             :  * file stores information about the version of the logger and the
      22             :  * type mapping it uses. This file is a simple ascii file with the
      23             :  * following format:
      24             :  *  {6DIGIT-VERSION\n[id,type_name\n]*}
      25             :  * The transaction log files have a binary format.
      26             :  */
      27             : 
      28             : #define LOG_START       0
      29             : #define LOG_END         1
      30             : #define LOG_UPDATE_CONST        2
      31             : #define LOG_UPDATE_BULK 3
      32             : #define LOG_UPDATE      4
      33             : #define LOG_CREATE      5
      34             : #define LOG_DESTROY     6
      35             : #define LOG_SEQ         7
      36             : #define LOG_CLEAR       8
      37             : #define LOG_ROW         9 /* per row relative small log entry */
      38             : 
      39             : #ifdef NATIVE_WIN32
      40             : #define getfilepos _ftelli64
      41             : #else
      42             : #ifdef HAVE_FSEEKO
      43             : #define getfilepos ftello
      44             : #else
      45             : #define getfilepos ftell
      46             : #endif
      47             : #endif
      48             : 
      49             : #define BATSIZE 0
      50             : 
      51             : #define LOG_DISABLED(lg) ((lg)->debug&128 || (lg)->inmemory || (lg)->flushnow)
      52             : 
      53             : static const char *log_commands[] = {
      54             :         "LOG_START",
      55             :         "LOG_END",
      56             :         "LOG_UPDATE_CONST",
      57             :         "LOG_UPDATE_BULK",
      58             :         "LOG_UPDATE",
      59             :         "LOG_CREATE",
      60             :         "LOG_DESTROY",
      61             :         "LOG_SEQ",
      62             :         "LOG_CLEAR",
      63             :         "LOG_ROW",
      64             : };
      65             : 
      66             : typedef struct logaction {
      67             :         int type;               /* type of change */
      68             :         lng nr;
      69             :         int tt;
      70             :         lng id;
      71             :         lng offset;
      72             :         log_id cid;             /* id of object */
      73             :         BAT *b;                 /* temporary bat with changes */
      74             :         BAT *uid;               /* temporary bat with bun positions to update */
      75             : } logaction;
      76             : 
      77             : /* during the recover process a number of transactions could be active */
      78             : typedef struct trans {
      79             :         int tid;                /* transaction id */
      80             :         int sz;                 /* sz of the changes array */
      81             :         int nr;                 /* nr of changes */
      82             : 
      83             :         logaction *changes;
      84             : 
      85             :         struct trans *tr;
      86             : } trans;
      87             : 
      88             : typedef struct logformat_t {
      89             :         bte flag;
      90             :         int id;
      91             : } logformat;
      92             : 
      93             : typedef enum {LOG_OK, LOG_EOF, LOG_ERR} log_return;
      94             : 
      95             : static gdk_return bm_commit(logger *lg);
      96             : static gdk_return tr_grow(trans *tr);
      97             : 
      98             : static inline void
      99             : logger_lock(logger *lg)
     100             : {
     101       57858 :         MT_lock_set(&lg->lock);
     102             : }
     103             : 
     104             : static inline void
     105             : logger_unlock(logger *lg)
     106             : {
     107      341546 :         MT_lock_unset(&lg->lock);
     108       40413 : }
     109             : 
     110             : static bte
     111      398753 : find_type(logger *lg, int tpe)
     112             : {
     113      398753 :         BATiter cni = bat_iterator_nolock(lg->type_nr);
     114      398753 :         bte *res = (bte*)Tloc(lg->type_id, 0);
     115             :         BUN p;
     116             : 
     117             :         /* type should be there !*/
     118      398753 :         if (BAThash(lg->type_nr) == GDK_SUCCEED) {
     119      398753 :                 MT_rwlock_rdlock(&cni.b->thashlock);
     120      398753 :                 HASHloop_int(cni, cni.b->thash, p, &tpe) {
     121      398586 :                         MT_rwlock_rdunlock(&cni.b->thashlock);
     122      398586 :                         return res[p];
     123             :                 }
     124         167 :                 MT_rwlock_rdunlock(&cni.b->thashlock);
     125             :         }
     126             :         return -1;
     127             : }
     128             : 
     129             : static int
     130      104720 : find_type_nr(logger *lg, bte tpe)
     131             : {
     132      104720 :         BATiter cni = bat_iterator_nolock(lg->type_id);
     133      104720 :         int *res = (int*)Tloc(lg->type_nr, 0);
     134             :         BUN p;
     135             : 
     136             :         /* type should be there !*/
     137      104720 :         if (BAThash(lg->type_id) == GDK_SUCCEED) {
     138      104720 :                 MT_rwlock_rdlock(&cni.b->thashlock);
     139      104720 :                 HASHloop_bte(cni, cni.b->thash, p, &tpe) {
     140      104720 :                         MT_rwlock_rdunlock(&cni.b->thashlock);
     141      104720 :                         return res[p];
     142             :                 }
     143           0 :                 MT_rwlock_rdunlock(&cni.b->thashlock);
     144             :         }
     145             :         return -1;
     146             : }
     147             : 
     148             : static BUN
     149       98057 : log_find(BAT *b, BAT *d, int val)
     150             : {
     151             :         BUN p;
     152             : 
     153       98057 :         assert(b->ttype == TYPE_int);
     154       98057 :         assert(d->ttype == TYPE_oid);
     155       98057 :         if (BAThash(b) == GDK_SUCCEED) {
     156       98057 :                 BATiter cni = bat_iterator_nolock(b);
     157       98057 :                 MT_rwlock_rdlock(&cni.b->thashlock);
     158      138265 :                 HASHloop_int(cni, cni.b->thash, p, &val) {
     159       29145 :                         oid pos = p;
     160       29145 :                         if (BUNfnd(d, &pos) == BUN_NONE) {
     161       29145 :                                 MT_rwlock_rdunlock(&cni.b->thashlock);
     162       29145 :                                 return p;
     163             :                         }
     164             :                 }
     165       68912 :                 MT_rwlock_rdunlock(&cni.b->thashlock);
     166             :         } else {                /* unlikely: BAThash failed */
     167             :                 BUN q;
     168           0 :                 int *t = (int *) Tloc(b, 0);
     169             : 
     170           0 :                 for (p = 0, q = BUNlast(b); p < q; p++) {
     171           0 :                         if (t[p] == val) {
     172           0 :                                 oid pos = p;
     173           0 :                                 if (BUNfnd(d, &pos) == BUN_NONE)
     174           0 :                                         return p;
     175             :                         }
     176             :                 }
     177             :         }
     178             :         return BUN_NONE;
     179             : }
     180             : 
     181             : static log_bid
     182      210804 : internal_find_bat(logger *lg, log_id id)
     183             : {
     184      210804 :         BATiter cni = bat_iterator_nolock(lg->catalog_id);
     185             :         BUN p;
     186             : 
     187      210804 :         if (BAThash(lg->catalog_id) == GDK_SUCCEED) {
     188      210804 :                 MT_rwlock_rdlock(&cni.b->thashlock);
     189      331124 :                 HASHloop_int(cni, cni.b->thash, p, &id) {
     190      143120 :                         oid pos = p;
     191      143120 :                         if (BUNfnd(lg->dcatalog, &pos) == BUN_NONE) {
     192      136305 :                                 MT_rwlock_rdunlock(&cni.b->thashlock);
     193      136305 :                                 return *(log_bid *) Tloc(lg->catalog_bid, p);
     194             :                         }
     195             :                 }
     196       74499 :                 MT_rwlock_rdunlock(&cni.b->thashlock);
     197             :         }
     198             :         return 0;
     199             : }
     200             : 
     201             : static void
     202             : logbat_destroy(BAT *b)
     203             : {
     204       18538 :         if (b)
     205      116231 :                 BBPunfix(b->batCacheid);
     206       14337 : }
     207             : 
     208             : static BAT *
     209       18695 : logbat_new(int tt, BUN size, role_t role)
     210             : {
     211       18695 :         BAT *nb = COLnew(0, tt, size, role);
     212             : 
     213       18695 :         if (nb) {
     214       18695 :                 BBP_pid(nb->batCacheid) = 0;
     215       18695 :                 if (role == PERSISTENT)
     216       12663 :                         BATmode(nb, false);
     217             :         } else {
     218           0 :                 TRC_CRITICAL(GDK, "creating new BAT[void:%s]#" BUNFMT " failed\n", ATOMname(tt), size);
     219             :         }
     220       18695 :         return nb;
     221             : }
     222             : 
     223             : static int
     224      150200 : log_read_format(logger *l, logformat *data)
     225             : {
     226      150200 :         assert(!l->inmemory);
     227      289107 :         return mnstr_read(l->input_log, &data->flag, 1, 1) == 1 &&
     228      138907 :                 mnstr_readInt(l->input_log, &data->id) == 1;
     229             : }
     230             : 
     231             : static gdk_return
     232      152142 : log_write_format(logger *lg, logformat *data)
     233             : {
     234      152142 :         assert(data->id || data->flag);
     235      152142 :         assert(!lg->inmemory);
     236      304284 :         if (mnstr_write(lg->output_log, &data->flag, 1, 1) == 1 &&
     237      152142 :             mnstr_writeInt(lg->output_log, data->id))
     238             :                 return GDK_SUCCEED;
     239           0 :         TRC_CRITICAL(GDK, "write failed\n");
     240           0 :         return GDK_FAIL;
     241             : }
     242             : 
     243             : static log_return
     244           0 : log_read_clear(logger *lg, trans *tr, log_id id)
     245             : {
     246           0 :         if (lg->debug & 1)
     247           0 :                 fprintf(stderr, "#logger found log_read_clear %d\n", id);
     248           0 :         if (tr_grow(tr) != GDK_SUCCEED)
     249             :                 return LOG_ERR;
     250           0 :         tr->changes[tr->nr].type = LOG_CLEAR;
     251           0 :         tr->changes[tr->nr].cid = id;
     252           0 :         tr->nr++;
     253           0 :         return LOG_OK;
     254             : }
     255             : 
     256             : static gdk_return
     257           0 : la_bat_clear(logger *lg, logaction *la)
     258             : {
     259           0 :         log_bid bid = internal_find_bat(lg, la->cid);
     260             :         BAT *b;
     261             : 
     262           0 :         if (lg->debug & 1)
     263           0 :                 fprintf(stderr, "#la_bat_clear %d\n", la->cid);
     264             : 
     265           0 :         b = BATdescriptor(bid);
     266           0 :         if (b) {
     267           0 :                 restrict_t access = (restrict_t) b->batRestricted;
     268           0 :                 b->batRestricted = BAT_WRITE;
     269             :                 /* during startup this is fine */
     270           0 :                 BATclear(b, true);
     271           0 :                 b->batRestricted = access;
     272             :                 logbat_destroy(b);
     273             :         }
     274           0 :         return GDK_SUCCEED;
     275             : }
     276             : 
     277             : static log_return
     278        2552 : log_read_seq(logger *lg, logformat *l)
     279             : {
     280        2552 :         int seq = l->id;
     281             :         lng val;
     282             :         BUN p;
     283             : 
     284        2552 :         assert(!lg->inmemory);
     285        2552 :         if (mnstr_readLng(lg->input_log, &val) != 1) {
     286           0 :                 TRC_CRITICAL(GDK, "read failed\n");
     287           0 :                 return LOG_EOF;
     288             :         }
     289             : 
     290        2552 :         if ((p = log_find(lg->seqs_id, lg->dseqs, seq)) != BUN_NONE &&
     291        2551 :             p >= lg->seqs_id->batInserted) {
     292        1211 :                 assert(lg->seqs_val->hseqbase == 0);
     293        1211 :                 if (BUNreplace(lg->seqs_val, p, &val, false) != GDK_SUCCEED)
     294           0 :                         return LOG_ERR;
     295             :         } else {
     296        1341 :                 if (p != BUN_NONE) {
     297        1340 :                         oid pos = p;
     298        1340 :                         if (BUNappend(lg->dseqs, &pos, false) != GDK_SUCCEED)
     299           0 :                                 return LOG_ERR;
     300             :                 }
     301        2682 :                 if (BUNappend(lg->seqs_id, &seq, false) != GDK_SUCCEED ||
     302        1341 :                     BUNappend(lg->seqs_val, &val, false) != GDK_SUCCEED)
     303           0 :                         return LOG_ERR;
     304             :         }
     305             :         return LOG_OK;
     306             : }
     307             : 
     308             : #if 0
     309             : static gdk_return
     310             : log_write_id(logger *lg, int id)
     311             : {
     312             :         assert(!lg->inmemory);
     313             :         assert(id >= 0);
     314             :         if (mnstr_writeInt(lg->output_log, id))
     315             :                 return GDK_SUCCEED;
     316             :         TRC_CRITICAL(GDK, "write failed\n");
     317             :         return GDK_FAIL;
     318             : }
     319             : 
     320             : static int
     321             : log_read_id(logger *lg, log_id *id)
     322             : {
     323             :         assert(!lg->inmemory);
     324             :         if (mnstr_readInt(lg->input_log, id) != 1) {
     325             :                 TRC_CRITICAL(GDK, "read failed\n");
     326             :                 return LOG_EOF;
     327             :         }
     328             :         return LOG_OK;
     329             : }
     330             : #endif
     331             : 
     332             : static log_return
     333       13018 : string_reader(logger *lg, BAT *b, lng nr)
     334             : {
     335             :         size_t sz = 0;
     336       13018 :         lng SZ = 0;
     337             :         log_return res = LOG_OK;
     338             : 
     339       26086 :         for (; nr && res == LOG_OK; ) {
     340       13068 :                 if (mnstr_readLng(lg->input_log, &SZ) != 1)
     341           0 :                         return LOG_EOF;
     342       13068 :                 sz = (size_t)SZ;
     343       13068 :                 char *buf = lg->buf;
     344       13068 :                 if (lg->bufsize < sz) {
     345           0 :                         if (!(buf = GDKrealloc(lg->buf, sz)))
     346             :                                 return LOG_ERR;
     347           0 :                         lg->buf = buf;
     348           0 :                         lg->bufsize = sz;
     349             :                 }
     350             : 
     351       13068 :                 if (mnstr_read(lg->input_log, buf, sz, 1) != 1)
     352             :                         return LOG_EOF;
     353             :                 /* handle strings */
     354             :                 char *t = buf;
     355             :                 /* chunked */
     356             : #define CHUNK_SIZE 1024
     357             :                 char *strings[CHUNK_SIZE];
     358             :                 int cur = 0;
     359             : 
     360       61125 :                 for(; nr>0 && res == LOG_OK && t < (buf+sz); nr--) {
     361       48057 :                         strings[cur++] = t;
     362       48057 :                         if (cur == CHUNK_SIZE && b && BUNappendmulti(b, strings, cur, true) != GDK_SUCCEED)
     363             :                                 res = LOG_ERR;
     364       48057 :                         if (cur == CHUNK_SIZE)
     365             :                                 cur = 0;
     366             :                         /* find next */
     367     5339185 :                         while(*t)
     368     5291128 :                                 t++;
     369       48057 :                         t++;
     370             :                 }
     371       13068 :                 if (cur && b && BUNappendmulti(b, strings, cur, true) != GDK_SUCCEED)
     372             :                         res = LOG_ERR;
     373             :         }
     374             :         return res;
     375             : }
     376             : 
     377             : static log_return
     378      101703 : log_read_updates(logger *lg, trans *tr, logformat *l, log_id id)
     379             : {
     380             :         log_return res = LOG_OK;
     381             :         lng nr, pnr;
     382      101703 :         bte type_id = -1;
     383             :         int tpe;
     384             : 
     385      101703 :         assert(!lg->inmemory);
     386      101703 :         if (lg->debug & 1)
     387           0 :                 fprintf(stderr, "#logger found log_read_updates %d %s\n", id, l->flag == LOG_UPDATE ? "update" : "update_buld");
     388             : 
     389      203406 :         if (mnstr_readLng(lg->input_log, &nr) != 1 ||
     390      101703 :             mnstr_read(lg->input_log, &type_id, 1, 1) != 1)
     391           0 :                 return LOG_ERR;
     392             : 
     393      101703 :         pnr = nr;
     394      101703 :         tpe = find_type_nr(lg, type_id);
     395      101703 :         if (tpe >= 0) {
     396             :                 BAT *uid = NULL;
     397             :                 BAT *r = NULL;
     398      101703 :                 void *(*rt) (ptr, size_t *, stream *, size_t) = BATatoms[tpe].atomRead;
     399             :                 lng offset;
     400             : 
     401      101703 :                 assert(nr <= (lng) BUN_MAX);
     402      101703 :                 if (!lg->flushing && l->flag == LOG_UPDATE) {
     403           3 :                         uid = COLnew(0, TYPE_oid, (BUN)nr, PERSISTENT);
     404           3 :                         if (uid == NULL) {
     405           0 :                                 return LOG_ERR;
     406             :                         }
     407             :                 }
     408      101703 :                 if (!lg->flushing) {
     409        7167 :                         r = COLnew(0, tpe, (BUN) nr, PERSISTENT);
     410        7167 :                         if (r == NULL) {
     411           0 :                                 if (uid)
     412           0 :                                         BBPreclaim(uid);
     413           0 :                                 return LOG_ERR;
     414             :                         }
     415             :                 }
     416             : 
     417      101703 :                 if (l->flag == LOG_UPDATE_CONST) {
     418       54693 :                         if (mnstr_readLng(lg->input_log, &offset) != 1) {
     419           0 :                                 if (r)
     420           0 :                                         BBPreclaim(r);
     421           0 :                                 return LOG_ERR;
     422             :                         }
     423       54693 :                         size_t tlen = lg->bufsize;
     424       54693 :                         void *t = rt(lg->buf, &tlen, lg->input_log, 1);
     425       54693 :                         if (t == NULL) {
     426             :                                 res = LOG_ERR;
     427             :                         } else {
     428       54693 :                                 lg->buf = t;
     429       54693 :                                 lg->bufsize = tlen;
     430      128731 :                                 for(BUN p = 0; p<(BUN) nr; p++) {
     431       74038 :                                         if (r && BUNappend(r, t, true) != GDK_SUCCEED)
     432             :                                                 res = LOG_ERR;
     433             :                                 }
     434             :                         }
     435       47010 :                 } else if (l->flag == LOG_UPDATE_BULK) {
     436       46989 :                         if (mnstr_readLng(lg->input_log, &offset) != 1) {
     437           0 :                                 if (r)
     438           0 :                                         BBPreclaim(r);
     439           0 :                                 return LOG_ERR;
     440             :                         }
     441       46989 :                         if (tpe == TYPE_msk) {
     442           0 :                                 if (r) {
     443           0 :                                         if (mnstr_readIntArray(lg->input_log, Tloc(r, 0), (size_t) ((nr + 31) / 32)))
     444           0 :                                                 BATsetcount(r, (BUN) nr);
     445             :                                         else
     446             :                                                 res = LOG_ERR;
     447             :                                 } else {
     448           0 :                                         size_t tlen = lg->bufsize/sizeof(int);
     449           0 :                                         size_t cnt = 0, snr = (size_t)nr;
     450           0 :                                         snr = (snr+31)/32;
     451           0 :                                         assert(tlen);
     452           0 :                                         for (; res == LOG_OK && snr > 0; snr-=cnt) {
     453           0 :                                                 cnt = snr>tlen?tlen:snr;
     454           0 :                                                 if (!mnstr_readIntArray(lg->input_log, lg->buf, cnt))
     455             :                                                         res = LOG_ERR;
     456             :                                         }
     457             :                                 }
     458             :                         } else {
     459       46989 :                                 if (!ATOMvarsized(tpe)) {
     460       33423 :                                         size_t cnt = 0, snr = (size_t)nr;
     461       33423 :                                         size_t tlen = lg->bufsize/ATOMsize(tpe), ntlen = lg->bufsize;
     462       33423 :                                         assert(tlen);
     463             :                                         /* read in chunks of max
     464             :                                          * BUFSIZE/width rows */
     465       79511 :                                         for (; res == LOG_OK && snr > 0; snr-=cnt) {
     466       46088 :                                                 cnt = snr>tlen?tlen:snr;
     467       46088 :                                                 void *t = rt(lg->buf, &ntlen, lg->input_log, cnt);
     468             : 
     469       46088 :                                                 if (t == NULL) {
     470             :                                                         res = LOG_EOF;
     471             :                                                         break;
     472             :                                                 }
     473       46088 :                                                 assert(t == lg->buf);
     474       46088 :                                                 if (r && BUNappendmulti(r, t, cnt, true) != GDK_SUCCEED)
     475             :                                                         res = LOG_ERR;
     476             :                                         }
     477       13566 :                                 } else if (tpe == TYPE_str) {
     478             :                                         /* efficient string */
     479       13013 :                                         res = string_reader(lg, r, nr);
     480             :                                 } else {
     481        1168 :                                         for (; res == LOG_OK && nr > 0; nr--) {
     482         615 :                                                 size_t tlen = lg->bufsize;
     483         615 :                                                 void *t = rt(lg->buf, &tlen, lg->input_log, 1);
     484             : 
     485         615 :                                                 if (t == NULL) {
     486             :                                                         /* see if failure was due to
     487             :                                                         * malloc or something less
     488             :                                                         * serious (in the current
     489             :                                                         * context) */
     490           0 :                                                         if (strstr(GDKerrbuf, "alloc") == NULL)
     491             :                                                                 res = LOG_EOF;
     492             :                                                         else
     493             :                                                                 res = LOG_ERR;
     494             :                                                 } else {
     495         615 :                                                         lg->buf = t;
     496         615 :                                                         lg->bufsize = tlen;
     497         615 :                                                         if (r && BUNappend(r, t, true) != GDK_SUCCEED)
     498             :                                                                 res = LOG_ERR;
     499             :                                                 }
     500             :                                         }
     501             :                                 }
     502             :                         }
     503             :                 } else {
     504          21 :                         void *(*rh) (ptr, size_t *, stream *, size_t) = BATatoms[TYPE_oid].atomRead;
     505          21 :                         void *hv = ATOMnil(TYPE_oid);
     506          21 :                         offset = 0;
     507             : 
     508          21 :                         if (hv == NULL)
     509             :                                 res = LOG_ERR;
     510          90 :                         for (; res == LOG_OK && nr > 0; nr--) {
     511          69 :                                 size_t hlen = sizeof(oid);
     512          69 :                                 void *h = rh(hv, &hlen, lg->input_log, 1);
     513          69 :                                 assert(hlen == sizeof(oid));
     514          69 :                                 assert(h == hv);
     515          69 :                                 if ((uid && BUNappend(uid, h, true) != GDK_SUCCEED))
     516             :                                         res = LOG_ERR;
     517             :                         }
     518          21 :                         nr = pnr;
     519          21 :                         if (tpe == TYPE_msk) {
     520           0 :                                 if (r) {
     521           0 :                                         if (mnstr_readIntArray(lg->input_log, Tloc(r, 0), (size_t) ((nr + 31) / 32)))
     522           0 :                                                 BATsetcount(r, (BUN) nr);
     523             :                                         else
     524             :                                                 res = LOG_ERR;
     525             :                                 } else {
     526           0 :                                         for (lng i = 0; i < nr; i += 32) {
     527             :                                                 int v;
     528           0 :                                                 switch (mnstr_readInt(lg->input_log, &v)) {
     529           0 :                                                 case 1:
     530           0 :                                                         continue;
     531             :                                                 case 0:
     532             :                                                         res = LOG_EOF;
     533             :                                                         break;
     534           0 :                                                 default:
     535             :                                                         res = LOG_ERR;
     536           0 :                                                         break;
     537             :                                                 }
     538           0 :                                                 break;
     539             :                                         }
     540             :                                 }
     541          21 :                         } else if (tpe == TYPE_str) {
     542             :                                 /* efficient string */
     543           5 :                                 res = string_reader(lg, r, nr);
     544             :                         } else {
     545          74 :                                 for (; res == LOG_OK && nr > 0; nr--) {
     546          58 :                                         size_t tlen = lg->bufsize;
     547          58 :                                         void *t = rt(lg->buf, &tlen, lg->input_log, 1);
     548             : 
     549          58 :                                         if (t == NULL) {
     550           0 :                                                 if (strstr(GDKerrbuf, "malloc") == NULL)
     551             :                                                         res = LOG_EOF;
     552             :                                                 else
     553             :                                                         res = LOG_ERR;
     554             :                                         } else {
     555          58 :                                                 lg->buf = t;
     556          58 :                                                 lg->bufsize = tlen;
     557          58 :                                                 if ((r && BUNappend(r, t, true) != GDK_SUCCEED))
     558             :                                                         res = LOG_ERR;
     559             :                                         }
     560             :                                 }
     561             :                         }
     562          21 :                         GDKfree(hv);
     563             :                 }
     564             : 
     565      101703 :                 if (res == LOG_OK) {
     566      101703 :                         if (tr_grow(tr) == GDK_SUCCEED) {
     567      101703 :                                 tr->changes[tr->nr].type =
     568      101703 :                                         l->flag==LOG_UPDATE_CONST?LOG_UPDATE_BULK:l->flag;
     569      101703 :                                 tr->changes[tr->nr].nr = pnr;
     570      101703 :                                 tr->changes[tr->nr].tt = tpe;
     571      101703 :                                 tr->changes[tr->nr].cid = id;
     572      101703 :                                 tr->changes[tr->nr].offset = offset;
     573      101703 :                                 tr->changes[tr->nr].b = r;
     574      101703 :                                 tr->changes[tr->nr].uid = uid;
     575      101703 :                                 tr->nr++;
     576             :                         } else {
     577             :                                 res = LOG_ERR;
     578             :                         }
     579             :                 }
     580      101703 :                 if (res == LOG_ERR) {
     581           0 :                         if (r)
     582           0 :                                 BBPreclaim(r);
     583           0 :                         if (uid)
     584           0 :                                 BBPreclaim(uid);
     585             :                 }
     586             :         } else {
     587             :                 /* bat missing ERROR or ignore ? currently error. */
     588             :                 res = LOG_ERR;
     589             :         }
     590             :         return res;
     591             : }
     592             : 
     593             : 
     594             : static gdk_return
     595      256536 : la_bat_update_count(logger *lg, log_id id, lng cnt)
     596             : {
     597      256536 :         BATiter cni = bat_iterator_nolock(lg->catalog_id);
     598             :         BUN p;
     599             : 
     600      256536 :         if (BAThash(lg->catalog_id) == GDK_SUCCEED) {
     601      256536 :                 MT_rwlock_rdlock(&cni.b->thashlock);
     602      639431 :                 HASHloop_int(cni, cni.b->thash, p, &id) {
     603      256960 :                         lng ocnt = *(lng*) Tloc(lg->catalog_cnt, p);
     604      256960 :                         assert(lg->catalog_cnt->hseqbase == 0);
     605      256960 :                         if (ocnt < cnt && BUNreplace(lg->catalog_cnt, p, &cnt, false) != GDK_SUCCEED) {
     606           0 :                                 MT_rwlock_rdunlock(&cni.b->thashlock);
     607           0 :                                 return GDK_FAIL;
     608             :                         }
     609             :                 }
     610      256536 :                 MT_rwlock_rdunlock(&cni.b->thashlock);
     611             :         }
     612             :         return GDK_SUCCEED;
     613             : 
     614             : }
     615             : 
     616             : static gdk_return
     617      101703 : la_bat_updates(logger *lg, logaction *la)
     618             : {
     619      101703 :         log_bid bid = internal_find_bat(lg, la->cid);
     620             :         BAT *b = NULL;
     621             : 
     622      101703 :         if (bid == 0)
     623             :                 return GDK_SUCCEED; /* ignore bats no longer in the catalog */
     624             : 
     625       95101 :         if (!lg->flushing) {
     626        7167 :                 b = BATdescriptor(bid);
     627        7167 :                 if (b == NULL)
     628             :                         return GDK_FAIL;
     629             :         }
     630       95101 :         if (la->type == LOG_UPDATE_BULK) {
     631             :                 BUN cnt = 0;
     632             : 
     633       95088 :                 if (!lg->flushing) {
     634        7164 :                         cnt = BATcount(b);
     635        7164 :                         int is_msk = (b->ttype == TYPE_msk);
     636             :                         /* handle offset 0 ie clear */
     637             :                         if (/* DISABLES CODE */ (0) && la->offset == 0 && cnt)
     638             :                                 BATclear(b, true);
     639             :                         /* handle offset */
     640        7164 :                         if (cnt <= (BUN)la->offset) {
     641        2351 :                                 msk t = 1;
     642        2351 :                                 if (cnt < (BUN)la->offset) { /* insert nils */
     643         113 :                                         const void *tv = (is_msk)?&t:ATOMnilptr(b->ttype);
     644         113 :                                         lng i, d = la->offset - BATcount(b);
     645         226 :                                         for(i=0;i<d;i++) {
     646         113 :                                                 if (BUNappend(b, tv, true) != GDK_SUCCEED) {
     647             :                                                         logbat_destroy(b);
     648           0 :                                                         return GDK_FAIL;
     649             :                                                 }
     650             :                                         }
     651             :                                 }
     652        2351 :                                 if (BATcount(b) == (BUN)la->offset && BATappend(b, la->b, NULL, true) != GDK_SUCCEED) {
     653             :                                         logbat_destroy(b);
     654           0 :                                         return GDK_FAIL;
     655             :                                 }
     656             :                         } else {
     657        4813 :                                 BATiter vi = bat_iterator(la->b);
     658             :                                 BUN p, q;
     659             : 
     660       14701 :                                 for (p=0, q = (BUN)la->offset; p<(BUN)la->nr; p++, q++) {
     661        9888 :                                         const void *t = BUNtail(vi, p);
     662             : 
     663        9888 :                                         if (q < cnt) {
     664        9888 :                                                 if (BUNreplace(b, q, t, true) != GDK_SUCCEED) {
     665             :                                                         logbat_destroy(b);
     666           0 :                                                         bat_iterator_end(&vi);
     667           0 :                                                         return GDK_FAIL;
     668             :                                                 }
     669             :                                         } else {
     670           0 :                                                 if (BUNappend(b, t, true) != GDK_SUCCEED) {
     671             :                                                         logbat_destroy(b);
     672           0 :                                                         bat_iterator_end(&vi);
     673           0 :                                                         return GDK_FAIL;
     674             :                                                 }
     675             :                                         }
     676             :                                 }
     677        4813 :                                 bat_iterator_end(&vi);
     678             :                         }
     679             :                 }
     680       95088 :                 cnt = (BUN)(la->offset + la->nr);
     681       95088 :                 if (la_bat_update_count(lg, la->cid, cnt) != GDK_SUCCEED) {
     682           0 :                         if (b)
     683             :                                 logbat_destroy(b);
     684           0 :                         return GDK_FAIL;
     685             :                 }
     686          13 :         } else if (!lg->flushing && la->type == LOG_UPDATE) {
     687           3 :                 BATiter vi = bat_iterator(la->b);
     688             :                 BUN p, q;
     689             : 
     690          12 :                 BATloop(la->b, p, q) {
     691           9 :                         oid h = BUNtoid(la->uid, p);
     692           9 :                         const void *t = BUNtail(vi, p);
     693             : 
     694           9 :                         if (BUNreplace(b, h, t, true) != GDK_SUCCEED) {
     695             :                                 logbat_destroy(b);
     696           0 :                                 bat_iterator_end(&vi);
     697           0 :                                 return GDK_FAIL;
     698             :                         }
     699             :                 }
     700           3 :                 bat_iterator_end(&vi);
     701             :         }
     702       95101 :         if (b)
     703             :                 logbat_destroy(b);
     704             :         return GDK_SUCCEED;
     705             : }
     706             : 
     707             : static log_return
     708        9597 : log_read_destroy(logger *lg, trans *tr, log_id id)
     709             : {
     710             :         (void) lg;
     711        9597 :         assert(!lg->inmemory);
     712        9597 :         if (tr_grow(tr) == GDK_SUCCEED) {
     713        9597 :                 tr->changes[tr->nr].type = LOG_DESTROY;
     714        9597 :                 tr->changes[tr->nr].cid = id;
     715        9597 :                 tr->nr++;
     716             :         }
     717        9597 :         return LOG_OK;
     718             : }
     719             : 
     720             : static gdk_return
     721          36 : la_bat_destroy(logger *lg, logaction *la)
     722             : {
     723          36 :         log_bid bid = internal_find_bat(lg, la->cid);
     724             : 
     725          36 :         if (bid && logger_del_bat(lg, bid) != GDK_SUCCEED)
     726           0 :                 return GDK_FAIL;
     727             :         return GDK_SUCCEED;
     728             : }
     729             : 
     730             : static log_return
     731        3017 : log_read_create(logger *lg, trans *tr, log_id id)
     732             : {
     733             :         bte tt;
     734             :         int tpe;
     735             : 
     736        3017 :         assert(!lg->inmemory);
     737        3017 :         if (lg->debug & 1)
     738           0 :                 fprintf(stderr, "#log_read_create %d\n", id);
     739             : 
     740        3017 :         if (mnstr_read(lg->input_log, &tt, 1, 1) != 1)
     741             :                 return LOG_ERR;
     742             : 
     743        3017 :         tpe = find_type_nr(lg, tt);
     744             :         /* read create */
     745        3017 :         if (tr_grow(tr) == GDK_SUCCEED) {
     746        3017 :                 tr->changes[tr->nr].type = LOG_CREATE;
     747        3017 :                 tr->changes[tr->nr].tt = tpe;
     748        3017 :                 tr->changes[tr->nr].cid = id;
     749        3017 :                 tr->nr++;
     750             :         }
     751             : 
     752             :         return LOG_OK;
     753             : }
     754             : 
     755             : static gdk_return
     756         232 : la_bat_create(logger *lg, logaction *la)
     757             : {
     758             :         BAT *b;
     759             : 
     760             :         /* formerly head column type, should be void */
     761         232 :         if ((b = COLnew(0, la->tt, BATSIZE, PERSISTENT)) == NULL)
     762             :                 return GDK_FAIL;
     763             : 
     764         232 :         if (la->tt < 0)
     765           0 :                 BATtseqbase(b, 0);
     766             : 
     767         464 :         if ((b = BATsetaccess(b, BAT_READ)) == NULL ||
     768         232 :             logger_add_bat(lg, b, la->cid) != GDK_SUCCEED) {
     769             :                 logbat_destroy(b);
     770           0 :                 return GDK_FAIL;
     771             :         }
     772             :         logbat_destroy(b);
     773         232 :         return GDK_SUCCEED;
     774             : }
     775             : 
     776             : static gdk_return
     777         194 : logger_write_new_types(logger *lg, FILE *fp)
     778             : {
     779         194 :         bte id = 0;
     780             : 
     781             :         /* write types and insert into bats */
     782             :         /* first the fixed sized types */
     783        6205 :         for (int i=0;i<GDKatomcnt; i++) {
     784        6011 :                 if (ATOMvarsized(i))
     785        1550 :                         continue;
     786        8922 :                 if (BUNappend(lg->type_id, &id, false) != GDK_SUCCEED ||
     787        8922 :                     BUNappend(lg->type_nme, BATatoms[i].name, false) != GDK_SUCCEED ||
     788        8922 :                     BUNappend(lg->type_nr, &i, false) != GDK_SUCCEED ||
     789        4461 :                     fprintf(fp, "%d,%s\n", id, BATatoms[i].name) < 0) {
     790           0 :                         return GDK_FAIL;
     791             :                 }
     792        4461 :                 id++;
     793             :         }
     794             :         /* second the var sized types */
     795         194 :         id=-127; /* start after nil */
     796        6205 :         for (int i=0;i<GDKatomcnt; i++) {
     797        6011 :                 if (!ATOMvarsized(i))
     798        4461 :                         continue;
     799        3100 :                 if (BUNappend(lg->type_id, &id, false) != GDK_SUCCEED ||
     800        3100 :                     BUNappend(lg->type_nme, BATatoms[i].name, false) != GDK_SUCCEED ||
     801        3100 :                     BUNappend(lg->type_nr, &i, false) != GDK_SUCCEED ||
     802        1550 :                     fprintf(fp, "%d,%s\n", id, BATatoms[i].name) < 0) {
     803           0 :                         return GDK_FAIL;
     804             :                 }
     805        1550 :                 id++;
     806             :         }
     807         194 :         return GDK_SUCCEED;
     808             : }
     809             : 
     810             : #define TR_SIZE         1024
     811             : 
     812             : static trans *
     813       11019 : tr_create(trans *tr, int tid)
     814             : {
     815       11019 :         trans *ntr = GDKmalloc(sizeof(trans));
     816             : 
     817       11019 :         if (ntr == NULL)
     818             :                 return NULL;
     819       11019 :         ntr->tid = tid;
     820       11019 :         ntr->sz = TR_SIZE;
     821       11019 :         ntr->nr = 0;
     822       11019 :         ntr->changes = GDKmalloc(sizeof(logaction) * TR_SIZE);
     823       11019 :         if (ntr->changes == NULL) {
     824           0 :                 GDKfree(ntr);
     825           0 :                 return NULL;
     826             :         }
     827       11019 :         ntr->tr = tr;
     828       11019 :         return ntr;
     829             : }
     830             : 
     831             : static gdk_return
     832      114317 : la_apply(logger *lg, logaction *c)
     833             : {
     834             :         gdk_return ret = GDK_SUCCEED;
     835             : 
     836      114317 :         switch (c->type) {
     837      101703 :         case LOG_UPDATE_BULK:
     838             :         case LOG_UPDATE:
     839      101703 :                 ret = la_bat_updates(lg, c);
     840      101703 :                 break;
     841        3017 :         case LOG_CREATE:
     842        3017 :                 if (!lg->flushing)
     843         232 :                         ret = la_bat_create(lg, c);
     844             :                 break;
     845        9597 :         case LOG_DESTROY:
     846        9597 :                 if (!lg->flushing)
     847          36 :                         ret = la_bat_destroy(lg, c);
     848             :                 break;
     849           0 :         case LOG_CLEAR:
     850           0 :                 if (!lg->flushing)
     851           0 :                         ret = la_bat_clear(lg, c);
     852             :                 break;
     853             :         default:
     854           0 :                 assert(0);
     855             :         }
     856      114317 :         return ret;
     857             : }
     858             : 
     859             : static void
     860      114317 : la_destroy(logaction *c)
     861             : {
     862      114317 :         if (c->b && (c->type == LOG_UPDATE || c->type == LOG_UPDATE_BULK))
     863             :                 logbat_destroy(c->b);
     864      114317 :         if (c->uid && c->type == LOG_UPDATE)
     865             :                 logbat_destroy(c->uid);
     866      114317 : }
     867             : 
     868             : static gdk_return
     869      114317 : tr_grow(trans *tr)
     870             : {
     871      114317 :         if (tr->nr == tr->sz) {
     872             :                 logaction *changes;
     873           9 :                 tr->sz <<= 1;
     874           9 :                 changes = GDKrealloc(tr->changes, tr->sz * sizeof(logaction));
     875           9 :                 if (changes == NULL)
     876             :                         return GDK_FAIL;
     877           9 :                 tr->changes = changes;
     878             :         }
     879             :         /* cleanup the next */
     880      114317 :         tr->changes[tr->nr].b = NULL;
     881      114317 :         return GDK_SUCCEED;
     882             : }
     883             : 
     884             : static trans *
     885       11019 : tr_destroy(trans *tr)
     886             : {
     887       11019 :         trans *r = tr->tr;
     888             : 
     889       11019 :         GDKfree(tr->changes);
     890       11019 :         GDKfree(tr);
     891       11019 :         return r;
     892             : }
     893             : 
     894             : static trans *
     895           0 : tr_abort_(logger *lg, trans *tr, int s)
     896             : {
     897             :         int i;
     898             : 
     899           0 :         if (lg->debug & 1)
     900           0 :                 fprintf(stderr, "#tr_abort\n");
     901             : 
     902           0 :         for (i = s; i < tr->nr; i++)
     903           0 :                 la_destroy(&tr->changes[i]);
     904           0 :         return tr_destroy(tr);
     905             : }
     906             : 
     907             : static trans *
     908             : tr_abort(logger *lg, trans *tr)
     909             : {
     910           0 :         return tr_abort_(lg, tr, 0);
     911             : }
     912             : 
     913             : static trans *
     914       11019 : tr_commit(logger *lg, trans *tr)
     915             : {
     916             :         int i;
     917             : 
     918       11019 :         if (lg->debug & 1)
     919           0 :                 fprintf(stderr, "#tr_commit\n");
     920             : 
     921      125336 :         for (i = 0; i < tr->nr; i++) {
     922      114317 :                 if (la_apply(lg, &tr->changes[i]) != GDK_SUCCEED) {
     923             :                         do {
     924           0 :                                 tr = tr_abort_(lg, tr, i);
     925           0 :                         } while (tr != NULL);
     926             :                         return (trans *) -1;
     927             :                 }
     928      114317 :                 la_destroy(&tr->changes[i]);
     929             :         }
     930       11019 :         lg->saved_tid = tr->tid;
     931       11019 :         return tr_destroy(tr);
     932             : }
     933             : 
     934             : static gdk_return
     935          71 : logger_read_types_file(logger *lg, FILE *fp)
     936             : {
     937          71 :         int id = 0;
     938             :         char atom_name[IDLENGTH];
     939             : 
     940             :         /* scanf should use IDLENGTH somehow */
     941        2234 :         while(fscanf(fp, "%d,%63s\n", &id, atom_name) == 2) {
     942        2163 :                 int i = ATOMindex(atom_name);
     943             : 
     944        2163 :                 if (id > 255 || i < 0) {
     945           0 :                         GDKerror("unknown type in log file '%s'\n", atom_name);
     946           0 :                         return GDK_FAIL;
     947             :                 }
     948        2163 :                 bte lid = (bte)id;
     949        4326 :                 if (BUNappend(lg->type_id, &lid, false) != GDK_SUCCEED ||
     950        4326 :                     BUNappend(lg->type_nme, atom_name, false) != GDK_SUCCEED ||
     951        2163 :                     BUNappend(lg->type_nr, &i, false) != GDK_SUCCEED) {
     952           0 :                         return GDK_FAIL;
     953             :                 }
     954             :         }
     955             :         return GDK_SUCCEED;
     956             : }
     957             : 
     958             : 
     959             : gdk_return
     960         194 : logger_create_types_file(logger *lg, const char *filename)
     961             : {
     962             :         FILE *fp;
     963             : 
     964         194 :         if ((fp = MT_fopen(filename, "w")) == NULL) {
     965           0 :                 GDKerror("cannot create log file %s\n", filename);
     966           0 :                 return GDK_FAIL;
     967             :         }
     968         194 :         if (fprintf(fp, "%06d\n\n", lg->version) < 0) {
     969           0 :                 fclose(fp);
     970             :                 MT_remove(filename);
     971           0 :                 GDKerror("writing log file %s failed", filename);
     972           0 :                 return GDK_FAIL;
     973             :         }
     974             : 
     975         194 :         if (logger_write_new_types(lg, fp) != GDK_SUCCEED) {
     976           0 :                 fclose(fp);
     977             :                 MT_remove(filename);
     978           0 :                 GDKerror("writing log file %s failed", filename);
     979           0 :                 return GDK_FAIL;
     980             :         }
     981         194 :         if (fflush(fp) < 0 || (!(GDKdebug & NOSYNCMASK)
     982             : #if defined(_MSC_VER)
     983             :                      && _commit(_fileno(fp)) < 0
     984             : #elif defined(HAVE_FDATASYNC)
     985           0 :                      && fdatasync(fileno(fp)) < 0
     986             : #elif defined(HAVE_FSYNC)
     987             :                      && fsync(fileno(fp)) < 0
     988             : #endif
     989             :             )) {
     990           0 :                 GDKsyserror("flushing log file %s failed", filename);
     991           0 :                 fclose(fp);
     992             :                 MT_remove(filename);
     993             :                 return GDK_FAIL;
     994             :         }
     995         194 :         if (fclose(fp) < 0) {
     996           0 :                 GDKsyserror("closing log file %s failed", filename);
     997             :                 MT_remove(filename);
     998             :                 return GDK_FAIL;
     999             :         }
    1000             :         return GDK_SUCCEED;
    1001             : }
    1002             : 
    1003             : static gdk_return
    1004       12571 : logger_open_output(logger *lg)
    1005             : {
    1006       12571 :         logged_range *new_range = (logged_range*)GDKmalloc(sizeof(logged_range));
    1007             : 
    1008       12571 :         if (!new_range) {
    1009           0 :                 TRC_CRITICAL(GDK, "allocation failure\n");
    1010           0 :                 return GDK_FAIL;
    1011             :         }
    1012             : 
    1013       12571 :         lg->end = 0;
    1014       12571 :         if (!LOG_DISABLED(lg)) {
    1015             :                 char id[32];
    1016             :                 char *filename;
    1017             : 
    1018       12570 :                 if (snprintf(id, sizeof(id), LLFMT, lg->id) >= (int) sizeof(id)) {
    1019           0 :                         TRC_CRITICAL(GDK, "filename is too large\n");
    1020           0 :                         GDKfree(new_range);
    1021           0 :                         return GDK_FAIL;
    1022             :                 }
    1023       12570 :                 if (!(filename = GDKfilepath(BBPselectfarm(PERSISTENT, 0, offheap), lg->dir, LOGFILE, id))) {
    1024           0 :                         TRC_CRITICAL(GDK, "allocation failure\n");
    1025           0 :                         GDKfree(new_range);
    1026           0 :                         return GDK_FAIL;
    1027             :                 }
    1028             : 
    1029       12570 :                 lg->output_log = open_wstream(filename);
    1030       12570 :                 if (lg->output_log) {
    1031       12570 :                         short byteorder = 1234;
    1032       12570 :                         mnstr_write(lg->output_log, &byteorder, sizeof(byteorder), 1);
    1033             :                 }
    1034       12570 :                 lg->end = 0;
    1035             : 
    1036       12570 :                 if (lg->output_log == NULL || mnstr_errnr(lg->output_log)) {
    1037           0 :                         TRC_CRITICAL(GDK, "creating %s failed: %s\n", filename, mnstr_peek_error(NULL));
    1038           0 :                         GDKfree(new_range);
    1039           0 :                         GDKfree(filename);
    1040           0 :                         return GDK_FAIL;
    1041             :                 }
    1042       12570 :                 GDKfree(filename);
    1043             :         }
    1044       12571 :         new_range->id = lg->id;
    1045       12571 :         new_range->first_tid = lg->tid;
    1046       12571 :         new_range->last_tid = lg->tid;
    1047       12571 :         new_range->last_ts = 0;
    1048       12571 :         new_range->next = NULL;
    1049       12571 :         if (lg->current)
    1050       12305 :                 lg->current->next = new_range;
    1051       12571 :         lg->current = new_range;
    1052       12571 :         if (!lg->pending)
    1053         266 :                 lg->pending = new_range;
    1054             :         return GDK_SUCCEED;
    1055             : }
    1056             : 
    1057             : static inline void
    1058       11643 : logger_close_input(logger *lg)
    1059             : {
    1060       11643 :         if (!lg->inmemory)
    1061       11642 :                 close_stream(lg->input_log);
    1062       11643 :         lg->input_log = NULL;
    1063       11643 : }
    1064             : 
    1065             : static inline void
    1066       12570 : logger_close_output(logger *lg)
    1067             : {
    1068       12570 :         if (!LOG_DISABLED(lg))
    1069       12569 :                 close_stream(lg->output_log);
    1070       12570 :         lg->output_log = NULL;
    1071       12570 : }
    1072             : 
    1073             : static gdk_return
    1074       11378 : logger_open_input(logger *lg, char *filename, bool *filemissing)
    1075             : {
    1076       11378 :         lg->input_log = open_rstream(filename);
    1077             : 
    1078             :         /* if the file doesn't exist, there is nothing to be read back */
    1079       11378 :         if (lg->input_log == NULL || mnstr_errnr(lg->input_log)) {
    1080          79 :                 logger_close_input(lg);
    1081          79 :                 *filemissing = true;
    1082          79 :                 return GDK_SUCCEED;
    1083             :         }
    1084             :         short byteorder;
    1085       11299 :         switch (mnstr_read(lg->input_log, &byteorder, sizeof(byteorder), 1)) {
    1086           0 :         case -1:
    1087           0 :                 logger_close_input(lg);
    1088           0 :                 return GDK_FAIL;
    1089           6 :         case 0:
    1090             :                 /* empty file is ok */
    1091           6 :                 logger_close_input(lg);
    1092           6 :                 return GDK_SUCCEED;
    1093       11293 :         case 1:
    1094             :                 /* if not empty, must start with correct byte order mark */
    1095       11293 :                 if (byteorder != 1234) {
    1096           0 :                         TRC_CRITICAL(GDK, "incorrect byte order word in file %s\n", filename);
    1097           0 :                         logger_close_input(lg);
    1098           0 :                         return GDK_FAIL;
    1099             :                 }
    1100             :                 break;
    1101             :         }
    1102             :         return GDK_SUCCEED;
    1103             : }
    1104             : 
    1105             : static log_return
    1106       11293 : logger_read_transaction(logger *lg)
    1107             : {
    1108             :         logformat l;
    1109             :         trans *tr = NULL;
    1110             :         log_return err = LOG_OK;
    1111             :         int ok = 1;
    1112       11293 :         int dbg = GDKdebug;
    1113             : 
    1114       11293 :         if (!lg->flushing)
    1115          86 :                 GDKdebug &= ~(CHECKMASK|PROPMASK);
    1116             : 
    1117      150200 :         while (err == LOG_OK && (ok=log_read_format(lg, &l))) {
    1118      138907 :                 if (l.flag == 0 && l.id == 0) {
    1119             :                         err = LOG_EOF;
    1120             :                         break;
    1121             :                 }
    1122             : 
    1123      138907 :                 if (lg->debug & 1) {
    1124           0 :                         fprintf(stderr, "#logger_readlog: ");
    1125           0 :                         if (l.flag > 0 &&
    1126             :                             l.flag < (bte) (sizeof(log_commands) / sizeof(log_commands[0])))
    1127           0 :                                 fprintf(stderr, "%s", log_commands[(int) l.flag]);
    1128             :                         else
    1129           0 :                                 fprintf(stderr, "%d", l.flag);
    1130           0 :                         fprintf(stderr, " %d\n", l.id);
    1131             :                 }
    1132             :                 /* the functions we call here can succeed (LOG_OK),
    1133             :                  * but they can also fail for two different reasons:
    1134             :                  * they can run out of input (LOG_EOF -- this is not
    1135             :                  * serious, we just abort the remaining transactions),
    1136             :                  * or some malloc or BAT update fails (LOG_ERR -- this
    1137             :                  * is serious, we must abort the complete process);
    1138             :                  * the latter failure causes the current function to
    1139             :                  * return GDK_FAIL */
    1140      138907 :                 switch (l.flag) {
    1141       11019 :                 case LOG_START:
    1142       11019 :                         if (l.id > lg->tid)
    1143          59 :                                 lg->tid = l.id;
    1144       11019 :                         if ((tr = tr_create(tr, l.id)) == NULL) {
    1145             :                                 err = LOG_ERR;
    1146             :                                 break;
    1147             :                         }
    1148       11019 :                         if (lg->debug & 1)
    1149           0 :                                 fprintf(stderr, "#logger tstart %d\n", tr->tid);
    1150             :                         break;
    1151       11019 :                 case LOG_END:
    1152       11019 :                         if (tr == NULL)
    1153             :                                 err = LOG_EOF;
    1154       11019 :                         else if (tr->tid != l.id)    /* abort record */
    1155             :                                 tr = tr_abort(lg, tr);
    1156             :                         else
    1157       11019 :                                 tr = tr_commit(lg, tr);
    1158             :                         break;
    1159        2552 :                 case LOG_SEQ:
    1160        2552 :                         err = log_read_seq(lg, &l);
    1161        2552 :                         break;
    1162      101703 :                 case LOG_UPDATE_CONST:
    1163             :                 case LOG_UPDATE_BULK:
    1164             :                 case LOG_UPDATE:
    1165      101703 :                         if (tr == NULL)
    1166             :                                 err = LOG_EOF;
    1167             :                         else
    1168      101703 :                                 err = log_read_updates(lg, tr, &l, l.id);
    1169             :                         break;
    1170        3017 :                 case LOG_CREATE:
    1171        3017 :                         if (tr == NULL)
    1172             :                                 err = LOG_EOF;
    1173             :                         else
    1174        3017 :                                 err = log_read_create(lg, tr, l.id);
    1175             :                         break;
    1176        9597 :                 case LOG_DESTROY:
    1177        9597 :                         if (tr == NULL)
    1178             :                                 err = LOG_EOF;
    1179             :                         else
    1180        9597 :                                 err = log_read_destroy(lg, tr, l.id);
    1181             :                         break;
    1182           0 :                 case LOG_CLEAR:
    1183           0 :                         if (tr == NULL)
    1184             :                                 err = LOG_EOF;
    1185             :                         else
    1186           0 :                                 err = log_read_clear(lg, tr, l.id);
    1187             :                         break;
    1188             :                 default:
    1189             :                         err = LOG_ERR;
    1190             :                 }
    1191      138907 :                 if (tr == (trans *) -1) {
    1192             :                         err = LOG_ERR;
    1193             :                         tr = NULL;
    1194             :                         break;
    1195             :                 }
    1196             :         }
    1197       11293 :         while (tr)
    1198             :                 tr = tr_abort(lg, tr);
    1199       11293 :         if (!lg->flushing)
    1200          86 :                 GDKdebug = dbg;
    1201       11293 :         if (!ok)
    1202       11293 :                 return LOG_EOF;
    1203             :         return err;
    1204             : }
    1205             : 
    1206             : static gdk_return
    1207         171 : logger_readlog(logger *lg, char *filename, bool *filemissing)
    1208             : {
    1209             :         log_return err = LOG_OK;
    1210             :         time_t t0, t1;
    1211             :         struct stat sb;
    1212             : 
    1213         171 :         assert(!lg->inmemory);
    1214             : 
    1215         171 :         if (lg->debug & 1) {
    1216           0 :                 fprintf(stderr, "#logger_readlog opening %s\n", filename);
    1217             :         }
    1218             : 
    1219         171 :         gdk_return res = logger_open_input(lg, filename, filemissing);
    1220         171 :         if (!lg->input_log)
    1221             :                 return res;
    1222             :         int fd;
    1223         172 :         if ((fd = getFileNo(lg->input_log)) < 0 || fstat(fd, &sb) < 0) {
    1224           0 :                 if (lg->debug & 1) {
    1225           0 :                         fprintf(stderr, "!ERROR: logger_readlog: fstat on opened file %s failed\n", filename);
    1226             :                 }
    1227           0 :                 logger_close_input(lg);
    1228             :                 /* If the file could be opened, but fstat fails,
    1229             :                  * something weird is going on */
    1230           0 :                 return GDK_FAIL;
    1231             :         }
    1232          86 :         t0 = time(NULL);
    1233          86 :         if (lg->debug & 1) {
    1234           0 :                 printf("# Start reading the write-ahead log '%s'\n", filename);
    1235           0 :                 fflush(stdout);
    1236             :         }
    1237         172 :         while (err != LOG_EOF && err != LOG_ERR) {
    1238          86 :                 t1 = time(NULL);
    1239          86 :                 if (t1 - t0 > 10) {
    1240             :                         lng fpos;
    1241             :                         t0 = t1;
    1242             :                         /* not more than once every 10 seconds */
    1243           0 :                         fpos = (lng) getfilepos(getFile(lg->input_log));
    1244           0 :                         if (lg->debug & 1 && fpos >= 0) {
    1245           0 :                                 printf("# still reading write-ahead log \"%s\" (%d%% done)\n", filename, (int) ((fpos * 100 + 50) / sb.st_size));
    1246           0 :                                 fflush(stdout);
    1247             :                         }
    1248             :                 }
    1249          86 :                 err = logger_read_transaction(lg);
    1250             :         }
    1251          86 :         logger_close_input(lg);
    1252          86 :         lg->input_log = NULL;
    1253             : 
    1254             :         /* remaining transactions are not committed, ie abort */
    1255          86 :         if (lg->debug & 1) {
    1256           0 :                 printf("# Finished reading the write-ahead log '%s'\n", filename);
    1257           0 :                 fflush(stdout);
    1258             :         }
    1259             :         /* we cannot distinguish errors from incomplete transactions
    1260             :          * (even if we would log aborts in the logs). So we simply
    1261             :          * abort and move to the next log file */
    1262             :         //return err == LOG_ERR ? GDK_FAIL : GDK_SUCCEED;
    1263             :         return GDK_SUCCEED;
    1264             : }
    1265             : 
    1266             : /*
    1267             :  * The log files are incrementally numbered, starting from 2. They are
    1268             :  * processed in the same sequence.
    1269             :  */
    1270             : static gdk_return
    1271          79 : logger_readlogs(logger *lg, char *filename)
    1272             : {
    1273             :         gdk_return res = GDK_SUCCEED;
    1274             : 
    1275          79 :         assert(!lg->inmemory);
    1276          79 :         if (lg->debug & 1)
    1277           0 :                 fprintf(stderr, "#logger_readlogs logger id is " LLFMT " last logger id is " LLFMT "\n", lg->id, lg->saved_id);
    1278             : 
    1279             :         char log_filename[FILENAME_MAX];
    1280          79 :         if (lg->saved_id >= lg->id) {
    1281          79 :                 bool filemissing = false;
    1282             : 
    1283          79 :                 lg->id = lg->saved_id+1;
    1284         250 :                 while (res == GDK_SUCCEED && !filemissing) {
    1285         171 :                         if (snprintf(log_filename, sizeof(log_filename), "%s." LLFMT, filename, lg->id) >= FILENAME_MAX) {
    1286           0 :                                 GDKerror("Logger filename path is too large\n");
    1287           0 :                                 return GDK_FAIL;
    1288             :                         }
    1289         171 :                         res = logger_readlog(lg, log_filename, &filemissing);
    1290         171 :                         if (!filemissing) {
    1291          92 :                                 lg->saved_id++;
    1292          92 :                                 lg->id++;
    1293             :                         }
    1294             :                 }
    1295             :         }
    1296             :         return res;
    1297             : }
    1298             : 
    1299             : static gdk_return
    1300       15413 : logger_commit(logger *lg)
    1301             : {
    1302       15413 :         if (lg->debug & 1)
    1303           0 :                 fprintf(stderr, "#logger_commit\n");
    1304             : 
    1305       15413 :         return bm_commit(lg);
    1306             : }
    1307             : 
    1308             : static gdk_return
    1309          79 : check_version(logger *lg, FILE *fp, const char *fn, const char *logdir, const char *filename)
    1310             : {
    1311          79 :         int version = 0;
    1312             : 
    1313          79 :         assert(!lg->inmemory);
    1314          79 :         if (fscanf(fp, "%6d", &version) != 1) {
    1315           0 :                 GDKerror("Could not read the version number from the file '%s/log'.\n", lg->dir);
    1316           0 :                 fclose(fp);
    1317           0 :                 return GDK_FAIL;
    1318             :         }
    1319          79 :         if (version < 52300) {       /* first CATALOG_VERSION for "new" log format */
    1320           8 :                 lg->catalog_bid = logbat_new(TYPE_int, BATSIZE, PERSISTENT);
    1321           8 :                 lg->catalog_id = logbat_new(TYPE_int, BATSIZE, PERSISTENT);
    1322           8 :                 lg->dcatalog = logbat_new(TYPE_oid, BATSIZE, PERSISTENT);
    1323           8 :                 if (lg->catalog_bid == NULL || lg->catalog_id == NULL || lg->dcatalog == NULL) {
    1324           0 :                         GDKerror("cannot create catalog bats");
    1325           0 :                         fclose(fp);
    1326           0 :                         return GDK_FAIL;
    1327             :                 }
    1328             :                 /* old_logger_load always closes fp */
    1329           8 :                 if (old_logger_load(lg, fn, logdir, fp, version, filename) != GDK_SUCCEED) {
    1330             :                         //loads drop no longer needed catalog, snapshots bats
    1331             :                         //convert catalog_oid -> catalog_id (lng->int)
    1332           0 :                         GDKerror("Incompatible database version %06d, "
    1333             :                                  "this server supports version %06d.\n%s",
    1334             :                                  version, lg->version,
    1335             :                                  version < lg->version ? "Maybe you need to upgrade to an intermediate release first.\n" : "");
    1336           0 :                         return GDK_FAIL;
    1337             :                 }
    1338             :                 return GDK_SUCCEED;
    1339          71 :         } else if (version != lg->version) {
    1340           0 :                 if (lg->prefuncp == NULL ||
    1341           0 :                     (*lg->prefuncp)(lg->funcdata, version, lg->version) != GDK_SUCCEED) {
    1342           0 :                         GDKerror("Incompatible database version %06d, "
    1343             :                                  "this server supports version %06d.\n%s",
    1344             :                                  version, lg->version,
    1345             :                                  version < lg->version ? "Maybe you need to upgrade to an intermediate release first.\n" : "");
    1346           0 :                         fclose(fp);
    1347           0 :                         return GDK_FAIL;
    1348             :                 }
    1349             :         } else {
    1350          71 :                 lg->postfuncp = NULL;         /* don't call */
    1351             :         }
    1352         142 :         if (fgetc(fp) != '\n' ||         /* skip \n */
    1353          71 :             fgetc(fp) != '\n') {         /* skip \n */
    1354           0 :                 GDKerror("Badly formatted log file");
    1355           0 :                 fclose(fp);
    1356           0 :                 return GDK_FAIL;
    1357             :         }
    1358          71 :         if (logger_read_types_file(lg, fp) != GDK_SUCCEED) {
    1359           0 :                 fclose(fp);
    1360           0 :                 return GDK_FAIL;
    1361             :         }
    1362          71 :         fclose(fp);
    1363          71 :         return GDK_SUCCEED;
    1364             : }
    1365             : 
    1366             : static BAT *
    1367        2232 : bm_tids(BAT *b, BAT *d)
    1368             : {
    1369        2232 :         BUN sz = BATcount(b);
    1370        2232 :         BAT *tids = BATdense(0, 0, sz);
    1371             : 
    1372        2232 :         if (tids == NULL)
    1373             :                 return NULL;
    1374             : 
    1375        2232 :         if (BATcount(d)) {
    1376        2232 :                 BAT *diff = BATdiff(tids, d, NULL, NULL, false, false, BUN_NONE);
    1377             :                 logbat_destroy(tids);
    1378             :                 tids = diff;
    1379             :         }
    1380             :         return tids;
    1381             : }
    1382             : 
    1383             : 
    1384             : static gdk_return
    1385       11517 : logger_switch_bat(BAT *old, BAT *new, const char *fn, const char *name)
    1386             : {
    1387             :         char bak[IDLENGTH];
    1388             : 
    1389       11517 :         if (BATmode(old, true) != GDK_SUCCEED) {
    1390           0 :                 GDKerror("cannot convert old %s to transient", name);
    1391           0 :                 return GDK_FAIL;
    1392             :         }
    1393       11517 :         if (strconcat_len(bak, sizeof(bak), fn, "_", name, NULL) >= sizeof(bak)) {
    1394           0 :                 GDKerror("name %s_%s too long\n", fn, name);
    1395           0 :                 return GDK_FAIL;
    1396             :         }
    1397       23034 :         if (BBPrename(old->batCacheid, NULL) != 0 ||
    1398       11517 :             BBPrename(new->batCacheid, bak) != 0) {
    1399           0 :                 GDKerror("rename (%s) failed\n", bak);
    1400           0 :                 return GDK_FAIL;
    1401             :         }
    1402       11517 :         BBPretain(new->batCacheid);
    1403       11517 :         return GDK_SUCCEED;
    1404             : }
    1405             : 
    1406             : static gdk_return
    1407         266 : bm_get_counts(logger *lg)
    1408             : {
    1409             :         BUN p, q, deleted = 0;
    1410         266 :         const log_bid *bids = (const log_bid *) Tloc(lg->catalog_bid, 0);
    1411             : 
    1412       20623 :         BATloop(lg->catalog_bid, p, q) {
    1413       20357 :                 oid pos = p;
    1414       20357 :                 lng cnt = 0;
    1415       20357 :                 lng lid = lng_nil;
    1416             : 
    1417       20357 :                 if (BUNfnd(lg->dcatalog, &pos) == BUN_NONE) {
    1418       20357 :                         BAT *b = BBPquickdesc(bids[p]);
    1419       20357 :                         assert(b);
    1420       20357 :                         cnt = BATcount(b);
    1421             :                 } else {
    1422           0 :                         deleted++;
    1423           0 :                         lid = 1;
    1424             :                 }
    1425       20357 :                 if (BUNappend(lg->catalog_cnt, &cnt, false) != GDK_SUCCEED)
    1426           0 :                         return GDK_FAIL;
    1427       20357 :                 if (BUNappend(lg->catalog_lid, &lid, false) != GDK_SUCCEED)
    1428             :                         return GDK_FAIL;
    1429             :         }
    1430         266 :         lg->deleted = deleted;
    1431         266 :         lg->cnt = BATcount(lg->catalog_bid);
    1432         266 :         return GDK_SUCCEED;
    1433             : }
    1434             : 
    1435             : static int
    1436       11517 : subcommit_list_add(int next, bat *n, BUN *sizes, bat bid, BUN sz)
    1437             : {
    1438     3352318 :         for(int i=0; i<next; i++) {
    1439     3340801 :                 if (n[i] == bid) {
    1440           0 :                         sizes[i] = sz;
    1441           0 :                         return next;
    1442             :                 }
    1443             :         }
    1444       11517 :         n[next] = bid;
    1445       11517 :         sizes[next++] = sz;
    1446       11517 :         return next;
    1447             : }
    1448             : 
    1449             : static int
    1450        2351 : cleanup_and_swap(logger *lg, int *r, const log_bid *bids, lng *lids, lng *cnts, BAT *catalog_bid, BAT *catalog_id, BAT *dcatalog, int cleanup)
    1451             : {
    1452             :         BAT *nbids, *noids, *ncnts, *nlids, *ndels;
    1453             :         BUN p, q;
    1454             :         int err = 0, rcnt = 0;
    1455             : 
    1456        2351 :         oid *poss = Tloc(dcatalog, 0);
    1457       46307 :         BATloop(dcatalog, p, q) {
    1458       43956 :                 oid pos = poss[p];
    1459             : 
    1460       43956 :                 if (lids[pos] == lng_nil || lids[pos] > lg->saved_tid)
    1461       25418 :                         continue;
    1462             : 
    1463       18538 :                 if (lids[pos] >= 0) {
    1464       18538 :                         lids[pos] = -1; /* mark as transient */
    1465       18538 :                         r[rcnt++] = bids[pos];
    1466             : 
    1467             :                         BAT *lb;
    1468             : 
    1469       37076 :                         if ((lb = BATdescriptor(bids[pos])) == NULL ||
    1470       18538 :                                 BATmode(lb, true/*transient*/) != GDK_SUCCEED) {
    1471           0 :                                 TRC_WARNING(GDK, "Failed to set bat(%d) transient\n", bids[pos]);
    1472             :                         }
    1473             :                         logbat_destroy(lb);
    1474             :                 }
    1475             :         }
    1476        2351 :         BUN ocnt = BATcount(catalog_bid);
    1477        2351 :         nbids = logbat_new(TYPE_int, ocnt-cleanup, PERSISTENT);
    1478        2351 :         noids = logbat_new(TYPE_int, ocnt-cleanup, PERSISTENT);
    1479        2351 :         ncnts = logbat_new(TYPE_lng, ocnt-cleanup, TRANSIENT);
    1480        2351 :         nlids = logbat_new(TYPE_lng, ocnt-cleanup, TRANSIENT);
    1481        2351 :         ndels = logbat_new(TYPE_oid, BATcount(dcatalog)-cleanup, PERSISTENT);
    1482             : 
    1483        2351 :         if (nbids == NULL || noids == NULL || ncnts == NULL || nlids == NULL || ndels == NULL) {
    1484             :                 logbat_destroy(nbids);
    1485             :                 logbat_destroy(noids);
    1486             :                 logbat_destroy(ncnts);
    1487             :                 logbat_destroy(nlids);
    1488             :                 logbat_destroy(ndels);
    1489           0 :                 return 0;
    1490             :         }
    1491             : 
    1492        2351 :         int *oids = (int*)Tloc(catalog_id, 0);
    1493        2351 :         q = BUNlast(catalog_bid);
    1494      647579 :         for(p = 0; p<q && !err; p++) {
    1495      645228 :                 bat col = bids[p];
    1496      645228 :                 int nid = oids[p];
    1497      645228 :                 lng lid = lids[p];
    1498      645228 :                 lng cnt = cnts[p];
    1499      645228 :                 oid pos = p;
    1500             : 
    1501             :                 /* only project out the deleted with lid == -1
    1502             :                  * update dcatalog */
    1503      645228 :                 if (lid == -1)
    1504       18538 :                         continue; /* remove */
    1505             : 
    1506     1253380 :                 if (BUNappend(nbids, &col, false) != GDK_SUCCEED ||
    1507     1253380 :                     BUNappend(noids, &nid, false) != GDK_SUCCEED ||
    1508     1253380 :                     BUNappend(nlids, &lid, false) != GDK_SUCCEED ||
    1509      626690 :                     BUNappend(ncnts, &cnt, false) != GDK_SUCCEED)
    1510             :                         err=1;
    1511      626690 :                 pos = (oid)(BATcount(nbids)-1);
    1512      626690 :                 if (lid != lng_nil && BUNappend(ndels, &pos, false) != GDK_SUCCEED)
    1513             :                         err=1;
    1514             :         }
    1515             : 
    1516        2351 :         if (err) {
    1517             :                 logbat_destroy(nbids);
    1518             :                 logbat_destroy(noids);
    1519             :                 logbat_destroy(ndels);
    1520             :                 logbat_destroy(ncnts);
    1521             :                 logbat_destroy(nlids);
    1522           0 :                 return 0;
    1523             :         }
    1524             :         /* point of no return */
    1525        4702 :         if (logger_switch_bat(catalog_bid, nbids, lg->fn, "catalog_bid") != GDK_SUCCEED ||
    1526        4702 :             logger_switch_bat(catalog_id, noids, lg->fn, "catalog_id") != GDK_SUCCEED ||
    1527        2351 :             logger_switch_bat(dcatalog, ndels, lg->fn, "dcatalog") != GDK_SUCCEED) {
    1528             :                 logbat_destroy(nbids);
    1529             :                 logbat_destroy(noids);
    1530             :                 logbat_destroy(ndels);
    1531             :                 logbat_destroy(ncnts);
    1532             :                 logbat_destroy(nlids);
    1533           0 :                 return -1;
    1534             :         }
    1535        2351 :         r[rcnt++] = lg->catalog_bid->batCacheid;
    1536        2351 :         r[rcnt++] = lg->catalog_id->batCacheid;
    1537        2351 :         r[rcnt++] = lg->dcatalog->batCacheid;
    1538             : 
    1539             :         logbat_destroy(lg->catalog_bid);
    1540        2351 :         logbat_destroy(lg->catalog_id);
    1541        2351 :         logbat_destroy(lg->dcatalog);
    1542             : 
    1543        2351 :         lg->catalog_bid = nbids;
    1544        2351 :         lg->catalog_id = noids;
    1545        2351 :         lg->dcatalog = ndels;
    1546             : 
    1547        2351 :         BBPunfix(lg->catalog_cnt->batCacheid);
    1548        2351 :         BBPunfix(lg->catalog_lid->batCacheid);
    1549             : 
    1550        2351 :         lg->catalog_cnt = ncnts;
    1551        2351 :         lg->catalog_lid = nlids;
    1552        2351 :         lg->cnt = BATcount(lg->catalog_bid);
    1553        2351 :         lg->deleted -= cleanup;
    1554        2351 :         assert(lg->deleted == BATcount(lg->dcatalog));
    1555             :         return rcnt;
    1556             : }
    1557             : 
    1558             : static gdk_return
    1559       15787 : bm_subcommit(logger *lg)
    1560             : {
    1561             :         BUN p, q;
    1562             :         logger_lock(lg);
    1563       15787 :         BAT *catalog_bid = lg->catalog_bid;
    1564       15787 :         BAT *catalog_id = lg->catalog_id;
    1565       15787 :         BAT *dcatalog = lg->dcatalog;
    1566       15787 :         BUN nn = 13 + BATcount(catalog_bid);
    1567       15787 :         bat *n = GDKmalloc(sizeof(bat) * nn);
    1568       15787 :         bat *r = GDKmalloc(sizeof(bat) * nn);
    1569       15787 :         BUN *sizes = GDKmalloc(sizeof(BUN) * nn);
    1570             :         int i = 0, rcnt = 0;
    1571             :         gdk_return res;
    1572             :         const log_bid *bids;
    1573             :         lng *cnts = NULL, *lids = NULL;
    1574             :         int cleanup = 0;
    1575             :         lng t0 = 0;
    1576             : 
    1577       15787 :         if (n == NULL || r == NULL || sizes == NULL) {
    1578           0 :                 GDKfree(n);
    1579           0 :                 GDKfree(r);
    1580           0 :                 GDKfree(sizes);
    1581             :                 logger_unlock(lg);
    1582           0 :                 return GDK_FAIL;
    1583             :         }
    1584             : 
    1585       15787 :         sizes[i] = 0;
    1586       15787 :         n[i++] = 0;             /* n[0] is not used */
    1587       15787 :         bids = (const log_bid *) Tloc(catalog_bid, 0);
    1588       15787 :         if (lg->catalog_cnt)
    1589       15600 :                 cnts = (lng *) Tloc(lg->catalog_cnt, 0);
    1590       15787 :         if (lg->catalog_lid)
    1591       15600 :                 lids = (lng *) Tloc(lg->catalog_lid, 0);
    1592     4359017 :         BATloop(catalog_bid, p, q) {
    1593     4343230 :                 bat col = bids[p];
    1594             : 
    1595     4343230 :                 if (lids && lids[p] != lng_nil && lids[p] <= lg->saved_tid)
    1596       18538 :                         cleanup++;
    1597     4343230 :                 if (lg->debug & 1)
    1598           0 :                         fprintf(stderr, "#commit new %s (%d)\n", BBP_logical(col), col);
    1599     4343230 :                 assert(col);
    1600     4343230 :                 sizes[i] = cnts?(BUN)cnts[p]:0;
    1601     4343230 :                 n[i++] = col;
    1602             :         }
    1603             :         /* now commit catalog, so it's also up to date on disk */
    1604       15787 :         sizes[i] = lg->cnt;
    1605       15787 :         n[i++] = catalog_bid->batCacheid;
    1606       15787 :         sizes[i] = lg->cnt;
    1607       15787 :         n[i++] = catalog_id->batCacheid;
    1608       15787 :         sizes[i] = BATcount(dcatalog);
    1609       15787 :         n[i++] = dcatalog->batCacheid;
    1610             : 
    1611       15787 :         if (cleanup && (rcnt=cleanup_and_swap(lg, r, bids, lids, cnts, catalog_bid, catalog_id, dcatalog, cleanup)) < 0) {
    1612           0 :                 GDKfree(n);
    1613           0 :                 GDKfree(r);
    1614           0 :                 GDKfree(sizes);
    1615             :                 logger_unlock(lg);
    1616           0 :                 return GDK_FAIL;
    1617             :         }
    1618       15787 :         if (dcatalog != lg->dcatalog) {
    1619        2351 :                 i = subcommit_list_add(i, n, sizes, lg->catalog_bid->batCacheid, BATcount(lg->catalog_bid));
    1620        2351 :                 i = subcommit_list_add(i, n, sizes, lg->catalog_id->batCacheid, BATcount(lg->catalog_bid));
    1621        2351 :                 i = subcommit_list_add(i, n, sizes, lg->dcatalog->batCacheid, BATcount(lg->dcatalog));
    1622             :         }
    1623       15787 :         if (lg->seqs_id) {
    1624       15600 :                 sizes[i] = BATcount(lg->seqs_id);
    1625       15600 :                 n[i++] = lg->seqs_id->batCacheid;
    1626       15600 :                 sizes[i] = BATcount(lg->seqs_id);
    1627       15600 :                 n[i++] = lg->seqs_val->batCacheid;
    1628             :         }
    1629       15787 :         if (!cleanup && lg->seqs_id && BATcount(lg->dseqs) > (BATcount(lg->seqs_id)/2)) {
    1630             :                 BAT *tids, *ids, *vals;
    1631             : 
    1632        2232 :                 tids = bm_tids(lg->seqs_id, lg->dseqs);
    1633        2232 :                 if (tids == NULL) {
    1634           0 :                         GDKfree(n);
    1635           0 :                         GDKfree(r);
    1636           0 :                         GDKfree(sizes);
    1637             :                         logger_unlock(lg);
    1638           0 :                         return GDK_FAIL;
    1639             :                 }
    1640        2232 :                 ids = logbat_new(TYPE_int, BATcount(tids), PERSISTENT);
    1641        2232 :                 vals = logbat_new(TYPE_lng, BATcount(tids), PERSISTENT);
    1642             : 
    1643        2232 :                 if (ids == NULL || vals == NULL) {
    1644             :                         logbat_destroy(tids);
    1645             :                         logbat_destroy(ids);
    1646             :                         logbat_destroy(vals);
    1647           0 :                         GDKfree(n);
    1648           0 :                         GDKfree(r);
    1649           0 :                         GDKfree(sizes);
    1650             :                         logger_unlock(lg);
    1651           0 :                         return GDK_FAIL;
    1652             :                 }
    1653             : 
    1654        4464 :                 if (BATappend(ids, lg->seqs_id, tids, true) != GDK_SUCCEED ||
    1655        2232 :                     BATappend(vals, lg->seqs_val, tids, true) != GDK_SUCCEED) {
    1656             :                         logbat_destroy(tids);
    1657             :                         logbat_destroy(ids);
    1658             :                         logbat_destroy(vals);
    1659           0 :                         GDKfree(n);
    1660           0 :                         GDKfree(r);
    1661           0 :                         GDKfree(sizes);
    1662             :                         logger_unlock(lg);
    1663           0 :                         return GDK_FAIL;
    1664             :                 }
    1665             :                 logbat_destroy(tids);
    1666        2232 :                 BATclear(lg->dseqs, true);
    1667             : 
    1668        4464 :                 if (logger_switch_bat(lg->seqs_id, ids, lg->fn, "seqs_id") != GDK_SUCCEED ||
    1669        2232 :                     logger_switch_bat(lg->seqs_val, vals, lg->fn, "seqs_val") != GDK_SUCCEED) {
    1670             :                         logbat_destroy(ids);
    1671             :                         logbat_destroy(vals);
    1672           0 :                         GDKfree(n);
    1673           0 :                         GDKfree(r);
    1674           0 :                         GDKfree(sizes);
    1675             :                         logger_unlock(lg);
    1676           0 :                         return GDK_FAIL;
    1677             :                 }
    1678        2232 :                 i = subcommit_list_add(i, n, sizes, ids->batCacheid, BATcount(ids));
    1679        2232 :                 i = subcommit_list_add(i, n, sizes, vals->batCacheid, BATcount(ids));
    1680             : 
    1681        2232 :                 if (BBP_lrefs(lg->seqs_id->batCacheid) > 0)
    1682        2029 :                         r[rcnt++] = lg->seqs_id->batCacheid;
    1683        2232 :                 if (BBP_lrefs(lg->seqs_val->batCacheid) > 0)
    1684        2029 :                         r[rcnt++] = lg->seqs_val->batCacheid;
    1685             : 
    1686             :                 logbat_destroy(lg->seqs_id);
    1687        2232 :                 logbat_destroy(lg->seqs_val);
    1688             : 
    1689        2232 :                 lg->seqs_id = ids;
    1690        2232 :                 lg->seqs_val = vals;
    1691             :         }
    1692       15787 :         if (lg->seqs_id) {
    1693       15600 :                 sizes[i] = BATcount(lg->dseqs);
    1694       15600 :                 n[i++] = lg->dseqs->batCacheid;
    1695             :         }
    1696             : 
    1697       15787 :         assert((BUN) i <= nn);
    1698             :         logger_unlock(lg);
    1699       15787 :         if (lg->debug & 1)
    1700           0 :                 t0 = GDKusec();
    1701       15974 :         res = TMsubcommit_list(n, cnts?sizes:NULL, i, lg->saved_id, lg->saved_tid);
    1702       15787 :         if (lg->debug & 1)
    1703           0 :                 fprintf(stderr, "#subcommit " LLFMT "usec\n", GDKusec() - t0);
    1704       15787 :         if (res == GDK_SUCCEED) { /* now cleanup */
    1705       45436 :                 for(i=0;i<rcnt; i++) {
    1706       29649 :                         if (lg->debug & 1) {
    1707           0 :                                 fprintf(stderr, "release %d\n", r[i]);
    1708           0 :                                 if (BBP_lrefs(r[i]) != 2)
    1709           0 :                                         fprintf(stderr, "release %d %d\n", r[i], BBP_lrefs(r[i]));
    1710             :                         }
    1711       29649 :                         BBPrelease(r[i]);
    1712             :                 }
    1713             :         }
    1714       15787 :         GDKfree(n);
    1715       15787 :         GDKfree(r);
    1716       15787 :         GDKfree(sizes);
    1717       15787 :         if (res != GDK_SUCCEED)
    1718           0 :                 TRC_CRITICAL(GDK, "commit failed\n");
    1719             :         return res;
    1720             : }
    1721             : 
    1722             : static gdk_return
    1723         265 : logger_filename(logger *lg, char bak[FILENAME_MAX], char filename[FILENAME_MAX])
    1724             : {
    1725             :         str filenamestr = NULL;
    1726             : 
    1727         265 :         if ((filenamestr = GDKfilepath(0, lg->dir, LOGFILE, NULL)) == NULL)
    1728             :                 return GDK_FAIL;
    1729         265 :         size_t len = strcpy_len(filename, filenamestr, FILENAME_MAX);
    1730         265 :         GDKfree(filenamestr);
    1731         265 :         if (len >= FILENAME_MAX) {
    1732           0 :                 GDKerror("Logger filename path is too large\n");
    1733           0 :                 return GDK_FAIL;
    1734             :         }
    1735         265 :         if (bak) {
    1736         265 :                 len = strconcat_len(bak, FILENAME_MAX, filename, ".bak", NULL);
    1737         265 :                 if (len >= FILENAME_MAX) {
    1738           0 :                         GDKerror("Logger filename path is too large\n");
    1739           0 :                         return GDK_FAIL;
    1740             :                 }
    1741             :         }
    1742             :         return GDK_SUCCEED;
    1743             : }
    1744             : 
    1745             : static gdk_return
    1746       11299 : logger_cleanup(logger *lg, lng id)
    1747             : {
    1748             :         char log_id[FILENAME_MAX];
    1749             : 
    1750       11299 :         if (snprintf(log_id, sizeof(log_id), LLFMT, id) >= FILENAME_MAX) {
    1751           0 :                 GDKerror("log_id filename is too large\n");
    1752           0 :                 return GDK_FAIL;
    1753             :         }
    1754       11299 :         if (GDKunlink(0, lg->dir, LOGFILE, log_id) != GDK_SUCCEED) {
    1755           0 :                 TRC_WARNING(GDK, "#logger_cleanup: failed to remove old WAL %s.%s\n", LOGFILE, log_id);
    1756           0 :                 GDKclrerr();
    1757             :         }
    1758             :         return GDK_SUCCEED;
    1759             : }
    1760             : 
    1761             : /* Load data from the logger logdir
    1762             :  * Initialize new directories and catalog files if none are present,
    1763             :  * unless running in read-only mode
    1764             :  * Load data and persist it in the BATs */
    1765             : static gdk_return
    1766         266 : logger_load(int debug, const char *fn, const char *logdir, logger *lg, char filename[FILENAME_MAX])
    1767             : {
    1768             :         FILE *fp = NULL;
    1769             :         char bak[FILENAME_MAX];
    1770             :         bat catalog_bid, catalog_id, dcatalog;
    1771             :         bool needcommit = false;
    1772         266 :         int dbg = GDKdebug;
    1773             :         bool readlogs = false;
    1774             : 
    1775             :         /* refactor */
    1776         266 :         if (!LOG_DISABLED(lg)) {
    1777         265 :                 if (logger_filename(lg, bak, filename) != GDK_SUCCEED)
    1778           0 :                         goto error;
    1779             :         }
    1780             : 
    1781         266 :         lg->catalog_bid = NULL;
    1782         266 :         lg->catalog_id = NULL;
    1783         266 :         lg->catalog_cnt = NULL;
    1784         266 :         lg->catalog_lid = NULL;
    1785         266 :         lg->dcatalog = NULL;
    1786             : 
    1787         266 :         lg->seqs_id = NULL;
    1788         266 :         lg->seqs_val = NULL;
    1789         266 :         lg->dseqs = NULL;
    1790             : 
    1791         266 :         lg->type_id = NULL;
    1792         266 :         lg->type_nme = NULL;
    1793         266 :         lg->type_nr = NULL;
    1794             : 
    1795         266 :         if (!LOG_DISABLED(lg)) {
    1796             :                 /* try to open logfile backup, or failing that, the file
    1797             :                  * itself. we need to know whether this file exists when
    1798             :                  * checking the database consistency later on */
    1799         265 :                 if ((fp = MT_fopen(bak, "r")) != NULL) {
    1800           0 :                         fclose(fp);
    1801             :                         fp = NULL;
    1802           0 :                         if (GDKunlink(0, lg->dir, LOGFILE, NULL) != GDK_SUCCEED ||
    1803           0 :                             GDKmove(0, lg->dir, LOGFILE, "bak", lg->dir, LOGFILE, NULL, true) != GDK_SUCCEED)
    1804           0 :                                 goto error;
    1805         265 :                 } else if (errno != ENOENT) {
    1806           0 :                         GDKsyserror("open %s failed", bak);
    1807           0 :                         goto error;
    1808             :                 }
    1809             :                 fp = MT_fopen(filename, "r");
    1810         265 :                 if (fp == NULL && errno != ENOENT) {
    1811           0 :                         GDKsyserror("open %s failed", filename);
    1812           0 :                         goto error;
    1813             :                 }
    1814             :         }
    1815             : 
    1816         266 :         strconcat_len(bak, sizeof(bak), fn, "_catalog_bid", NULL);
    1817         266 :         catalog_bid = BBPindex(bak);
    1818             : 
    1819             :         /* create transient bats for type mapping, to be read from disk */
    1820         266 :         lg->type_id = logbat_new(TYPE_bte, BATSIZE, TRANSIENT);
    1821         266 :         lg->type_nme = logbat_new(TYPE_str, BATSIZE, TRANSIENT);
    1822         266 :         lg->type_nr = logbat_new(TYPE_int, BATSIZE, TRANSIENT);
    1823             : 
    1824         266 :         if (lg->type_id == NULL || lg->type_nme == NULL || lg->type_nr == NULL) {
    1825           0 :                 if (fp)
    1826           0 :                         fclose(fp);
    1827             :                 fp = NULL;
    1828           0 :                 GDKerror("cannot create type bats");
    1829           0 :                 goto error;
    1830             :         }
    1831             : 
    1832             :         /* this is intentional - if catalog_bid is 0, force it to find
    1833             :          * the persistent catalog */
    1834         266 :         if (catalog_bid == 0) {
    1835             :                 /* catalog does not exist, so the log file also
    1836             :                  * shouldn't exist */
    1837         187 :                 if (fp != NULL) {
    1838           0 :                         GDKerror("there is no logger catalog, "
    1839             :                                  "but there is a log file.\n");
    1840           0 :                         goto error;
    1841             :                 }
    1842             : 
    1843         187 :                 lg->catalog_bid = logbat_new(TYPE_int, BATSIZE, PERSISTENT);
    1844         187 :                 lg->catalog_id = logbat_new(TYPE_int, BATSIZE, PERSISTENT);
    1845         187 :                 lg->dcatalog = logbat_new(TYPE_oid, BATSIZE, PERSISTENT);
    1846             : 
    1847         187 :                 if (lg->catalog_bid == NULL || lg->catalog_id == NULL || lg->dcatalog == NULL) {
    1848           0 :                         GDKerror("cannot create catalog bats");
    1849           0 :                         goto error;
    1850             :                 }
    1851         187 :                 if (debug & 1)
    1852           0 :                         fprintf(stderr, "#create %s catalog\n", fn);
    1853             : 
    1854             :                 /* give the catalog bats names so we can find them
    1855             :                  * next time */
    1856         187 :                 strconcat_len(bak, sizeof(bak), fn, "_catalog_bid", NULL);
    1857         187 :                 if (BBPrename(lg->catalog_bid->batCacheid, bak) < 0) {
    1858           0 :                         goto error;
    1859             :                 }
    1860             : 
    1861         187 :                 strconcat_len(bak, sizeof(bak), fn, "_catalog_id", NULL);
    1862         187 :                 if (BBPrename(lg->catalog_id->batCacheid, bak) < 0) {
    1863           0 :                         goto error;
    1864             :                 }
    1865             : 
    1866         187 :                 strconcat_len(bak, sizeof(bak), fn, "_dcatalog", NULL);
    1867         187 :                 if (BBPrename(lg->dcatalog->batCacheid, bak) < 0) {
    1868           0 :                         goto error;
    1869             :                 }
    1870             : 
    1871         187 :                 if (!LOG_DISABLED(lg)) {
    1872         186 :                         if (GDKcreatedir(filename) != GDK_SUCCEED) {
    1873           0 :                                 GDKerror("cannot create directory for log file %s\n", filename);
    1874           0 :                                 goto error;
    1875             :                         }
    1876         186 :                         if (logger_create_types_file(lg, filename) != GDK_SUCCEED)
    1877           0 :                                 goto error;
    1878             :                 }
    1879             : 
    1880         187 :                 BBPretain(lg->catalog_bid->batCacheid);
    1881         187 :                 BBPretain(lg->catalog_id->batCacheid);
    1882         187 :                 BBPretain(lg->dcatalog->batCacheid);
    1883             : 
    1884         187 :                 if (bm_subcommit(lg) != GDK_SUCCEED) {
    1885             :                         /* cannot commit catalog, so remove log */
    1886             :                         MT_remove(filename);
    1887           0 :                         BBPrelease(lg->catalog_bid->batCacheid);
    1888           0 :                         BBPrelease(lg->catalog_id->batCacheid);
    1889           0 :                         BBPrelease(lg->dcatalog->batCacheid);
    1890           0 :                         goto error;
    1891             :                 }
    1892             :         } else {
    1893             :                 /* find the persistent catalog. As non persistent bats
    1894             :                  * require a logical reference we also add a logical
    1895             :                  * reference for the persistent bats */
    1896             :                 BUN p, q;
    1897             :                 BAT *b, *o, *d;
    1898             : 
    1899          79 :                 assert(!lg->inmemory);
    1900             : 
    1901             :                 /* the catalog exists, and so should the log file */
    1902          79 :                 if (fp == NULL && !LOG_DISABLED(lg)) {
    1903           0 :                         GDKerror("There is a logger catalog, but no log file.\n");
    1904           0 :                         goto error;
    1905             :                 }
    1906          79 :                 if (fp != NULL) {
    1907             :                         /* check_version always closes fp */
    1908          79 :                         if (check_version(lg, fp, fn, logdir, filename) != GDK_SUCCEED) {
    1909             :                                 fp = NULL;
    1910           0 :                                 goto error;
    1911             :                         }
    1912             :                         readlogs = true;
    1913             :                         fp = NULL;
    1914             :                 }
    1915             : 
    1916          79 :                 if (lg->catalog_bid == NULL && lg->catalog_id == NULL && lg->dcatalog == NULL) {
    1917          71 :                         b = BATdescriptor(catalog_bid);
    1918          71 :                         if (b == NULL) {
    1919           0 :                                 GDKerror("inconsistent database, catalog does not exist");
    1920           0 :                                 goto error;
    1921             :                         }
    1922             : 
    1923          71 :                         strconcat_len(bak, sizeof(bak), fn, "_catalog_id", NULL);
    1924          71 :                         catalog_id = BBPindex(bak);
    1925          71 :                         o = BATdescriptor(catalog_id);
    1926          71 :                         if (o == NULL) {
    1927           0 :                                 BBPunfix(b->batCacheid);
    1928           0 :                                 GDKerror("inconsistent database, catalog_id does not exist");
    1929           0 :                                 goto error;
    1930             :                         }
    1931             : 
    1932          71 :                         strconcat_len(bak, sizeof(bak), fn, "_dcatalog", NULL);
    1933          71 :                         dcatalog = BBPindex(bak);
    1934          71 :                         d = BATdescriptor(dcatalog);
    1935          71 :                         if (d == NULL) {
    1936           0 :                                 GDKerror("cannot create dcatalog bat");
    1937           0 :                                 BBPunfix(b->batCacheid);
    1938           0 :                                 BBPunfix(o->batCacheid);
    1939           0 :                                 goto error;
    1940             :                         }
    1941             : 
    1942          71 :                         lg->catalog_bid = b;
    1943          71 :                         lg->catalog_id = o;
    1944          71 :                         lg->dcatalog = d;
    1945          71 :                         const log_bid *bids = (const log_bid *) Tloc(lg->catalog_bid, 0);
    1946       18138 :                         BATloop(lg->catalog_bid, p, q) {
    1947       18067 :                                 bat bid = bids[p];
    1948       18067 :                                 oid pos = p;
    1949             : 
    1950       18067 :                                 if (BBPretain(bid) == 0 && /* any bid in the catalog_bid, needs one logical ref */
    1951           0 :                                     BUNfnd(lg->dcatalog, &pos) == BUN_NONE &&
    1952           0 :                                     BUNappend(lg->dcatalog, &pos, false) != GDK_SUCCEED)
    1953           0 :                                         goto error;
    1954             :                         }
    1955             :                 }
    1956          79 :                 BBPretain(lg->catalog_bid->batCacheid);
    1957          79 :                 BBPretain(lg->catalog_id->batCacheid);
    1958          79 :                 BBPretain(lg->dcatalog->batCacheid);
    1959             :         }
    1960         266 :         lg->catalog_cnt = logbat_new(TYPE_lng, 1, TRANSIENT);
    1961         266 :         if (lg->catalog_cnt == NULL) {
    1962           0 :                 GDKerror("failed to create catalog_cnt bat");
    1963           0 :                 goto error;
    1964             :         }
    1965         266 :         lg->catalog_lid = logbat_new(TYPE_lng, 1, TRANSIENT);
    1966         266 :         if (lg->catalog_lid == NULL) {
    1967           0 :                 GDKerror("failed to create catalog_lid bat");
    1968           0 :                 goto error;
    1969             :         }
    1970         266 :         if (bm_get_counts(lg) != GDK_SUCCEED)
    1971           0 :                 goto error;
    1972             : 
    1973         266 :         strconcat_len(bak, sizeof(bak), fn, "_seqs_id", NULL);
    1974         266 :         if (BBPindex(bak)) {
    1975          79 :                 lg->seqs_id = BATdescriptor(BBPindex(bak));
    1976          79 :                 strconcat_len(bak, sizeof(bak), fn, "_seqs_val", NULL);
    1977          79 :                 lg->seqs_val = BATdescriptor(BBPindex(bak));
    1978          79 :                 strconcat_len(bak, sizeof(bak), fn, "_dseqs", NULL);
    1979          79 :                 lg->dseqs = BATdescriptor(BBPindex(bak));
    1980             :         } else {
    1981         187 :                 lg->seqs_id = logbat_new(TYPE_int, 1, PERSISTENT);
    1982         187 :                 lg->seqs_val = logbat_new(TYPE_lng, 1, PERSISTENT);
    1983         187 :                 lg->dseqs = logbat_new(TYPE_oid, 1, PERSISTENT);
    1984         187 :                 if (lg->seqs_id == NULL ||
    1985         187 :                     lg->seqs_val == NULL ||
    1986             :                     lg->dseqs == NULL) {
    1987           0 :                         GDKerror("Logger_new: cannot create seqs bats");
    1988           0 :                         goto error;
    1989             :                 }
    1990             : 
    1991         187 :                 strconcat_len(bak, sizeof(bak), fn, "_seqs_id", NULL);
    1992         187 :                 if (BBPrename(lg->seqs_id->batCacheid, bak) < 0) {
    1993           0 :                         goto error;
    1994             :                 }
    1995             : 
    1996         187 :                 strconcat_len(bak, sizeof(bak), fn, "_seqs_val", NULL);
    1997         187 :                 if (BBPrename(lg->seqs_val->batCacheid, bak) < 0) {
    1998           0 :                         goto error;
    1999             :                 }
    2000             : 
    2001         187 :                 strconcat_len(bak, sizeof(bak), fn, "_dseqs", NULL);
    2002         187 :                 if (BBPrename(lg->dseqs->batCacheid, bak) < 0) {
    2003           0 :                         goto error;
    2004             :                 }
    2005             :                 needcommit = true;
    2006             :         }
    2007         266 :         dbg = GDKdebug;
    2008         266 :         GDKdebug &= ~(CHECKMASK|PROPMASK);
    2009         266 :         if (needcommit && bm_commit(lg) != GDK_SUCCEED) {
    2010           0 :                 GDKerror("Logger_new: commit failed");
    2011           0 :                 goto error;
    2012             :         }
    2013         266 :         GDKdebug = dbg;
    2014             : 
    2015         266 :         if (readlogs) {
    2016          79 :                 ulng log_id = lg->saved_id+1;
    2017          79 :                 if (logger_readlogs(lg, filename) != GDK_SUCCEED) {
    2018           0 :                         goto error;
    2019             :                 }
    2020          79 :                 if (lg->postfuncp && (*lg->postfuncp)(lg->funcdata, lg) != GDK_SUCCEED)
    2021           0 :                         goto error;
    2022          79 :                 dbg = GDKdebug;
    2023          79 :                 GDKdebug &= ~(CHECKMASK|PROPMASK);
    2024          79 :                 if (logger_commit(lg) != GDK_SUCCEED) {
    2025           0 :                         goto error;
    2026             :                 }
    2027          79 :                 GDKdebug = dbg;
    2028         171 :                 for( ; log_id <= lg->saved_id; log_id++)
    2029          92 :                         (void)logger_cleanup(lg, log_id);  /* ignore error of removing file */
    2030             :         } else {
    2031         187 :                 lg->id = lg->saved_id+1;
    2032             :         }
    2033             :         return GDK_SUCCEED;
    2034           0 :   error:
    2035           0 :         if (fp)
    2036           0 :                 fclose(fp);
    2037           0 :         logbat_destroy(lg->catalog_bid);
    2038           0 :         logbat_destroy(lg->catalog_id);
    2039           0 :         logbat_destroy(lg->dcatalog);
    2040           0 :         logbat_destroy(lg->seqs_id);
    2041           0 :         logbat_destroy(lg->seqs_val);
    2042           0 :         logbat_destroy(lg->dseqs);
    2043           0 :         logbat_destroy(lg->type_id);
    2044           0 :         logbat_destroy(lg->type_nme);
    2045           0 :         logbat_destroy(lg->type_nr);
    2046           0 :         GDKfree(lg->fn);
    2047           0 :         GDKfree(lg->dir);
    2048           0 :         GDKfree(lg->local_dir);
    2049           0 :         GDKfree(lg->buf);
    2050           0 :         GDKfree(lg);
    2051           0 :         GDKdebug = dbg;
    2052           0 :         return GDK_FAIL;
    2053             : }
    2054             : 
    2055             : /* Initialize a new logger
    2056             :  * It will load any data in the logdir and persist it in the BATs*/
    2057             : static logger *
    2058         266 : logger_new(int debug, const char *fn, const char *logdir, int version, preversionfix_fptr prefuncp, postversionfix_fptr postfuncp, void *funcdata)
    2059             : {
    2060             :         logger *lg;
    2061             :         char filename[FILENAME_MAX];
    2062             : 
    2063         266 :         if (!GDKinmemory(0) && MT_path_absolute(logdir)) {
    2064           0 :                 TRC_CRITICAL(GDK, "logdir must be relative path\n");
    2065           0 :                 return NULL;
    2066             :         }
    2067             : 
    2068         266 :         lg = GDKmalloc(sizeof(struct logger));
    2069         266 :         if (lg == NULL) {
    2070           0 :                 TRC_CRITICAL(GDK, "allocating logger structure failed\n");
    2071           0 :                 return NULL;
    2072             :         }
    2073             : 
    2074         266 :         *lg = (logger) {
    2075         266 :                 .inmemory = GDKinmemory(0),
    2076             :                 .debug = debug,
    2077             :                 .version = version,
    2078             :                 .prefuncp = prefuncp,
    2079             :                 .postfuncp = postfuncp,
    2080             :                 .funcdata = funcdata,
    2081             : 
    2082             :                 .id = 0,
    2083         266 :                 .saved_id = getBBPlogno(),              /* get saved log numer from bbp */
    2084         266 :                 .saved_tid = (int)getBBPtransid(),      /* get saved transaction id from bbp */
    2085             :         };
    2086         266 :         MT_lock_init(&lg->lock, fn);
    2087             : 
    2088             :         /* probably open file and check version first, then call call old logger code */
    2089         266 :         if (snprintf(filename, sizeof(filename), "%s%c%s%c", logdir, DIR_SEP, fn, DIR_SEP) >= FILENAME_MAX) {
    2090           0 :                 TRC_CRITICAL(GDK, "filename is too large\n");
    2091           0 :                 GDKfree(lg);
    2092           0 :                 return NULL;
    2093             :         }
    2094         266 :         lg->fn = GDKstrdup(fn);
    2095         266 :         lg->dir = GDKstrdup(filename);
    2096         266 :         lg->bufsize = 64*1024;
    2097         266 :         lg->buf = GDKmalloc(lg->bufsize);
    2098         266 :         if (lg->fn == NULL || lg->dir == NULL || lg->buf == NULL) {
    2099           0 :                 TRC_CRITICAL(GDK, "strdup failed\n");
    2100           0 :                 GDKfree(lg->fn);
    2101           0 :                 GDKfree(lg->dir);
    2102           0 :                 GDKfree(lg->buf);
    2103           0 :                 GDKfree(lg);
    2104           0 :                 return NULL;
    2105             :         }
    2106         266 :         if (lg->debug & 1) {
    2107           0 :                 fprintf(stderr, "#logger_new dir set to %s\n", lg->dir);
    2108             :         }
    2109             : 
    2110         266 :         if (logger_load(debug, fn, logdir, lg, filename) == GDK_SUCCEED) {
    2111         266 :                 return lg;
    2112             :         }
    2113             :         return NULL;
    2114             : }
    2115             : 
    2116             : void
    2117         265 : logger_destroy(logger *lg)
    2118             : {
    2119        1628 :         for (logged_range *p = lg->pending; p; ){
    2120        1363 :                 logged_range *n = p->next;
    2121        1363 :                 GDKfree(p);
    2122             :                 p = n;
    2123             :         }
    2124         265 :         if (LOG_DISABLED(lg)) {
    2125           1 :                 lg->saved_id = lg->id;
    2126           1 :                 lg->saved_tid = lg->tid;
    2127           1 :                 logger_commit(lg);
    2128             :         }
    2129         265 :         if (lg->catalog_bid) {
    2130             :                 logger_lock(lg);
    2131             :                 BUN p, q;
    2132         265 :                 BAT *b = lg->catalog_bid;
    2133             : 
    2134             :                 /* free resources */
    2135         265 :                 const log_bid *bids = (const log_bid *) Tloc(b, 0);
    2136       70355 :                 BATloop(b, p, q) {
    2137       70090 :                         bat bid = bids[p];
    2138             : 
    2139       70090 :                         BBPrelease(bid);
    2140             :                 }
    2141             : 
    2142         265 :                 BBPrelease(lg->catalog_bid->batCacheid);
    2143         265 :                 BBPrelease(lg->catalog_id->batCacheid);
    2144         265 :                 BBPrelease(lg->dcatalog->batCacheid);
    2145         265 :                 logbat_destroy(lg->catalog_bid);
    2146         265 :                 logbat_destroy(lg->catalog_id);
    2147         265 :                 logbat_destroy(lg->dcatalog);
    2148             : 
    2149         265 :                 logbat_destroy(lg->catalog_cnt);
    2150         265 :                 logbat_destroy(lg->catalog_lid);
    2151             :                 logger_unlock(lg);
    2152             :         }
    2153         265 :         GDKfree(lg->fn);
    2154         265 :         GDKfree(lg->dir);
    2155         265 :         GDKfree(lg->buf);
    2156         265 :         logger_close_input(lg);
    2157         265 :         logger_close_output(lg);
    2158         265 :         GDKfree(lg);
    2159         265 : }
    2160             : 
    2161             : /* Create a new logger */
    2162             : logger *
    2163         266 : logger_create(int debug, const char *fn, const char *logdir, int version, preversionfix_fptr prefuncp, postversionfix_fptr postfuncp, void *funcdata)
    2164             : {
    2165             :         logger *lg;
    2166         266 :         lg = logger_new(debug, fn, logdir, version, prefuncp, postfuncp, funcdata);
    2167         266 :         if (lg == NULL)
    2168             :                 return NULL;
    2169         266 :         if (lg->debug & 1) {
    2170           0 :                 printf("# Started processing logs %s/%s version %d\n",fn,logdir,version);
    2171           0 :                 fflush(stdout);
    2172             :         }
    2173         266 :         if (lg->debug & 1) {
    2174           0 :                 printf("# Finished processing logs %s/%s\n",fn,logdir);
    2175           0 :                 fflush(stdout);
    2176             :         }
    2177         266 :         if (GDKsetenv("recovery", "finished") != GDK_SUCCEED) {
    2178           0 :                 logger_destroy(lg);
    2179           0 :                 return NULL;
    2180             :         }
    2181         266 :         if (logger_open_output(lg) != GDK_SUCCEED) {
    2182           0 :                 logger_destroy(lg);
    2183           0 :                 return NULL;
    2184             :         }
    2185             :         return lg;
    2186             : }
    2187             : 
    2188             : static ulng
    2189             : logger_next_logfile(logger *lg, ulng ts)
    2190             : {
    2191       11285 :         if (!lg->pending || !lg->pending->next)
    2192             :                 return 0;
    2193       11281 :         if (lg->pending->last_ts < ts)
    2194       11207 :                 return lg->pending->id;
    2195             :         return 0;
    2196             : }
    2197             : 
    2198             : static void
    2199       11207 : logger_cleanup_range(logger *lg)
    2200             : {
    2201       11207 :         logged_range *p = lg->pending;
    2202       11207 :         if (p) {
    2203       11207 :                 lg->pending = p->next;
    2204       11207 :                 GDKfree(p);
    2205             :         }
    2206       11207 : }
    2207             : 
    2208             : gdk_return
    2209           1 : logger_activate(logger *lg)
    2210             : {
    2211           1 :         if (lg->end > 0 && lg->saved_id+1 == lg->id) {
    2212           1 :                 lg->id++;
    2213           1 :                 logger_close_output(lg);
    2214             :                 /* start new file */
    2215           1 :                 if (logger_open_output(lg) != GDK_SUCCEED)
    2216           0 :                         return GDK_FAIL;
    2217             :         }
    2218             :         return GDK_SUCCEED;
    2219             : }
    2220             : 
    2221             : gdk_return
    2222       11285 : logger_flush(logger *lg, ulng ts)
    2223             : {
    2224             :         ulng lid = logger_next_logfile(lg, ts);
    2225       11285 :         if (LOG_DISABLED(lg)) {
    2226           0 :                 lg->saved_id = lid;
    2227           0 :                 lg->saved_tid = lg->tid;
    2228           0 :                 if (lid)
    2229           0 :                         logger_cleanup_range(lg);
    2230           0 :                 if (logger_commit(lg) != GDK_SUCCEED)
    2231           0 :                         TRC_ERROR(GDK, "failed to commit");
    2232           0 :                 return GDK_SUCCEED;
    2233             :         }
    2234       11285 :         if (lg->saved_id >= lid)
    2235             :                 return GDK_SUCCEED;
    2236       11207 :         if (lg->saved_id+1 >= lg->id) /* logger should first release the file */
    2237             :                 return GDK_SUCCEED;
    2238             :         log_return res = LOG_OK;
    2239       22414 :         while(lg->saved_id < lid && res == LOG_OK) {
    2240       11207 :                 if (lg->saved_id >= lg->id)
    2241             :                         break;
    2242       11207 :                 if (!lg->input_log) {
    2243             :                         char *filename;
    2244             :                         char id[32];
    2245       11207 :                         if (snprintf(id, sizeof(id), LLFMT, lg->saved_id+1) >= (int) sizeof(id)) {
    2246           0 :                                 TRC_CRITICAL(GDK, "log_id filename is too large\n");
    2247           0 :                                 return GDK_FAIL;
    2248             :                         }
    2249       11207 :                         if (!(filename = GDKfilepath(BBPselectfarm(PERSISTENT, 0, offheap), lg->dir, LOGFILE, id)))
    2250             :                                 return GDK_FAIL;
    2251       11207 :                         if (strlen(filename) >= FILENAME_MAX) {
    2252           0 :                                 GDKerror("Logger filename path is too large\n");
    2253           0 :                                 GDKfree(filename);
    2254           0 :                                 return GDK_FAIL;
    2255             :                         }
    2256             : 
    2257       11207 :                         bool filemissing = false;
    2258       11207 :                         if (logger_open_input(lg, filename, &filemissing) != GDK_SUCCEED) {
    2259           0 :                                 GDKfree(filename);
    2260           0 :                                 return GDK_FAIL;
    2261             :                         }
    2262       11207 :                         GDKfree(filename);
    2263             :                 }
    2264             :                 /* we read the full file because skipping is impossible with current log format */
    2265             :                 logger_lock(lg);
    2266       11207 :                 lg->flushing = 1;
    2267       11207 :                 res = logger_read_transaction(lg);
    2268       11207 :                 lg->flushing = 0;
    2269             :                 logger_unlock(lg);
    2270       11207 :                 if (res == LOG_EOF) {
    2271       11207 :                         logger_close_input(lg);
    2272             :                         res = LOG_OK;
    2273             :                 }
    2274           0 :                 if (res != LOG_ERR) {
    2275       11207 :                         lg->saved_id++;
    2276       11207 :                         if (logger_commit(lg) != GDK_SUCCEED) {
    2277           0 :                                 TRC_ERROR(GDK, "failed to commit");
    2278             :                                 res = LOG_ERR;
    2279             :                         }
    2280             : 
    2281             :                         /* remove old log file */
    2282       11207 :                         if (res != LOG_ERR) {
    2283       11207 :                                 if (logger_cleanup(lg, lg->saved_id) != GDK_SUCCEED)
    2284             :                                         res = LOG_ERR;
    2285             :                         }
    2286             :                 }
    2287             :         }
    2288       11207 :         if (lid && res == LOG_OK)
    2289       11207 :                 logger_cleanup_range(lg);
    2290       11207 :         return res == LOG_ERR ? GDK_FAIL : GDK_SUCCEED;
    2291             : }
    2292             : 
    2293             : /* Clean-up write-ahead log files already persisted in the BATs, leaving only the most recent one.
    2294             :  * Only the bak- files are deleted for the preserved WAL files.
    2295             :  */
    2296             : lng
    2297       28650 : logger_changes(logger *lg)
    2298             : {
    2299       28650 :         return (lg->id - lg->saved_id - 1);
    2300             : }
    2301             : 
    2302             : int
    2303         377 : logger_sequence(logger *lg, int seq, lng *id)
    2304             : {
    2305             :         logger_lock(lg);
    2306         377 :         BUN p = log_find(lg->seqs_id, lg->dseqs, seq);
    2307             : 
    2308         377 :         if (p != BUN_NONE) {
    2309         283 :                 *id = *(lng *) Tloc(lg->seqs_val, p);
    2310             : 
    2311             :                 logger_unlock(lg);
    2312         283 :                 return 1;
    2313             :         }
    2314             :         logger_unlock(lg);
    2315          94 :         return 0;
    2316             : }
    2317             : 
    2318             : gdk_return
    2319      105263 : log_constant(logger *lg, int type, ptr val, log_id id, lng offset, lng cnt)
    2320             : {
    2321      105263 :         bte tpe = find_type(lg, type);
    2322             :         gdk_return ok = GDK_SUCCEED;
    2323             :         logformat l;
    2324             :         lng nr;
    2325             :         int is_row = 0;
    2326             : 
    2327      105263 :         if (lg->row_insert_nrcols != 0) {
    2328           0 :                 lg->row_insert_nrcols--;
    2329             :                 is_row = 1;
    2330             :         }
    2331      105263 :         l.flag = LOG_UPDATE_CONST;
    2332      105263 :         l.id = id;
    2333             :         nr = cnt;
    2334             : 
    2335      105263 :         if (LOG_DISABLED(lg) || !nr) {
    2336             :                 /* logging is switched off */
    2337       46863 :                 if (nr) {
    2338             :                         logger_lock(lg);
    2339       40148 :                         ok = la_bat_update_count(lg, id, offset+cnt);
    2340             :                         logger_unlock(lg);
    2341             :                 }
    2342       46863 :                 return ok;
    2343             :         }
    2344             : 
    2345       58400 :         gdk_return (*wt) (const void *, stream *, size_t) = BATatoms[type].atomWrite;
    2346             : 
    2347       58400 :         if (is_row)
    2348           0 :                 l.flag = tpe;
    2349       58400 :         if (log_write_format(lg, &l) != GDK_SUCCEED ||
    2350       58400 :             (!is_row && !mnstr_writeLng(lg->output_log, nr)) ||
    2351       58400 :             (!is_row && mnstr_write(lg->output_log, &tpe, 1, 1) != 1) ||
    2352       58400 :             (!is_row && !mnstr_writeLng(lg->output_log, offset))) {
    2353             :                 ok = GDK_FAIL;
    2354           0 :                 goto bailout;
    2355             :         }
    2356             : 
    2357       58400 :         ok = wt(val, lg->output_log, 1);
    2358             : 
    2359       58400 :         if (lg->debug & 1)
    2360           0 :                 fprintf(stderr, "#Logged %d " LLFMT " inserts\n", id, nr);
    2361             : 
    2362       58400 :   bailout:
    2363       58400 :         if (ok != GDK_SUCCEED) {
    2364           0 :                 const char *err = mnstr_peek_error(lg->output_log);
    2365           0 :                 TRC_CRITICAL(GDK, "write failed%s%s\n", err ? ": " : "", err ? err : "");
    2366             :         }
    2367             :         return ok;
    2368             : }
    2369             : 
    2370             : static gdk_return
    2371       14852 : string_writer(logger *lg, BAT *b, lng offset, lng nr)
    2372             : {
    2373       14852 :         size_t bufsz = lg->bufsize, resize = 0;
    2374       14852 :         BUN end = (BUN)(offset + nr);
    2375       14852 :         char *buf = lg->buf;
    2376             :         gdk_return res = GDK_SUCCEED;
    2377             : 
    2378       14852 :         if (!buf)
    2379             :                 return GDK_FAIL;
    2380       14852 :         BATiter bi = bat_iterator(b);
    2381       14852 :         BUN p = (BUN)offset;
    2382       29755 :         for ( ; p < end; ) {
    2383             :                 size_t sz = 0;
    2384       14903 :                 if (resize) {
    2385           1 :                         if (!(buf = GDKrealloc(lg->buf, resize))) {
    2386             :                                 res = GDK_FAIL;
    2387             :                                 break;
    2388             :                         }
    2389           1 :                         lg->buf = buf;
    2390           1 :                         lg->bufsize = bufsz = resize;
    2391             :                         resize = 0;
    2392             :                 }
    2393             :                 char *dst = buf;
    2394       65061 :                 for(; p < end && sz < bufsz; p++) {
    2395       50209 :                         char *s = BUNtail(bi, p);
    2396       50209 :                         size_t len = strlen(s)+1;
    2397       50209 :                         if ((sz+len) > bufsz) {
    2398          51 :                                 if (len > bufsz)
    2399           1 :                                         resize = len+bufsz;
    2400             :                                 break;
    2401             :                         } else {
    2402       50158 :                                 memcpy(dst, s, len);
    2403       50158 :                                 dst += len;
    2404             :                                 sz += len;
    2405             :                         }
    2406             :                 }
    2407       14903 :                 if (sz && (!mnstr_writeLng(lg->output_log, (lng) sz) || mnstr_write(lg->output_log, buf, sz, 1) != 1)) {
    2408             :                         res = GDK_FAIL;
    2409             :                         break;
    2410             :                 }
    2411             :         }
    2412       14852 :         bat_iterator_end(&bi);
    2413       14852 :         return res;
    2414             : }
    2415             : 
    2416             : static gdk_return
    2417      222787 : internal_log_bat(logger *lg, BAT *b, log_id id, lng offset, lng cnt, int sliced)
    2418             : {
    2419      222787 :         bte tpe = find_type(lg, b->ttype);
    2420             :         gdk_return ok = GDK_SUCCEED;
    2421             :         logformat l;
    2422             :         BUN p;
    2423             :         lng nr;
    2424             :         int is_row = 0;
    2425             : 
    2426      222787 :         if (lg->row_insert_nrcols != 0) {
    2427           0 :                 lg->row_insert_nrcols--;
    2428             :                 is_row = 1;
    2429             :         }
    2430      222787 :         l.flag = LOG_UPDATE_BULK;
    2431      222787 :         l.id = id;
    2432             :         nr = cnt;
    2433             : 
    2434      222787 :         if (LOG_DISABLED(lg) || !nr) {
    2435             :                 /* logging is switched off */
    2436      172003 :                 lg->end += nr;
    2437      172003 :                 if (nr)
    2438      121300 :                         return la_bat_update_count(lg, id, offset+cnt);
    2439             :                 return GDK_SUCCEED;
    2440             :         }
    2441             : 
    2442       50784 :         gdk_return (*wt) (const void *, stream *, size_t) = BATatoms[b->ttype].atomWrite;
    2443             : 
    2444       50784 :         if (is_row)
    2445           0 :                 l.flag = tpe;
    2446       50784 :         if (log_write_format(lg, &l) != GDK_SUCCEED ||
    2447       50784 :             (!is_row && !mnstr_writeLng(lg->output_log, nr)) ||
    2448       50784 :             (!is_row && mnstr_write(lg->output_log, &tpe, 1, 1) != 1) ||
    2449       50784 :             (!is_row && !mnstr_writeLng(lg->output_log, offset))) {
    2450             :                 ok = GDK_FAIL;
    2451           0 :                 goto bailout;
    2452             :         }
    2453             : 
    2454             :         /* if offset is just for the log, but BAT is already sliced, reset offset */
    2455       50784 :         if (sliced)
    2456             :                 offset = 0;
    2457       50784 :         if (b->ttype == TYPE_msk) {
    2458           0 :                 BATiter bi = bat_iterator(b);
    2459           0 :                 if (offset % 32 == 0) {
    2460           0 :                         if (!mnstr_writeIntArray(lg->output_log, (int *) ((char *) bi.base + offset / 32), (size_t) ((nr + 31) / 32)))
    2461             :                                 ok = GDK_FAIL;
    2462             :                 } else {
    2463           0 :                         for (lng i = 0; i < nr; i += 32) {
    2464             :                                 uint32_t v = 0;
    2465           0 :                                 for (int j = 0; j < 32 && i + j < nr; j++)
    2466           0 :                                         v |= (uint32_t) Tmskval(&bi, (BUN) (offset + i + j)) << j;
    2467           0 :                                 if (!mnstr_writeInt(lg->output_log, (int) v)) {
    2468             :                                         ok = GDK_FAIL;
    2469             :                                         break;
    2470             :                                 }
    2471             :                         }
    2472             :                 }
    2473           0 :                 bat_iterator_end(&bi);
    2474       86087 :         } else if (b->ttype < TYPE_str && !isVIEW(b)) {
    2475       35303 :                 BATiter bi = bat_iterator(b);
    2476       35303 :                 const void *t = BUNtail(bi, (BUN)offset);
    2477             : 
    2478       35303 :                 ok = wt(t, lg->output_log, (size_t)nr);
    2479       35303 :                 bat_iterator_end(&bi);
    2480       15481 :         } else if (b->ttype == TYPE_str) {
    2481             :                 /* efficient string writes */
    2482       14847 :                 ok = string_writer(lg, b, offset, nr);
    2483             :         } else {
    2484         634 :                 BATiter bi = bat_iterator(b);
    2485         634 :                 BUN end = (BUN)(offset+nr);
    2486        1343 :                 for (p = (BUN)offset; p < end && ok == GDK_SUCCEED; p++) {
    2487         709 :                         const void *t = BUNtail(bi, p);
    2488             : 
    2489         709 :                         ok = wt(t, lg->output_log, 1);
    2490             :                 }
    2491         634 :                 bat_iterator_end(&bi);
    2492             :         }
    2493             : 
    2494       50784 :         if (lg->debug & 1)
    2495           0 :                 fprintf(stderr, "#Logged %d " LLFMT " inserts\n", id, nr);
    2496             : 
    2497       50784 :   bailout:
    2498       50784 :         if (ok != GDK_SUCCEED) {
    2499           0 :                 const char *err = mnstr_peek_error(lg->output_log);
    2500           0 :                 TRC_CRITICAL(GDK, "write failed%s%s\n", err ? ": " : "", err ? err : "");
    2501             :         }
    2502             :         return ok;
    2503             : }
    2504             : 
    2505             : /*
    2506             :  * Changes made to the BAT descriptor should be stored in the log
    2507             :  * files.  Actually, we need to save the descriptor file, perhaps we
    2508             :  * should simply introduce a versioning scheme.
    2509             :  */
    2510             : gdk_return
    2511       68251 : log_bat_persists(logger *lg, BAT *b, log_id id)
    2512             : {
    2513             :         logger_lock(lg);
    2514       68251 :         bte ta = find_type(lg, b->ttype);
    2515             :         logformat l;
    2516             : 
    2517       68251 :         if (logger_add_bat(lg, b, id) != GDK_SUCCEED) {
    2518             :                 logger_unlock(lg);
    2519           0 :                 return GDK_FAIL;
    2520             :         }
    2521             : 
    2522       68251 :         l.flag = LOG_CREATE;
    2523       68251 :         l.id = id;
    2524       68251 :         if (!LOG_DISABLED(lg)) {
    2525        6264 :                 if (log_write_format(lg, &l) != GDK_SUCCEED ||
    2526        3132 :                     mnstr_write(lg->output_log, &ta, 1, 1) != 1) {
    2527             :                         logger_unlock(lg);
    2528           0 :                         return GDK_FAIL;
    2529             :                 }
    2530             :         }
    2531       68251 :         lg->end++;
    2532       68251 :         if (lg->debug & 1)
    2533           0 :                 fprintf(stderr, "#persists id (%d) bat (%d)\n", id, b->batCacheid);
    2534       68251 :         gdk_return r = internal_log_bat(lg, b, id, 0, BATcount(b), 0);
    2535             :         logger_unlock(lg);
    2536       68251 :         return r;
    2537             : }
    2538             : 
    2539             : gdk_return
    2540       19789 : log_bat_transient(logger *lg, log_id id)
    2541             : {
    2542             :         logger_lock(lg);
    2543       19789 :         log_bid bid = internal_find_bat(lg, id);
    2544             :         logformat l;
    2545             : 
    2546       19789 :         l.flag = LOG_DESTROY;
    2547       19789 :         l.id = id;
    2548             : 
    2549       19789 :         if (!LOG_DISABLED(lg)) {
    2550       11421 :                 if (log_write_format(lg, &l) != GDK_SUCCEED) {
    2551           0 :                         TRC_CRITICAL(GDK, "write failed\n");
    2552             :                         logger_unlock(lg);
    2553           0 :                         return GDK_FAIL;
    2554             :                 }
    2555             :         }
    2556       19789 :         lg->end++;
    2557       19789 :         if (lg->debug & 1)
    2558           0 :                 fprintf(stderr, "#Logged destroyed bat (%d) %d\n", id,
    2559             :                                 bid);
    2560       19789 :         lg->end += BATcount(BBPquickdesc(bid));
    2561       19789 :         gdk_return r =  logger_del_bat(lg, bid);
    2562             :         logger_unlock(lg);
    2563       19789 :         return r;
    2564             : }
    2565             : 
    2566             : gdk_return
    2567      152129 : log_bat(logger *lg, BAT *b, log_id id, lng offset, lng cnt)
    2568             : {
    2569             :         logger_lock(lg);
    2570      152129 :         gdk_return r = internal_log_bat(lg, b, id, offset, cnt, 0);
    2571             :         logger_unlock(lg);
    2572      152129 :         return r;
    2573             : }
    2574             : 
    2575             : gdk_return
    2576        2452 : log_delta(logger *lg, BAT *uid, BAT *uval, log_id id)
    2577             : {
    2578             :         logger_lock(lg);
    2579        2452 :         bte tpe = find_type(lg, uval->ttype);
    2580             :         gdk_return ok = GDK_SUCCEED;
    2581             :         logformat l;
    2582             :         BUN p;
    2583             :         lng nr;
    2584             : 
    2585        2452 :         if (BATtdense(uid)) {
    2586        2407 :                 ok = internal_log_bat(lg, uval, id, uid->tseqbase, BATcount(uval), 1);
    2587             :                 logger_unlock(lg);
    2588        2407 :                 return ok;
    2589             :         }
    2590             : 
    2591          45 :         assert(uid->ttype == TYPE_oid || uid->ttype == TYPE_void);
    2592             : 
    2593          45 :         l.flag = LOG_UPDATE;
    2594          45 :         l.id = id;
    2595          45 :         nr = (BUNlast(uval));
    2596          45 :         assert(nr);
    2597             : 
    2598          45 :         lg->end += nr;
    2599          45 :         if (LOG_DISABLED(lg)) {
    2600             :                 /* logging is switched off */
    2601             :                 logger_unlock(lg);
    2602          24 :                 return GDK_SUCCEED;
    2603             :         }
    2604             : 
    2605          21 :         BATiter vi = bat_iterator(uval);
    2606          21 :         gdk_return (*wh) (const void *, stream *, size_t) = BATatoms[TYPE_oid].atomWrite;
    2607          21 :         gdk_return (*wt) (const void *, stream *, size_t) = BATatoms[uval->ttype].atomWrite;
    2608             : 
    2609          42 :         if (log_write_format(lg, &l) != GDK_SUCCEED ||
    2610          42 :             !mnstr_writeLng(lg->output_log, nr) ||
    2611          21 :              mnstr_write(lg->output_log, &tpe, 1, 1) != 1){
    2612             :                 ok = GDK_FAIL;
    2613           0 :                 goto bailout;
    2614             :         }
    2615          90 :         for (p = 0; p < BUNlast(uid) && ok == GDK_SUCCEED; p++) {
    2616          69 :                 const oid id = BUNtoid(uid, p);
    2617             : 
    2618          69 :                 ok = wh(&id, lg->output_log, 1);
    2619             :         }
    2620          21 :         if (uval->ttype == TYPE_msk) {
    2621           0 :                 if (!mnstr_writeIntArray(lg->output_log, vi.base, (BUNlast(uval) + 31) / 32))
    2622             :                         ok = GDK_FAIL;
    2623          21 :         } else if (uval->ttype == TYPE_str) {
    2624             :                 /* efficient string writes */
    2625           5 :                 ok = string_writer(lg, uval, 0, nr);
    2626             :         } else {
    2627          74 :                 for (p = 0; p < BUNlast(uid) && ok == GDK_SUCCEED; p++) {
    2628          58 :                         const void *val = BUNtail(vi, p);
    2629             : 
    2630          58 :                         ok = wt(val, lg->output_log, 1);
    2631             :                 }
    2632             :         }
    2633             : 
    2634          21 :         if (lg->debug & 1)
    2635           0 :                 fprintf(stderr, "#Logged %d " LLFMT " inserts\n", id, nr);
    2636             : 
    2637          21 :   bailout:
    2638          21 :         bat_iterator_end(&vi);
    2639          21 :         if (ok != GDK_SUCCEED) {
    2640           0 :                 const char *err = mnstr_peek_error(lg->output_log);
    2641           0 :                 TRC_CRITICAL(GDK, "write failed%s%s\n", err ? ": " : "", err ? err : "");
    2642             :         }
    2643             :         logger_unlock(lg);
    2644          21 :         return ok;
    2645             : }
    2646             : 
    2647             : 
    2648             : gdk_return
    2649           0 : log_bat_clear(logger *lg, int id)
    2650             : {
    2651             :         logformat l;
    2652             : 
    2653           0 :         lg->end++;
    2654           0 :         if (LOG_DISABLED(lg)) {
    2655             :                 logger_lock(lg);
    2656           0 :                 gdk_return res = la_bat_update_count(lg, id, 0);
    2657             :                 logger_unlock(lg);
    2658           0 :                 return res;
    2659             :         }
    2660             : 
    2661           0 :         l.flag = LOG_CLEAR;
    2662           0 :         l.id = id;
    2663             : 
    2664           0 :         if (lg->debug & 1)
    2665           0 :                 fprintf(stderr, "#Logged clear %d\n", id);
    2666           0 :         return log_write_format(lg, &l);
    2667             : }
    2668             : 
    2669             : #define DBLKSZ          8192
    2670             : #define SEGSZ           (64*DBLKSZ)
    2671             : 
    2672             : #define LOG_MINI        (LL_CONSTANT(2)*1024)
    2673             : #define LOG_LARGE       (LL_CONSTANT(2)*1024*1024*1024)
    2674             : 
    2675             : static gdk_return
    2676       12825 : new_logfile(logger *lg)
    2677             : {
    2678       12825 :         lng log_large = (GDKdebug & FORCEMITOMASK)?LOG_MINI:LOG_LARGE;
    2679       12825 :         assert(!LOG_DISABLED(lg));
    2680             :         lng p;
    2681       12825 :         p = (lng) getfilepos(getFile(lg->output_log));
    2682       12825 :         if (p == -1)
    2683             :                 return GDK_FAIL;
    2684       12825 :         if (p > log_large || (lg->end*1024) > log_large) {
    2685        8178 :                 lg->id++;
    2686        8178 :                 logger_close_output(lg);
    2687        8178 :                 return logger_open_output(lg);
    2688             :         }
    2689             :         return GDK_SUCCEED;
    2690             : }
    2691             : 
    2692             : gdk_return
    2693       16954 : log_tend(logger *lg)
    2694             : {
    2695             :         logformat l;
    2696             : 
    2697       16954 :         if (lg->debug & 1)
    2698           0 :                 fprintf(stderr, "#log_tend %d\n", lg->tid);
    2699             : 
    2700       16954 :         l.flag = LOG_END;
    2701       16954 :         l.id = lg->tid;
    2702       16954 :         if (lg->flushnow) {
    2703        4126 :                 lg->flushnow = 0;
    2704        4126 :                 return logger_commit(lg);
    2705             :         }
    2706             : 
    2707       12828 :         lg->end++;
    2708       12828 :         if (LOG_DISABLED(lg)) {
    2709             :                 return GDK_SUCCEED;
    2710             :         }
    2711             : 
    2712       25650 :         if (log_write_format(lg, &l) != GDK_SUCCEED ||
    2713       12825 :             mnstr_flush(lg->output_log, MNSTR_FLUSH_DATA) ||
    2714       25650 :             (!(GDKdebug & NOSYNCMASK) && mnstr_fsync(lg->output_log)) ||
    2715       12825 :             new_logfile(lg) != GDK_SUCCEED) {
    2716           0 :                 TRC_CRITICAL(GDK, "write failed\n");
    2717           0 :                 return GDK_FAIL;
    2718             :         }
    2719             :         return GDK_SUCCEED;
    2720             : }
    2721             : 
    2722             : gdk_return
    2723       16954 : log_tdone(logger *lg, ulng commit_ts)
    2724             : {
    2725       16954 :         if (lg->debug & 1)
    2726           0 :                 fprintf(stderr, "#log_tdone %d\n", lg->tid);
    2727             : 
    2728       16954 :         if (lg->current) {
    2729       16954 :                 lg->current->last_tid = lg->tid;
    2730       16954 :                 lg->current->last_ts = commit_ts;
    2731             :         }
    2732       16954 :         return GDK_SUCCEED;
    2733             : }
    2734             : 
    2735             : static gdk_return
    2736        6238 : log_sequence_(logger *lg, int seq, lng val, int flush)
    2737             : {
    2738             :         logformat l;
    2739             : 
    2740        6238 :         if (LOG_DISABLED(lg))
    2741             :                 return GDK_SUCCEED;
    2742        2734 :         l.flag = LOG_SEQ;
    2743        2734 :         l.id = seq;
    2744             : 
    2745        2734 :         if (lg->debug & 1)
    2746           0 :                 fprintf(stderr, "#log_sequence_ (%d," LLFMT ")\n", seq, val);
    2747             : 
    2748        5468 :         if (log_write_format(lg, &l) != GDK_SUCCEED ||
    2749        5468 :             !mnstr_writeLng(lg->output_log, val) ||
    2750        2734 :             (flush && mnstr_flush(lg->output_log, MNSTR_FLUSH_DATA)) ||
    2751        2734 :             (flush && !(GDKdebug & NOSYNCMASK) && mnstr_fsync(lg->output_log))) {
    2752           0 :                 TRC_CRITICAL(GDK, "write failed\n");
    2753           0 :                 return GDK_FAIL;
    2754             :         }
    2755             :         return GDK_SUCCEED;
    2756             : }
    2757             : 
    2758             : /* a transaction in it self */
    2759             : gdk_return
    2760        6238 : log_sequence(logger *lg, int seq, lng val)
    2761             : {
    2762             :         BUN p;
    2763             : 
    2764        6238 :         if (lg->debug & 1)
    2765           0 :                 fprintf(stderr, "#log_sequence (%d," LLFMT ")\n", seq, val);
    2766             : 
    2767             :         logger_lock(lg);
    2768        6238 :         if ((p = log_find(lg->seqs_id, lg->dseqs, seq)) != BUN_NONE &&
    2769        5902 :             p >= lg->seqs_id->batInserted) {
    2770        1635 :                 assert(lg->seqs_val->hseqbase == 0);
    2771        1635 :                 if (BUNreplace(lg->seqs_val, p, &val, false) != GDK_SUCCEED) {
    2772             :                         logger_unlock(lg);
    2773           0 :                         return GDK_FAIL;
    2774             :                 }
    2775             :         } else {
    2776        4603 :                 if (p != BUN_NONE) {
    2777        4267 :                         oid pos = p;
    2778        4267 :                         if (BUNappend(lg->dseqs, &pos, false) != GDK_SUCCEED) {
    2779             :                                 logger_unlock(lg);
    2780           0 :                                 return GDK_FAIL;
    2781             :                         }
    2782             :                 }
    2783        9206 :                 if (BUNappend(lg->seqs_id, &seq, false) != GDK_SUCCEED ||
    2784        4603 :                     BUNappend(lg->seqs_val, &val, false) != GDK_SUCCEED) {
    2785             :                         logger_unlock(lg);
    2786           0 :                         return GDK_FAIL;
    2787             :                 }
    2788             :         }
    2789        6238 :         gdk_return r = log_sequence_(lg, seq, val, 1);
    2790             :         logger_unlock(lg);
    2791        6238 :         return r;
    2792             : }
    2793             : 
    2794             : static gdk_return
    2795       15600 : bm_commit(logger *lg)
    2796             : {
    2797             :         BUN p;
    2798             :         logger_lock(lg);
    2799       15600 :         BAT *b = lg->catalog_bid;
    2800             :         const log_bid *bids;
    2801             : 
    2802       15600 :         bids = (log_bid *) Tloc(b, 0);
    2803       83769 :         for (p = b->batInserted; p < BUNlast(b); p++) {
    2804       68169 :                 log_bid bid = bids[p];
    2805             :                 BAT *lb;
    2806             : 
    2807       68169 :                 assert(bid);
    2808      136338 :                 if ((lb = BATdescriptor(bid)) == NULL ||
    2809       68169 :                     BATmode(lb, false) != GDK_SUCCEED) {
    2810           0 :                         TRC_WARNING(GDK, "Failed to set bat (%d%s) persistent\n", bid, !lb?" gone":"");
    2811             :                         logbat_destroy(lb);
    2812             :                         logger_unlock(lg);
    2813           0 :                         return GDK_FAIL;
    2814             :                 }
    2815             : 
    2816       68169 :                 assert(lb->batRestricted != BAT_WRITE);
    2817             :                 logbat_destroy(lb);
    2818             : 
    2819       68169 :                 if (lg->debug & 1)
    2820           0 :                         fprintf(stderr, "#bm_commit: create %d (%d)\n",
    2821           0 :                                 bid, BBP_lrefs(bid));
    2822             :         }
    2823             :         logger_unlock(lg);
    2824       15600 :         return bm_subcommit(lg);
    2825             : }
    2826             : 
    2827             : static gdk_return
    2828       68483 : logger_add_bat(logger *lg, BAT *b, log_id id)
    2829             : {
    2830       68483 :         log_bid bid = internal_find_bat(lg, id);
    2831       68483 :         lng cnt = 0;
    2832       68483 :         lng lid = lng_nil;
    2833             : 
    2834       68483 :         assert(b->batRestricted != BAT_WRITE ||
    2835             :                b == lg->catalog_bid ||
    2836             :                b == lg->catalog_id ||
    2837             :                b == lg->dcatalog ||
    2838             :                b == lg->seqs_id ||
    2839             :                b == lg->seqs_val ||
    2840             :                b == lg->dseqs);
    2841       68483 :         assert(b->batRole == PERSISTENT);
    2842       68483 :         if (bid) {
    2843         586 :                 if (bid != b->batCacheid) {
    2844         584 :                         if (logger_del_bat(lg, bid) != GDK_SUCCEED)
    2845             :                                 return GDK_FAIL;
    2846             :                 } else {
    2847             :                         return GDK_SUCCEED;
    2848             :                 }
    2849             :         }
    2850       68481 :         bid = b->batCacheid;
    2851       68481 :         if (lg->debug & 1)
    2852           0 :                 fprintf(stderr, "#create %d\n", id);
    2853       68481 :         assert(log_find(lg->catalog_bid, lg->dcatalog, bid) == BUN_NONE);
    2854      136962 :         if (BUNappend(lg->catalog_bid, &bid, false) != GDK_SUCCEED ||
    2855      136962 :             BUNappend(lg->catalog_id, &id, false) != GDK_SUCCEED ||
    2856      136962 :             BUNappend(lg->catalog_cnt, &cnt, false) != GDK_SUCCEED ||
    2857       68481 :             BUNappend(lg->catalog_lid, &lid, false) != GDK_SUCCEED)
    2858           0 :                 return GDK_FAIL;
    2859       68481 :         lg->cnt++;
    2860       68481 :         BBPretain(bid);
    2861       68481 :         return GDK_SUCCEED;
    2862             : }
    2863             : 
    2864             : static gdk_return
    2865       20409 : logger_del_bat(logger *lg, log_bid bid)
    2866             : {
    2867       20409 :         BUN p = log_find(lg->catalog_bid, lg->dcatalog, bid);
    2868             :         oid pos;
    2869       20409 :         lng lid = lg->tid;
    2870             : 
    2871       20409 :         assert(p != BUN_NONE);
    2872             :         if (p == BUN_NONE) {
    2873             :                 GDKerror("cannot find BAT\n");
    2874             :                 return GDK_FAIL;
    2875             :         }
    2876             : 
    2877       20409 :         pos = (oid) p;
    2878       20409 :         assert(lg->catalog_lid->hseqbase == 0);
    2879       20409 :         if (BUNreplace(lg->catalog_lid, p, &lid, false) != GDK_SUCCEED)
    2880             :                 return GDK_FAIL;
    2881       20409 :         if (BUNappend(lg->dcatalog, &pos, false) == GDK_SUCCEED) {
    2882       20409 :                 lg->deleted++;
    2883       20409 :                 return GDK_SUCCEED;
    2884             :         }
    2885             :         return GDK_FAIL;
    2886             : }
    2887             : 
    2888             : log_bid
    2889       20793 : logger_find_bat(logger *lg, log_id id)
    2890             : {
    2891             :         logger_lock(lg);
    2892       20793 :         log_bid bid = internal_find_bat(lg, id);
    2893             :         logger_unlock(lg);
    2894       20793 :         return bid;
    2895             : }
    2896             : 
    2897             : 
    2898             : gdk_return
    2899       16954 : log_tstart(logger *lg, bool flushnow)
    2900             : {
    2901             :         logformat l;
    2902             : 
    2903       16954 :         if (flushnow) {
    2904        4126 :                 lg->id++;
    2905        4126 :                 logger_close_output(lg);
    2906             :                 /* start new file */
    2907        4126 :                 if (logger_open_output(lg) != GDK_SUCCEED)
    2908             :                         return GDK_FAIL;
    2909       14047 :                 while (lg->saved_id+1 < lg->id)
    2910        9921 :                         logger_flush(lg, (1ULL<<63));
    2911        4126 :                 lg->flushnow = flushnow;
    2912             :         }
    2913             : 
    2914       16954 :         lg->end++;
    2915       16954 :         if (LOG_DISABLED(lg)) {
    2916             :                 return GDK_SUCCEED;
    2917             :         }
    2918             : 
    2919       12825 :         l.flag = LOG_START;
    2920       12825 :         l.id = ++lg->tid;
    2921             : 
    2922       12825 :         if (lg->debug & 1)
    2923           0 :                 fprintf(stderr, "#log_tstart %d\n", lg->tid);
    2924       12825 :         if (log_write_format(lg, &l) != GDK_SUCCEED)
    2925           0 :                 return GDK_FAIL;
    2926             :         return GDK_SUCCEED;
    2927             : }

Generated by: LCOV version 1.14