LCOV - code coverage report
Current view: top level - sql/storage/bat - bat_storage.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 1839 2187 84.1 %
Date: 2021-10-13 02:24:04 Functions: 143 151 94.7 %

          Line data    Source code
       1             : /*
       2             :  * This Source Code Form is subject to the terms of the Mozilla Public
       3             :  * License, v. 2.0.  If a copy of the MPL was not distributed with this
       4             :  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
       5             :  *
       6             :  * Copyright 1997 - July 2008 CWI, August 2008 - 2021 MonetDB B.V.
       7             :  */
       8             : 
       9             : #include "monetdb_config.h"
      10             : #include "bat_storage.h"
      11             : #include "bat_utils.h"
      12             : #include "sql_string.h"
      13             : #include "gdk_atoms.h"
      14             : #include "gdk_atoms.h"
      15             : #include "matomic.h"
      16             : 
      17             : #define inTransaction(tr,t) (isLocalTemp(t))
                       /* inTransaction ignores its tr argument; it expands to isLocalTemp(t) only. */
      18             : 
                       /* Forward declarations of static helpers (log_*, commit_*, tc_gc_*). */
      19             : static int log_update_col( sql_trans *tr, sql_change *c);
      20             : static int log_update_idx( sql_trans *tr, sql_change *c);
      21             : static int log_update_del( sql_trans *tr, sql_change *c);
      22             : static int commit_update_col( sql_trans *tr, sql_change *c, ulng commit_ts, ulng oldest);
      23             : static int commit_update_idx( sql_trans *tr, sql_change *c, ulng commit_ts, ulng oldest);
      24             : static int commit_update_del( sql_trans *tr, sql_change *c, ulng commit_ts, ulng oldest);
      25             : static int log_create_col(sql_trans *tr, sql_change *change);
      26             : static int log_create_idx(sql_trans *tr, sql_change *change);
      27             : static int log_create_del(sql_trans *tr, sql_change *change);
      28             : static int commit_create_col(sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest);
      29             : static int commit_create_idx(sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest);
      30             : static int commit_create_del(sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest);
                       /* The tc_gc_* helpers take (store, change, oldest), the same shape as
                        * tc_gc_seg below, which is installed as a sql_change cleanup callback. */
      31             : static int tc_gc_col( sql_store Store, sql_change *c, ulng oldest);
      32             : static int tc_gc_idx( sql_store Store, sql_change *c, ulng oldest);
      33             : static int tc_gc_del( sql_store Store, sql_change *c, ulng oldest);
      34             : static int tc_gc_drop_col( sql_store Store, sql_change *c, ulng oldest);
      35             : static int tc_gc_drop_idx( sql_store Store, sql_change *c, ulng oldest);
      36             : 
      37             : static int merge_delta( sql_delta *obat);
      38             : 
      39             : /* A segment is valid (readable) for transaction tr when either
      40             :  * !deleted && VALID_4_READ(TS, tr)                             existing or newly created segment
      41             :  *  deleted && TS > tr->ts && OLDTS < tr->ts                deleted after the current transaction started
      42             :  */
      43             : 
                       /* TS is readable by tr when tr wrote it itself (TS == tr->tid), when tr has a
                        * parent and tr_version_of_parent(tr, TS) holds, or when TS < tr->ts.
                        * NOTE(review): TS and tr are expanded without parentheses; the uses visible
                        * in this file section pass plain member expressions such as seg->ts. */
      44             : #define VALID_4_READ(TS,tr) \
      45             :         (TS == tr->tid || (tr->parent && tr_version_of_parent(tr, TS)) || TS < tr->ts)
      46             : 
      47             : /* when changed, check if the old status (OLDTS) is still valid for tr; false when OLDTS is 0 or tr made the change itself */
      48             : #define OLD_VALID_4_READ(TS,OLDTS,tr) \
      49             :                 (OLDTS && TS != tr->tid && TS > tr->ts && OLDTS < tr->ts)
      50             : 
                       /* A segment can be deleted by tr when it is not deleted and is readable by tr. */
      51             : #define SEG_VALID_4_DELETE(seg,tr) \
      52             :         (!seg->deleted && VALID_4_READ(seg->ts, tr))
      53             : 
      54             : /* Deleted (in the current transaction or by some other finished transaction), or a re-used segment which used to be deleted */
      55             : #define SEG_IS_DELETED(seg,tr) \
      56             :         ((seg->deleted && (VALID_4_READ(seg->ts, tr) || !OLD_VALID_4_READ(seg->ts, seg->oldts, tr))) || \
      57             :          (!seg->deleted && !VALID_4_READ(seg->ts, tr)))
      58             : 
      59             : /* A segment is part of the current transaction in some way, or is deleted by some other transaction but used to be valid */
      60             : #define SEG_IS_VALID(seg, tr) \
      61             :                 ((!seg->deleted && VALID_4_READ(seg->ts, tr)) || \
      62             :                  (seg->deleted && OLD_VALID_4_READ(seg->ts, seg->oldts, tr)))
      63             : 
      64             : static void
      65             : lock_table(sqlstore *store, sqlid id)
      66             : {
                               /* Take the entry of store->table_locks selected by id & (NR_TABLE_LOCKS-1).
                                * The mask presumably assumes NR_TABLE_LOCKS is a power of two - confirm. */
      67    12576437 :         MT_lock_set(&store->table_locks[id&(NR_TABLE_LOCKS-1)]);
      68     1743295 : }
      69             : 
      70             : static void
      71             : unlock_table(sqlstore *store, sqlid id)
      72             : {
                               /* Release the lock that lock_table() takes for the same id (same index expression). */
      73    12581616 :         MT_lock_unset(&store->table_locks[id&(NR_TABLE_LOCKS-1)]);
      74     1708261 : }
      75             : 
      76             : static void
      77             : lock_column(sqlstore *store, sqlid id)
      78             : {
                               /* Take the entry of store->column_locks selected by id & (NR_COLUMN_LOCKS-1).
                                * The mask presumably assumes NR_COLUMN_LOCKS is a power of two - confirm. */
      79    14301297 :         MT_lock_set(&store->column_locks[id&(NR_COLUMN_LOCKS-1)]);
      80      247144 : }
      81             : 
      82             : static void
      83             : unlock_column(sqlstore *store, sqlid id)
      84             : {
                               /* Release the lock that lock_column() takes for the same id (same index expression). */
      85    14300934 :         MT_lock_unset(&store->column_locks[id&(NR_COLUMN_LOCKS-1)]);
      86        3366 : }
      87             : 
      88             : 
      89             : static int
      90        5288 : tc_gc_seg( sql_store Store, sql_change *change, ulng oldest)
      91             : {
                               /* Cleanup callback (installed by mark4destroy) that frees the chain of
                                * segments hanging off change->data through their prev pointers.
                                * Store is unused. Returns 1 once the chain has been freed, LOG_OK when
                                * the head segment's ts is still newer than oldest and nothing was freed. */
      92             :         (void)Store;
      93        5288 :         segment *s = change->data;
      94             : 
                               /* only the ts of the chain head is compared against oldest */
      95        5288 :         if (s->ts <= oldest) {
      96        6444 :                 while(s) {
      97        3807 :                         segment *n = s->prev;
      98        3807 :                         _DELETE(s);
      99             :                         s = n;
     100             :                 }
     101             :                 return 1;
     102             :         }
     103             :         return LOG_OK;
     104             : }
     105             : 
     106             : static void
     107             : mark4destroy(segment *s, sql_change *c, ulng commit_ts)
     108             : {
                               /* Queue segment s for deferred freeing by tc_gc_seg: s becomes the new
                                * head stored in c->data and is stamped with commit_ts. */
     109             :         /* we can only be accessed by anything older than commit_ts */
     110        3807 :         if (c->cleanup == &tc_gc_seg)
                                       /* a chain already exists: link it behind s */
     111        1170 :                 s->prev = c->data;
     112             :         else
                                       /* first queued segment: install the callback. s->prev is not
                                        * touched here; NOTE(review): this relies on prev still being NULL
                                        * as set by new_segment/split_segment, and any earlier c->data
                                        * value is overwritten below - confirm that is intended. */
     113        2637 :                 c->cleanup = &tc_gc_seg;
     114        3807 :         c->data = s;
     115        3807 :         s->ts = commit_ts;
     116          17 : }
     117             : 
     118             : static segment *
     119       35070 : new_segment(segment *o, sql_trans *tr, size_t cnt)
     120             : {
                               /* Allocate a non-deleted segment of cnt rows stamped with tr->tid.
                                * With a predecessor o the new segment starts at o->end and is linked as
                                * o->next; without one it starts at 0. Returns NULL when GDKmalloc fails.
                                * The tail pointer of any segments list is not updated here. */
     121       35070 :         segment *n = (segment*)GDKmalloc(sizeof(segment));
     122             : 
     123       35070 :         assert(tr);
     124       35070 :         if (n) {
     125       35070 :                 n->ts = tr->tid;
     126       35070 :                 n->oldts = 0;
     127       35070 :                 n->deleted = false;
     128       35070 :                 n->start = 0;
     129       35070 :                 n->next = NULL;
     130       35070 :                 n->prev = NULL;
     131       35070 :                 if (o) {
     132       16112 :                         n->start = o->end;
     133       16112 :                         o->next = n;
     134             :                 }
     135       35070 :                 n->end = n->start + cnt;
     136             :         }
     137       35070 :         return n;
     138             : }
     139             : 
     140             : static segment *
     141       86832 : split_segment(segments *segs, segment *o, segment *p, sql_trans *tr, size_t start, size_t cnt, bool deleted)
     142             : {
                               /* Carve the range [start, start+cnt) out of segment o and return the
                                * segment that now covers exactly that range, or NULL when an
                                * allocation fails. p is o's predecessor (may be NULL) and is only
                                * used when the range begins at o->start. Four cases are handled:
                                * exact match (o is re-stamped in place), split at the front, split
                                * at the back, and a 3-way split in the middle. */
                               /* exact match: keep the old ts in oldts and take the segment over.
                                * NOTE(review): tr is dereferenced here before the assert(tr) below. */
     143       86832 :         if (o->start == start && o->end == start+cnt) {
     144        8917 :                 assert(o->deleted != deleted || o->ts < TRANSACTION_ID_BASE);
     145        8917 :                 o->oldts = o->ts;
     146        8917 :                 o->ts = tr->tid;
     147        8917 :                 o->deleted = deleted;
     148        8917 :                 return o;
     149             :         }
     150       77915 :         segment *n = (segment*)GDKmalloc(sizeof(segment));
     151             : 
     152       77915 :         assert(tr);
     153       77915 :         if (!n)
     154             :                 return NULL;
     155       77915 :         n->prev = NULL;
     156             : 
     157       77915 :         n->oldts = 0;
                               /* when o already belongs to tr, the new piece gets ts 1 and is marked
                                * deleted regardless of the deleted argument; otherwise it remembers
                                * o's ts in oldts and is stamped with tr->tid */
     158       77915 :         if (o->ts == tr->tid) {
     159        3833 :                 n->ts = 1;
     160        3833 :                 n->deleted = true;
     161             :         } else {
     162       74082 :                 n->oldts = o->ts;
     163       74082 :                 n->ts = tr->tid;
     164       74082 :                 n->deleted = deleted;
     165             :         }
                               /* front split: n takes the first cnt rows and is linked before o */
     166       77915 :         if (start == o->start) {
     167       63614 :                 n->start = o->start;
     168       63614 :                 n->end = n->start + cnt;
     169       63614 :                 n->next = o;
     170       63614 :                 if (segs->h == o)
     171         386 :                         segs->h = n;
     172       63614 :                 if (p)
     173       63228 :                         p->next = n;
     174       63614 :                 o->start = n->end;
     175       63614 :                 return n;
                               /* back split: n takes the last cnt rows and is linked after o */
     176       14301 :         } else if (start+cnt == o->end) {
     177        5374 :                 n->start = o->end - cnt;
     178        5374 :                 n->end = o->end;
     179        5374 :                 n->next = o->next;
     180        5374 :                 o->next = n;
     181        5374 :                 if (segs->t == o)
     182         712 :                         segs->t = n;
     183        5374 :                 o->end = n->start;
     184        5374 :                 return n;
     185             :         }
     186             :         /* 3 way split */
                               /* step 1: n temporarily covers [start, o->end) and follows o */
     187        8927 :         n->start = start;
     188        8927 :         n->end = o->end;
     189        8927 :         n->next = o->next;
     190        8927 :         o->next = n;
     191        8927 :         if (segs->t == o)
     192        3232 :                 segs->t = n;
     193        8927 :         o->end = n->start;
     194             : 
                               /* step 2: from here on o names the middle piece and oo the original
                                * segment; a third segment copying oo's ts/oldts/deleted takes the
                                * remainder [start+cnt, end).
                                * NOTE(review): if this second GDKmalloc fails, NULL is returned
                                * although the list was already modified in step 1. */
     195             :         segment *oo = o;
     196             :         o = n;
     197        8927 :         n = (segment*)GDKmalloc(sizeof(segment));
     198        8927 :         if (!n)
     199             :                 return NULL;
     200        8927 :         n->prev = NULL;
     201        8927 :         n->ts = oo->ts;
     202        8927 :         n->oldts = oo->oldts;
     203        8927 :         n->deleted = oo->deleted;
     204        8927 :         n->start = start+cnt;
     205        8927 :         n->end = o->end;
     206        8927 :         n->next = o->next;
     207        8927 :         o->next = n;
     208        8927 :         if (segs->t == o)
     209        3232 :                 segs->t = n;
     210        8927 :         o->end = n->start;
     211        8927 :         return o;
     212             : }
     213             : 
     214             : static void
     215        3058 : rollback_segments(segments *segs, sql_trans *tr, sql_change *change, ulng oldest)
     216             : {
                               /* Undo tr's changes on the segment list, then merge neighbouring
                                * segments that are contiguous, share the same deleted flag and have
                                * ts <= oldest. Merged-away segments are queued on change through
                                * mark4destroy, stamped with store_get_timestamp(tr->store). */
     217        3058 :         segment *cur = segs->h, *seg = NULL;
     218       13143 :         for (; cur; cur = cur->next) {
     219       10085 :                 if (cur->ts == tr->tid) { /* revert */
                                               /* flip the deleted flag; when oldts equals ts the segment ends up deleted */
     220        3496 :                         cur->deleted = !cur->deleted || (cur->ts == cur->oldts);
                                               /* restore the previous ts, or 0 when oldts is tr's own tid */
     221        3496 :                         cur->ts = cur->oldts==tr->tid?0:cur->oldts; /* need old ts */
     222        3496 :                         cur->oldts = 0;
     223             :                 }
     224       10085 :                 if (cur->ts <= oldest) { /* possibly merge range */
     225       10026 :                         if (!seg) { /* skip first */
     226             :                                 seg = cur;
     227        6968 :                         } else if (seg->end == cur->start && seg->deleted == cur->deleted) {
     228             :                                 /* merge with previous */
     229        3790 :                                 seg->end = cur->end;
     230        3790 :                                 seg->next = cur->next;
     231        3790 :                                 if (cur == segs->t)
     232        2577 :                                         segs->t = seg;
     233        3790 :                                 mark4destroy(cur, change, store_get_timestamp(tr->store));
                                                       /* continue from seg so that the loop step moves to the old cur->next */
     234        3790 :                                 cur = seg;
     235             :                         } else {
     236             :                                 seg = cur; /* begin of new merge */
     237             :                         }
     238             :                 }
     239             :         }
     240        3058 : }
     241             : 
     242             : static size_t
     243       45361 : segs_end_include_deleted( segments *segs, sql_trans *tr)
     244             : {
                               /* Return the end offset of the last segment in the list that either
                                * belongs to tr (ts == tr->tid) or satisfies SEG_IS_VALID; 0 when no
                                * segment qualifies. No lock is taken in this function. */
     245             :         size_t cnt = 0;
     246       45361 :         segment *s = segs->h, *l = NULL;
     247             : 
     248      224932 :         for(;s; s = s->next) {
     249      179571 :                 if (s->ts == tr->tid || SEG_IS_VALID(s, tr))
     250             :                                 l = s;
     251             :         }
     252       45361 :         if (l)
     253       45354 :                 cnt = l->end;
     254       45361 :         return cnt;
     255             : }
     256             : 
     257             : static int
     258       45361 : segments2cs(sql_trans *tr, segments *segs, column_storage *cs)
     259             : {
                               /* Write the deleted/not-deleted state of every non-empty segment owned
                                * by tr into the BAT cs->bid, which is accessed as an array of 32-bit
                                * words with one bit per row (bit set = deleted, bit clear = not
                                * deleted). Returns LOG_ERR when the BAT descriptor cannot be obtained
                                * or BATextend fails, LOG_OK otherwise. */
     260             :         /* set bits correctly */
     261       45361 :         BAT *b = temp_descriptor(cs->bid);
     262             : 
     263       45361 :         if (!b)
     264             :                 return LOG_ERR;
     265       45361 :         segment *s = segs->h;
     266             : 
     267       45361 :         size_t nr = segs_end_include_deleted(segs, tr);
                               /* round the row count up to a multiple of 32 before extending the BAT */
     268       45361 :         size_t rounded_nr = ((nr+31)&~31);
     269       45361 :         if (rounded_nr > BATcapacity(b) && BATextend(b, rounded_nr) != GDK_SUCCEED) {
     270           0 :                 bat_destroy(b);
     271           0 :                 return LOG_ERR;
     272             :         }
     273             : 
     274             :         /* disable all properties here */
     275       45361 :         b->tsorted = false;
     276       45361 :         b->trevsorted = false;
     277       45361 :         b->tnosorted = 0;
     278       45361 :         b->tnorevsorted = 0;
     279       45361 :         b->tseqbase = oid_nil;
     280       45361 :         b->tkey = false;
     281       45361 :         b->tnokey[0] = 0;
     282       45361 :         b->tnokey[1] = 0;
     283             : 
     284             :         uint32_t *restrict dst;
     285      202932 :         for (; s ; s=s->next) {
                                       /* segments starting at or beyond nr are not written */
     286      179496 :                 if (s->start >= nr)
     287             :                         break;
     288      157571 :                 if (s->ts == tr->tid && s->end != s->start) {
     289       98590 :                         b->batDirtydesc = true;
     290       98590 :                         b->theap->dirty = true;
                                               /* lnr counts the rows still to be written; it must reach 0 */
     291       98590 :                         size_t lnr = s->end-s->start;
     292             :                         size_t pos = s->start;
                                               /* dst points at the 32-bit word holding the bit for row s->start */
     293       98590 :                         dst = (uint32_t *) Tloc(b, 0) + (s->start/32);
     294             :                         uint32_t cur = 0;
     295       98590 :                         if (s->deleted) {
                                                       /* leading partial word: OR in bits used..end-1 */
     296       69731 :                                 size_t used = pos&31, end = 32;
     297       69731 :                                 if (used) {
     298       66408 :                                         if (lnr < (32-used))
     299       64692 :                                                 end = used + lnr;
     300      133520 :                                         for(size_t j=used; j < end; j++, lnr--)
     301       67112 :                                                 cur |= 1U<<j;
     302       66408 :                                         *dst++ |= cur;
     303             :                                         cur = 0;
     304             :                                 }
                                                       /* whole words are overwritten with all ones, then the trailing partial word is OR-ed */
     305       69731 :                                 size_t full = lnr/32;
     306       69731 :                                 size_t rest = lnr%32;
     307       71144 :                                 for(size_t i = 0; i<full; i++, lnr-=32)
     308        1413 :                                         *dst++ = ~0;
     309       69731 :                                 if (rest) {
     310        7609 :                                         for(size_t j=0; j < rest; j++, lnr--)
     311        4256 :                                                 cur |= 1U<<j;
     312        3353 :                                         *dst |= cur;
     313             :                                 }
     314       69731 :                                 assert(lnr==0);
     315             :                         } else {
                                                       /* same walk as above, but the bits are cleared instead of set */
     316       28859 :                                 size_t used = pos&31, end = 32;
     317       28859 :                                 if (used) {
     318       20363 :                                         if (lnr < (32-used))
     319       17928 :                                                 end = used + lnr;
     320       91600 :                                         for(size_t j=used; j < end; j++, lnr--)
     321       71237 :                                                 cur |= 1U<<j;
     322       20363 :                                         *dst++ &= ~cur;
     323             :                                         cur = 0;
     324             :                                 }
     325       28859 :                                 size_t full = lnr/32;
     326       28859 :                                 size_t rest = lnr%32;
     327     6808040 :                                 for(size_t i = 0; i<full; i++, lnr-=32)
     328     6779181 :                                         *dst++ = 0;
     329       28859 :                                 if (rest) {
     330       80389 :                                         for(size_t j=0; j < rest; j++, lnr--)
     331       70138 :                                                 cur |= 1U<<j;
     332       10251 :                                         *dst &= ~cur;
     333             :                                 }
     334       28859 :                                 assert(lnr==0);
     335             :                         }
     336             :                 }
     337             :         }
                               /* the BAT count is only ever raised to nr here, never lowered */
     338       45361 :         if (nr > BATcount(b))
     339       19664 :                 BATsetcount(b, nr);
     340             : 
     341       45361 :         bat_destroy(b);
     342       45361 :         return LOG_OK;
     343             : }
     344             : 
     345             : /* TODO return LOG_OK/ERR */
     346             : static void
     347       45387 : merge_segments(storage *s, sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
     348             : {
                               /* Stamp every segment owned by tr with commit_ts (clearing oldts on
                                * the non-deleted ones), then merge neighbouring segments that are
                                * contiguous, share the same deleted flag, and have ts <= oldest and
                                * ts < TRANSACTION_ID_BASE. */
     349       45387 :         segment *cur = s->segs->h, *seg = NULL;
     350      225030 :         for (; cur; cur = cur->next) {
     351      179643 :                 if (cur->ts == tr->tid) {
     352      105344 :                         if (!cur->deleted)
     353       35613 :                                 cur->oldts = 0;
     354      105344 :                         cur->ts = commit_ts;
     355             :                 }
     356      179643 :                 if (cur->ts <= oldest && cur->ts < TRANSACTION_ID_BASE) { /* possibly merge range */
     357      179316 :                         if (!seg) { /* skip first */
     358             :                                 seg = cur;
     359      133946 :                         } else if (seg->end == cur->start && seg->deleted == cur->deleted) {
     360             :                                 /* merge with previous */
     361       91517 :                                 seg->end = cur->end;
     362       91517 :                                 seg->next = cur->next;
     363       91517 :                                 if (cur == s->segs->t)
     364       21554 :                                         s->segs->t = seg;
                                                       /* free at once when commit_ts == oldest, otherwise defer through mark4destroy */
     365       91517 :                                 if (commit_ts == oldest)
     366       91500 :                                         _DELETE(cur);
     367             :                                 else
     368             :                                         mark4destroy(cur, change, commit_ts);
                                                       /* continue from seg so that the loop step moves to the old cur->next */
     369             :                                 cur = seg;
     370             :                         } else {
     371             :                                 seg = cur; /* begin of new merge */
     372             :                         }
     373             :                 }
     374             :         }
     375       45387 : }
     376             : 
     377             : static int
     378     1862373 : segments_in_transaction(sql_trans *tr, sql_table *t)
     379             : {
                               /* Return 1 when some segment of t's storage is stamped with tr->tid,
                                * 0 otherwise. The tail segment is tested first as a shortcut.
                                * NOTE(review): the storage pointer is dereferenced without a NULL
                                * check and no lock is taken in this function - confirm callers
                                * guarantee both. */
     380     1862373 :         storage *s = ATOMIC_PTR_GET(&t->data);
     381     1862373 :         segment *seg = s->segs->h;
     382             : 
     383     1862373 :         if (seg && s->segs->t->ts == tr->tid)
     384             :                 return 1;
     385      336592 :         for (; seg ; seg=seg->next) {
     386      298286 :                 if (seg->ts == tr->tid)
     387             :                         return 1;
     388             :         }
     389             :         return 0;
     390             : }
     391             : 
     392             : static size_t
     393     9753065 : segs_end( segments *segs, sql_trans *tr, sql_table *table)
     394             : {
                               /* Return the end offset of the last segment that is SEG_IS_VALID for
                                * tr, or 0 when there is none. The scan runs under the table lock for
                                * table->base.id. */
     395             :         size_t cnt = 0;
     396             : 
     397     9753065 :         lock_table(tr->store, table->base.id);
     398     9758390 :         segment *s = segs->h, *l = NULL;
     399             : 
                               /* shortcut: when the tail is valid, start at the tail so the loop below visits only it */
     400     9758390 :         if (segs->t && SEG_IS_VALID(segs->t, tr))
     401     8767541 :                 l = s = segs->t;
     402             : 
     403   188478926 :         for(;s; s = s->next) {
     404   178720682 :                 if (SEG_IS_VALID(s, tr))
     405             :                                 l = s;
     406             :         }
     407     9758244 :         if (l)
     408     9744617 :                 cnt = l->end;
     409     9758244 :         unlock_table(tr->store, table->base.id);
     410     9758217 :         return cnt;
     411             : }
     412             : 
     413             : static segments *
     414       18957 : new_segments(sql_trans *tr, size_t cnt)
     415             : {
                               /* Allocate a segments list whose head and tail are one new segment of
                                * cnt rows owned by tr, and initialise its reference counter.
                                * Returns NULL when either allocation fails. */
     416       18957 :         segments *n = (segments*)GDKmalloc(sizeof(segments));
     417             : 
     418       18958 :         if (n) {
     419       18958 :                 n->h = n->t = new_segment(NULL, tr, cnt);
     420       18958 :                 if (!n->h) {
     421           0 :                         GDKfree(n);
     422           0 :                         return NULL;
     423             :                 }
     424       18958 :                 sql_ref_init(&n->r);
     425             :         }
     426             :         return n;
     427             : }
     428             : 
     429             : static segments*
     430             : dup_segments(segments *s)
     431             : {
                               /* Add a reference to s (sql_ref_inc on s->r) and return the same pointer. */
     432           0 :         sql_ref_inc(&s->r);
     433             :         return s;
     434             : }
     435             : 
     436             : static int
     437       19084 : temp_dup_cs(column_storage *cs, ulng tid, int type)
     438             : {
                               /* Initialise cs for transaction tid: a new TRANSIENT BAT of the given
                                * type becomes cs->bid, cs->uibid/cs->uvbid are set from e_bat(), and
                                * the counters are reset with refcnt = 1. Returns LOG_OK, or LOG_ERR
                                * when bat_new or one of the e_bat calls fails.
                                * The 1024 passed to bat_new is presumably the initial capacity - confirm. */
     439       19084 :         BAT *b = bat_new(type, 1024, TRANSIENT);
     440       19084 :         if (!b)
     441             :                 return LOG_ERR;
     442       19084 :         cs->bid = temp_create(b);
     443       19084 :         bat_destroy(b);
     444       19083 :         cs->uibid = e_bat(TYPE_oid);
     445       19084 :         cs->uvbid = e_bat(type);
                               /* NOTE(review): this error return does not release cs->bid or whichever
                                * e_bat() result did succeed, and the callers in this file section only
                                * free the enclosing struct - possible BAT leak, confirm. */
     446       19083 :         if (cs->uibid == BID_NIL || cs->uvbid == BID_NIL)
     447             :                 return LOG_ERR;
     448       19083 :         cs->ucnt = 0;
     449       19083 :         cs->cleared = 0;
     450       19083 :         cs->ts = tid;
     451       19083 :         cs->refcnt = 1;
     452       19083 :         return LOG_OK;
     453             : }
     454             : 
     455             : static sql_delta *
     456       16943 : temp_dup_delta(ulng tid, int type)
     457             : {
                               /* Allocate a zeroed sql_delta and initialise its column storage with
                                * temp_dup_cs(tid, type). Returns NULL when the allocation or the
                                * initialisation fails (the struct is freed in the latter case). */
     458       16943 :         sql_delta *bat = ZNEW(sql_delta);
     459             : 
     460       16943 :         if (!bat)
     461             :                 return NULL;
     462       16943 :         if (temp_dup_cs(&bat->cs, tid, type)) {
     463           0 :                 _DELETE(bat);
     464           0 :                 return NULL;
     465             :         }
     466             :         return bat;
     467             : }
     468             : 
     469             : static sql_delta *
     470             : temp_delta(sql_delta *d, ulng tid)
     471             : {
                               /* Walk the next chain starting at d and return the first delta whose
                                * cs.ts equals tid, or NULL when the chain holds none. */
     472      303397 :         while (d && d->cs.ts != tid)
     473          69 :                 d = d->next;
     474             :         return d;
     475             : }
     476             : 
     477             : static storage *
     478        2141 : temp_dup_storage(sql_trans *tr)
     479             : {
                               /* Allocate a zeroed storage for tr: its column storage is initialised
                                * with temp_dup_cs(tr->tid, TYPE_msk) and it gets a new segments list
                                * of 0 rows. Returns NULL on any failure. */
     480        2141 :         storage *bat = ZNEW(storage);
     481             : 
     482        2141 :         if (!bat)
     483             :                 return NULL;
     484        2141 :         if (temp_dup_cs(&bat->cs, tr->tid, TYPE_msk)) {
     485           0 :                 _DELETE(bat);
     486           0 :                 return NULL;
     487             :         }
                               /* NOTE(review): when new_segments fails only the struct is freed; the
                                * BATs set up by temp_dup_cs above are not released here - confirm. */
     488        2140 :         if (!(bat->segs = new_segments(tr, 0))) {
     489           0 :                 _DELETE(bat);
     490           0 :                 return NULL;
     491             :         }
     492             :         return bat;
     493             : }
     494             : 
     495             : static storage *
     496             : temp_storage(storage *d, ulng tid)
     497             : {
                               /* Walk the next chain starting at d and return the first storage whose
                                * cs.ts equals tid, or NULL when the chain holds none. */
     498      181254 :         while (d && d->cs.ts != tid)
     499          61 :                 d = d->next;
     500             :         return d;
     501             : }
     502             : 
     503             : static sql_delta *
     504    19340336 : timestamp_delta( sql_trans *tr, sql_delta *d)
     505             : {
                               /* Skip versions that are not VALID_4_READ for tr while a next version
                                * exists, and return where the walk stops. The last version in the
                                * chain is returned even when it is not readable. d must not be NULL:
                                * it is dereferenced unconditionally. */
     506    19340548 :         while (d->next && !VALID_4_READ(d->cs.ts, tr))
     507         212 :                 d = d->next;
     508    19340427 :         return d;
     509             : }
     510             : 
     511             : static sql_delta *
     512      303328 : temp_col_timestamp_delta( sql_trans *tr, sql_column *c)
     513             : {
                               /* Return the delta of temporary-table column c that belongs to tr
                                * (cs.ts == tr->tid). When none exists yet, a new one is created with
                                * the column's localtype and pushed at the front of c->data with a
                                * compare-and-swap retry loop. Returns NULL when creation fails. */
     514      303328 :         assert(isTempTable(c->t));
     515      303328 :         sql_delta *d = temp_delta(ATOMIC_PTR_GET(&c->data), tr->tid);
     516      303328 :         if (!d) {
     517       16943 :                 if (!(d = temp_dup_delta(tr->tid, c->type.type->localtype)))
     518             :                         return NULL;
     519             :                 do {
     520       16943 :                         d->next = ATOMIC_PTR_GET(&c->data);
     521       16943 :                 } while(!ATOMIC_PTR_CAS(&c->data, (void**)&d->next, d)); /* set c->data = d, when c->data == d->next else d->next = c->data */
     522             :         }
     523             :         return d;
     524             : }
     525             : 
     526             : static sql_delta *
     527    19606127 : col_timestamp_delta( sql_trans *tr, sql_column *c)
     528             : {
                               /* Pick the delta of column c to use for tr: temporary tables go through
                                * temp_col_timestamp_delta (which may return NULL), all others through
                                * timestamp_delta on the current c->data chain. */
     529    19606127 :         if (isTempTable(c->t))
     530      302069 :                 return temp_col_timestamp_delta(tr, c);
     531    19304058 :         return timestamp_delta( tr, ATOMIC_PTR_GET(&c->data));
     532             : }
     533             : 
     534             : static sql_delta *
     535           0 : temp_idx_timestamp_delta( sql_trans *tr, sql_idx *i)
     536             : {
                               /* Return the delta of temporary-table index i that belongs to tr
                                * (cs.ts == tr->tid). When none exists yet, a new one of type TYPE_oid
                                * (if oid_index(i->type)) or TYPE_lng is created and pushed at the
                                * front of i->data with a compare-and-swap retry loop. Returns NULL
                                * when creation fails. */
     537           0 :         assert(isTempTable(i->t));
     538           0 :         sql_delta *d = temp_delta(ATOMIC_PTR_GET(&i->data), tr->tid);
     539           0 :         if (!d) {
     540           0 :                 int type = oid_index(i->type)?TYPE_oid:TYPE_lng;
     541             : 
     542           0 :                 if (!(d = temp_dup_delta(tr->tid, type)))
     543             :                         return NULL;
     544             :                 do {
     545           0 :                         d->next = ATOMIC_PTR_GET(&i->data);
     546           0 :                 } while(!ATOMIC_PTR_CAS(&i->data, (void**)&d->next, d)); /* set i->data = d, when i->data == d->next else d->next = i->data */
     547             :         }
     548             :         return d;
     549             : }
     550             : 
     551             : static sql_delta *
     552       34099 : idx_timestamp_delta( sql_trans *tr, sql_idx *i)
     553             : {
                               /* Pick the delta of index i to use for tr: temporary tables go through
                                * temp_idx_timestamp_delta (which may return NULL), all others through
                                * timestamp_delta on the current i->data chain. */
     554       34099 :         if (isTempTable(i->t))
     555           0 :                 return temp_idx_timestamp_delta(tr, i);
     556       34099 :         return timestamp_delta( tr, ATOMIC_PTR_GET(&i->data));
     557             : }
     558             : 
     559             : static storage *
     560    10589237 : timestamp_storage( sql_trans *tr, storage *d)
     561             : {
                               /* Like timestamp_delta, but for storage chains and tolerant of a NULL
                                * d: skip versions that are not VALID_4_READ for tr while a next
                                * version exists. The last version is returned even when it is not
                                * readable. */
     562    10589237 :         if (!d)
     563             :                 return NULL;
     564    10589301 :         while (d->next && !VALID_4_READ(d->cs.ts, tr))
     565          64 :                 d = d->next;
     566             :         return d;
     567             : }
     568             : 
     569             : static storage *
     570      181193 : temp_tab_timestamp_storage( sql_trans *tr, sql_table *t)
     571             : {
                               /* Return the storage of temporary table t that belongs to tr
                                * (cs.ts == tr->tid). When none exists yet, a new one is created with
                                * temp_dup_storage and pushed at the front of t->data with a
                                * compare-and-swap retry loop. Returns NULL when creation fails. */
     572      181193 :         assert(isTempTable(t));
     573      181193 :         storage *d = temp_storage(ATOMIC_PTR_GET(&t->data), tr->tid);
     574      181193 :         if (!d) {
     575        2141 :                 if (!(d = temp_dup_storage(tr)))
     576             :                         return NULL;
     577             :                 do {
     578        2141 :                         d->next = ATOMIC_PTR_GET(&t->data);
     579        2141 :                 } while(!ATOMIC_PTR_CAS(&t->data, (void**)&d->next, d)); /* set t->data = d, when t->data == d->next else d->next = t->data */
     580             :         }
     581             :         return d;
     582             : }
     583             : 
     584             : static storage *
     585    10770365 : tab_timestamp_storage( sql_trans *tr, sql_table *t)
     586             : {
     587    10770365 :         if (isTempTable(t))
     588      180886 :                 return temp_tab_timestamp_storage(tr, t);
     589    10589479 :         return timestamp_storage( tr, ATOMIC_PTR_GET(&t->data));
     590             : }
     591             : 
     592             : static sql_delta*
     593             : delta_dup(sql_delta *d)
     594             : {
     595       20174 :         d->cs.refcnt++;
     596             :         return d;
     597             : }
     598             : 
     599             : static void *
     600       18830 : col_dup(sql_column *c)
     601             : {
     602       18830 :         return delta_dup(ATOMIC_PTR_GET(&c->data));
     603             : }
     604             : 
     605             : static void *
     606        2631 : idx_dup(sql_idx *i)
     607             : {
     608        2631 :         if (!ATOMIC_PTR_GET(&i->data))
     609             :                 return NULL;
     610        1344 :         return delta_dup(ATOMIC_PTR_GET(&i->data));
     611             : }
     612             : 
     613             : static storage*
     614             : storage_dup(storage *d)
     615             : {
     616        1606 :         d->cs.refcnt++;
     617             :         return d;
     618             : }
     619             : 
     620             : static void *
     621        1606 : del_dup(sql_table *t)
     622             : {
     623        1606 :         return storage_dup(ATOMIC_PTR_GET(&t->data));
     624             : }
     625             : 
     626             : static size_t
     627           0 : count_inserts( segment *s, sql_trans *tr)
     628             : {
     629             :         size_t cnt = 0;
     630             : 
     631           0 :         for(;s; s = s->next) {
     632           0 :                 if (!s->deleted && s->ts == tr->tid)
     633           0 :                         cnt += s->end - s->start;
     634             :         }
     635           0 :         return cnt;
     636             : }
     637             : 
     638             : static size_t
     639         673 : count_deletes_in_range( segment *s, sql_trans *tr, BUN start, BUN end)
     640             : {
     641             :         size_t cnt = 0;
     642             : 
     643         820 :         for(;s && s->end <= start; s = s->next)
     644             :                 ;
     645             : 
     646        1483 :         for(;s && s->start < end; s = s->next) {
     647         810 :                 if (SEG_IS_DELETED(s, tr)) /* assume aligned s->end and end */
     648         150 :                         cnt += s->end - s->start;
     649             :         }
     650         673 :         return cnt;
     651             : }
     652             : 
     653             : static size_t
     654           0 : count_deletes( segment *s, sql_trans *tr)
     655             : {
     656             :         size_t cnt = 0;
     657             : 
     658           0 :         for(;s; s = s->next) {
     659           0 :                 if (SEG_IS_DELETED(s, tr))
     660           0 :                         cnt += s->end - s->start;
     661             :         }
     662           0 :         return cnt;
     663             : }
     664             : 
     665             : #define CNT_ACTIVE 10
     666             : 
     667             : static size_t
     668     9792898 : count_col(sql_trans *tr, sql_column *c, int access)
     669             : {
     670             :         storage *d;
     671             :         sql_delta *ds;
     672             : 
     673     9792898 :         if (!isTable(c->t))
     674             :                 return 0;
     675     9792924 :         d = tab_timestamp_storage(tr, c->t);
     676     9790552 :         ds = col_timestamp_delta(tr, c);
     677     9789663 :         if (!d ||!ds)
     678             :                 return 0;
     679     9789663 :         if (access == 2)
     680      485853 :                 return ds?ds->cs.ucnt:0;
     681     9303810 :         if (access == 1)
     682           0 :                 return count_inserts(d->segs->h, tr);
     683     9303810 :         if (access == QUICK || isTempTable(c->t))
     684      638052 :                 return d->segs->t?d->segs->t->end:0;
     685     8665758 :         if (access == CNT_ACTIVE) {
     686         673 :                 size_t cnt = segs_end(d->segs, tr, c->t);
     687         673 :                 lock_table(tr->store, c->t->base.id);
     688         673 :                 cnt -= count_deletes_in_range(d->segs->h, tr, 0, cnt);
     689         673 :                 unlock_table(tr->store, c->t->base.id);
     690         673 :                 return cnt;
     691             :         }
     692     8665085 :         return segs_end(d->segs, tr, c->t);
     693             : }
     694             : 
     695             : static size_t
     696       27961 : count_idx(sql_trans *tr, sql_idx *i, int access)
     697             : {
     698             :         storage *d;
     699             :         sql_delta *ds;
     700             : 
     701       27961 :         if (!isTable(i->t) || (hash_index(i->type) && list_length(i->columns) <= 1) || !idx_has_column(i->type))
     702        6649 :                 return 0;
     703       21311 :         d = tab_timestamp_storage(tr, i->t);
     704       21303 :         ds = idx_timestamp_delta(tr, i);
     705       21298 :         if (!d || !ds)
     706             :                 return 0;
     707       21298 :         if (access == 2)
     708        2980 :                 return ds?ds->cs.ucnt:0;
     709       18318 :         if (access == 1)
     710           0 :                 return count_inserts(d->segs->h, tr);
     711       18318 :         if (access == QUICK || isTempTable(i->t))
     712        3727 :                 return d->segs->t?d->segs->t->end:0;
     713       14591 :         return segs_end(d->segs, tr, i->t);
     714             : }
     715             : 
     716             : static BAT *
     717     3306189 : cs_bind_ubat( column_storage *cs, int access, int type, size_t cnt /* ie max position < cnt */)
     718             : {
     719             :         BAT *b;
     720             : 
     721     3306189 :         assert(access == RD_UPD_ID || access == RD_UPD_VAL);
     722             :         /* returns the updates for cs */
     723     3306189 :         if (cs->uibid && cs->uvbid && cs->ucnt) {
     724        5136 :                 if (access == RD_UPD_ID) {
     725        3428 :                         if (!(b = temp_descriptor(cs->uibid)))
     726             :                                 return NULL;
     727        3428 :                         if (!b->tsorted || ((BATtdense(b) && (b->tseqbase + BATcount(b)) >= cnt) ||
     728         118 :                            (!BATtdense(b) && BATcount(b) && ((oid*)b->theap->base)[BATcount(b)-1] >= cnt))) {
     729        3310 :                                         oid nil = oid_nil;
     730             :                                         /* less then cnt */
     731        3310 :                                         BAT *s = BATselect(b, NULL, &nil, &cnt, false, false, false);
     732        3310 :                                         if (!s) {
     733           0 :                                                 bat_destroy(b);
     734           0 :                                                 return NULL;
     735             :                                         }
     736             : 
     737        3310 :                                         BAT *nb = BATproject(s, b);
     738        3310 :                                         bat_destroy(s);
     739        3310 :                                         bat_destroy(b);
     740             :                                         b = nb;
     741             :                         }
     742             :                 } else {
     743        1708 :                         b = temp_descriptor(cs->uvbid);
     744             :                 }
     745             :         } else {
     746     6436593 :                 b = e_BAT(access == RD_UPD_ID?TYPE_oid:type);
     747             :         }
     748             :         return b;
     749             : }
     750             : 
     751             : static BAT *
     752           0 : merge_updates( BAT *ui, BAT **UV, BAT *oi, BAT *ov)
     753             : {
     754             :         int err = 0;
     755           0 :         BAT *uv = *UV;
     756           0 :         BUN cnt = BATcount(ui)+BATcount(oi);
     757           0 :         BAT *ni = bat_new(TYPE_oid, cnt, TRANSIENT);
     758           0 :         BAT *nv = uv?bat_new(uv->ttype, cnt, TRANSIENT):NULL;
     759             : 
     760           0 :         if (!ni || (uv && !nv)) {
     761           0 :                 bat_destroy(ni);
     762           0 :                 bat_destroy(nv);
     763           0 :                 bat_destroy(ui);
     764           0 :                 bat_destroy(uv);
     765           0 :                 bat_destroy(oi);
     766           0 :                 bat_destroy(ov);
     767           0 :                 return NULL;
     768             :         }
     769             :         BATiter uvi;
     770             :         BATiter ovi;
     771             : 
     772           0 :         if (uv) {
     773           0 :                 uvi = bat_iterator(uv);
     774           0 :                 ovi = bat_iterator(ov);
     775             :         }
     776             : 
     777             :         /* handle dense (void) cases together as we need to merge updates (which is slower anyway) */
     778           0 :         BUN uip = 0, uie = BATcount(ui);
     779           0 :         BUN oip = 0, oie = BATcount(oi);
     780             : 
     781           0 :         oid uiseqb = ui->tseqbase;
     782           0 :         oid oiseqb = oi->tseqbase;
     783             :         oid *uipt = NULL, *oipt = NULL;
     784           0 :         BATiter uii = bat_iterator(ui);
     785           0 :         BATiter oii = bat_iterator(oi);
     786           0 :         if (!BATtdense(ui))
     787           0 :                 uipt = uii.base;
     788           0 :         if (!BATtdense(oi))
     789           0 :                 oipt = oii.base;
     790           0 :         while (uip < uie && oip < oie && !err) {
     791           0 :                 oid uiid = (uipt)?uipt[uip]: uiseqb+uip;
     792           0 :                 oid oiid = (oipt)?oipt[oip]: oiseqb+oip;
     793             : 
     794           0 :                 if (uiid <= oiid) {
     795           0 :                         if (BUNappend(ni, (ptr) &uiid, true) != GDK_SUCCEED ||
     796           0 :                             (ov && BUNappend(nv, (ptr) BUNtail(uvi, uip), true) != GDK_SUCCEED))
     797             :                                 err = 1;
     798           0 :                         uip++;
     799           0 :                         if (uiid == oiid)
     800           0 :                                 oip++;
     801             :                 } else { /* uiid > oiid */
     802           0 :                         if (BUNappend(ni, (ptr) &oiid, true) != GDK_SUCCEED ||
     803           0 :                             (ov && BUNappend(nv, (ptr) BUNtail(ovi, oip), true) != GDK_SUCCEED) )
     804             :                                 err = 1;
     805           0 :                         oip++;
     806             :                 }
     807             :         }
     808           0 :         while (uip < uie && !err) {
     809           0 :                 oid uiid = (uipt)?uipt[uip]: uiseqb+uip;
     810           0 :                 if (BUNappend(ni, (ptr) &uiid, true) != GDK_SUCCEED ||
     811           0 :                     (ov && BUNappend(nv, (ptr) BUNtail(uvi, uip), true) != GDK_SUCCEED))
     812             :                         err = 1;
     813           0 :                 uip++;
     814             :         }
     815           0 :         while (oip < oie && !err) {
     816           0 :                 oid oiid = (oipt)?oipt[oip]: oiseqb+oip;
     817           0 :                 if (BUNappend(ni, (ptr) &oiid, true) != GDK_SUCCEED ||
     818           0 :                     (ov && BUNappend(nv, (ptr) BUNtail(ovi, oip), true) != GDK_SUCCEED) )
     819             :                         err = 1;
     820           0 :                 oip++;
     821             :         }
     822           0 :         if (uv) {
     823           0 :                 bat_iterator_end(&uvi);
     824           0 :                 bat_iterator_end(&ovi);
     825             :         }
     826           0 :         bat_iterator_end(&uii);
     827           0 :         bat_iterator_end(&oii);
     828           0 :         bat_destroy(ui);
     829           0 :         bat_destroy(uv);
     830           0 :         bat_destroy(oi);
     831           0 :         bat_destroy(ov);
     832           0 :         if (!err) {
     833           0 :                 if (nv)
     834           0 :                         *UV = nv;
     835           0 :                 return ni;
     836             :         }
     837           0 :         *UV = NULL;
     838           0 :         bat_destroy(ni);
     839           0 :         bat_destroy(nv);
     840           0 :         return NULL;
     841             : }
     842             : 
     843             : static sql_delta *
     844     3138878 : older_delta( sql_delta *d, sql_trans *tr)
     845             : {
     846     3138878 :         sql_delta *o = d->next;
     847             : 
     848     3142670 :         while (o && !o->cs.merged) {
     849        3796 :                 if (o->cs.ucnt && VALID_4_READ(o->cs.ts, tr))
     850             :                         break;
     851             :                 else
     852        3792 :                         o = o->next;
     853             :         }
     854     3138874 :         if (o && !o->cs.merged && o->cs.ucnt && VALID_4_READ(o->cs.ts, tr))
     855           0 :                 return o;
     856             :         return NULL;
     857             : }
     858             : 
     859             : static BAT *
     860     3138955 : bind_ubat(sql_trans *tr, sql_delta *d, int access, int type, size_t cnt)
     861             : {
     862     3138955 :         assert(tr->active);
     863             :         sql_delta *o = NULL;
     864     3138955 :         BAT *ui = NULL, *uv = NULL;
     865             : 
     866     3138955 :         if (!(ui = cs_bind_ubat(&d->cs, RD_UPD_ID, type, cnt)))
     867             :                 return NULL;
     868     3138884 :         if (access == RD_UPD_VAL) {
     869      167247 :                 if (!(uv = cs_bind_ubat(&d->cs, RD_UPD_VAL, type, cnt))) {
     870           0 :                         bat_destroy(ui);
     871           0 :                         return NULL;
     872             :                 }
     873             :         }
     874     3138886 :         while ((o = older_delta(d, tr)) != NULL) {
     875             :                 BAT *oui = NULL, *ouv = NULL;
     876             :                 if (!oui)
     877           0 :                         oui = cs_bind_ubat(&o->cs, RD_UPD_ID, type, cnt);
     878           0 :                 if (access == RD_UPD_VAL)
     879           0 :                         ouv = cs_bind_ubat(&o->cs, RD_UPD_VAL, type, cnt);
     880           0 :                 if (!ui || !oui || (access == RD_UPD_VAL && (!uv || !ouv))) {
     881           0 :                         bat_destroy(ui);
     882           0 :                         bat_destroy(uv);
     883           0 :                         bat_destroy(oui);
     884           0 :                         bat_destroy(ouv);
     885           0 :                         return NULL;
     886             :                 }
     887           0 :                 if ((ui = merge_updates(ui, &uv, oui, ouv)) == NULL)
     888             :                         return NULL;
     889             :                 d = o;
     890             :         }
     891     3138871 :         if (uv) {
     892      167250 :                 bat_destroy(ui);
     893      167252 :                 return uv;
     894             :         }
     895             :         return ui;
     896             : }
     897             : 
     898             : static BAT *
     899     3138868 : bind_ucol(sql_trans *tr, sql_column *c, int access, size_t cnt)
     900             : {
     901     3138868 :         sql_delta *d = col_timestamp_delta(tr, c);
     902             : 
     903     3138861 :         if (!d)
     904             :                 return NULL;
     905     3138861 :         return bind_ubat(tr, d, access, c->type.type->localtype, cnt);
     906             : }
     907             : 
     908             : static BAT *
     909         112 : bind_uidx(sql_trans *tr, sql_idx * i, int access, size_t cnt)
     910             : {
     911         112 :         int type = oid_index(i->type)?TYPE_oid:TYPE_lng;
     912         112 :         sql_delta *d = idx_timestamp_delta(tr, i);
     913             : 
     914         112 :         if (!d)
     915             :                 return NULL;
     916         112 :         return bind_ubat(tr, d, access, type, cnt);
     917             : }
     918             : 
     919             : static BAT *
     920     3547855 : cs_bind_bat( column_storage *cs, int access, size_t cnt)
     921             : {
     922             :         BAT *b;
     923             : 
     924     3547855 :         assert(access == RDONLY || access == QUICK);
     925     3547855 :         assert(cs != NULL);
     926     3547855 :         if (access == QUICK)
     927      252897 :                 return quick_descriptor(cs->bid);
     928     3294958 :         assert(cs->bid);
     929     3294958 :         b = temp_descriptor(cs->bid);
     930     3294891 :         if (b == NULL)
     931             :                 return NULL;
     932     3294891 :         bat_set_access(b, BAT_READ);
     933             :         /* return slice */
     934     3294891 :         BAT *s = BATslice(b, 0, cnt);
     935     3294776 :         bat_destroy(b);
     936     3294839 :         return s;
     937             : }
     938             : 
     939             : static void *                                   /* BAT * */
     940     6673891 : bind_col(sql_trans *tr, sql_column *c, int access)
     941             : {
     942     6673891 :         assert(access == QUICK || tr->active);
     943     6673891 :         if (!isTable(c->t))
     944             :                 return NULL;
     945     6673891 :         sql_delta *d = col_timestamp_delta(tr, c);
     946     6673816 :         if (!d)
     947             :                 return NULL;
     948     6673816 :         size_t cnt = count_col(tr, c, 0);
     949     6674128 :         if (access == RD_UPD_ID || access == RD_UPD_VAL)
     950     3138869 :                 return bind_ucol(tr, c, access, cnt);
     951     3535259 :         BAT *b = cs_bind_bat( &d->cs, access, cnt);
     952     3535043 :         assert(!b || b->ttype == c->type.type->localtype || (access == QUICK && b->ttype < 0));
     953             :         return b;
     954             : }
     955             : 
     956             : static void *                                   /* BAT * */
     957       12693 : bind_idx(sql_trans *tr, sql_idx * i, int access)
     958             : {
     959       12693 :         assert(access == QUICK || tr->active);
     960       12693 :         if (!isTable(i->t))
     961             :                 return NULL;
     962       12693 :         sql_delta *d = idx_timestamp_delta(tr, i);
     963       12693 :         if (!d)
     964             :                 return NULL;
     965       12693 :         size_t cnt = count_idx(tr, i, 0);
     966       12695 :         if (access == RD_UPD_ID || access == RD_UPD_VAL)
     967         112 :                 return bind_uidx(tr, i, access, cnt);
     968       12583 :         return cs_bind_bat( &d->cs, access, cnt);
     969             : }
     970             : 
     971             : static int
     972        1436 : cs_real_update_bats( column_storage *cs, BAT **Ui, BAT **Uv)
     973             : {
     974        1436 :         if (!cs->uibid) {
     975           0 :                 cs->uibid = e_bat(TYPE_oid);
     976           0 :                 if (cs->uibid == BID_NIL)
     977             :                         return LOG_ERR;
     978             :         }
     979        1436 :         if (!cs->uvbid) {
     980           0 :                 BAT *cur = quick_descriptor(cs->bid);
     981           0 :                 if (!cur)
     982             :                         return LOG_ERR;
     983           0 :                 int type = cur->ttype;
     984           0 :                 cs->uvbid = e_bat(type);
     985           0 :                 if (cs->uibid == BID_NIL || cs->uvbid == BID_NIL)
     986             :                         return LOG_ERR;
     987             :         }
     988        1436 :         BAT *ui = temp_descriptor(cs->uibid);
     989        1436 :         BAT *uv = temp_descriptor(cs->uvbid);
     990             : 
     991        1436 :         if (ui == NULL || uv == NULL) {
     992           0 :                 bat_destroy(ui);
     993           0 :                 bat_destroy(uv);
     994           0 :                 return LOG_ERR;
     995             :         }
     996             :         assert(ui && uv);
     997        1436 :         if (isEbat(ui)){
     998         318 :                 temp_destroy(cs->uibid);
     999         318 :                 cs->uibid = temp_copy(ui->batCacheid, true, true);
    1000         318 :                 bat_destroy(ui);
    1001         636 :                 if (cs->uibid == BID_NIL ||
    1002         318 :                     (ui = temp_descriptor(cs->uibid)) == NULL) {
    1003           0 :                         bat_destroy(uv);
    1004           0 :                         return LOG_ERR;
    1005             :                 }
    1006             :         }
    1007        1436 :         if (isEbat(uv)){
    1008         318 :                 temp_destroy(cs->uvbid);
    1009         318 :                 cs->uvbid = temp_copy(uv->batCacheid, true, true);
    1010         318 :                 bat_destroy(uv);
    1011         636 :                 if (cs->uvbid == BID_NIL ||
    1012         318 :                     (uv = temp_descriptor(cs->uvbid)) == NULL) {
    1013           0 :                         bat_destroy(ui);
    1014           0 :                         return LOG_ERR;
    1015             :                 }
    1016             :         }
    1017        1436 :         *Ui = ui;
    1018        1436 :         *Uv = uv;
    1019        1436 :         return LOG_OK;
    1020             : }
    1021             : 
    1022             : static int
    1023        3397 : segments_is_append(segment *s, sql_trans *tr, oid rid)
    1024             : {
    1025       37010 :         for(; s; s=s->next) {
    1026       37010 :                 if (s->start <= rid && s->end > rid) {
    1027        3397 :                         if (s->ts == tr->tid && !s->deleted) {
    1028        1962 :                                 return 1;
    1029             :                         }
    1030             :                         break;
    1031             :                 }
    1032             :         }
    1033             :         return 0;
    1034             : }
    1035             : 
    1036             : static int
    1037        1435 : segments_is_deleted(segment *s, sql_trans *tr, oid rid)
    1038             : {
    1039       33033 :         for(; s; s=s->next) {
    1040       33033 :                 if (s->start <= rid && s->end > rid) {
    1041        1435 :                         if (s->ts >= tr->ts && s->deleted) {
    1042           0 :                                 return 1;
    1043             :                         }
    1044             :                         break;
    1045             :                 }
    1046             :         }
    1047             :         return 0;
    1048             : }
    1049             : 
    1050             : /*
    1051             :  * Returns LOG_OK, LOG_ERR or LOG_CONFLICT
    1052             :  */
    1053             : static int
    1054        2859 : cs_update_bat( sql_trans *tr, column_storage *cs, sql_table *t, BAT *tids, BAT *updates, int is_new)
    1055             : {
    1056             :         int res = LOG_OK;
    1057             :         BAT *otids = tids, *oupdates = updates;
    1058             : 
    1059        2859 :         if (!BATcount(tids))
    1060             :                 return LOG_OK;
    1061             : 
    1062        2859 :         if (tids && (tids->ttype == TYPE_msk || mask_cand(tids))) {
    1063           3 :                 otids = BATunmask(tids);
    1064           3 :                 if (!otids)
    1065             :                         return LOG_ERR;
    1066             :         }
    1067        2859 :         if (updates && (updates->ttype == TYPE_msk || mask_cand(updates))) {
    1068           0 :                 oupdates = BATunmask(updates);
    1069           0 :                 if (!oupdates) {
    1070           0 :                         if (otids != tids)
    1071           0 :                                 bat_destroy(otids);
    1072           0 :                         return LOG_ERR;
    1073             :                 }
    1074             :         }
    1075        2859 :         if (updates && updates->ttype == TYPE_void) { /* dense later use optimized log structure */
    1076          44 :                 oupdates = COLcopy(updates, TYPE_oid, true /* make sure we get a oid col */, TRANSIENT);
    1077          44 :                 if (!oupdates) {
    1078           0 :                         if (otids != tids)
    1079           0 :                                 bat_destroy(otids);
    1080           0 :                         return LOG_ERR;
    1081             :                 }
    1082             :         }
    1083             :         /* When we go to smaller grained update structures we should check for concurrent updates on this column ! */
    1084             :         /* currently only one update delta is possible */
    1085        2859 :         lock_table(tr->store, t->base.id);
    1086        2859 :         storage *s = ATOMIC_PTR_GET(&t->data);
    1087        2859 :         if (!is_new && !cs->cleared) {
    1088        2537 :                 if (!otids->tsorted || complex_cand(otids) /* make sure we have simple dense or oids */) {
    1089             :                         BAT *sorted, *order;
    1090           6 :                         if (BATsort(&sorted, &order, NULL, otids, NULL, NULL, false, false, false) != GDK_SUCCEED) {
    1091           0 :                                 if (otids != tids)
    1092           0 :                                         bat_destroy(otids);
    1093           0 :                                 if (oupdates != updates)
    1094           0 :                                         bat_destroy(oupdates);
    1095           0 :                                 unlock_table(tr->store, t->base.id);
    1096           0 :                                 return LOG_ERR;
    1097             :                         }
    1098           6 :                         if (otids != tids)
    1099           0 :                                 bat_destroy(otids);
    1100           6 :                         otids = sorted;
    1101           6 :                         BAT *noupdates = BATproject(order, oupdates);
    1102           6 :                         bat_destroy(order);
    1103           6 :                         if (oupdates != updates)
    1104           0 :                                 bat_destroy(oupdates);
    1105             :                         oupdates = noupdates;
    1106           6 :                         if (!oupdates) {
    1107           0 :                                 bat_destroy(otids);
    1108           0 :                                 unlock_table(tr->store, t->base.id);
    1109           0 :                                 return LOG_ERR;
    1110             :                         }
    1111             :                 }
    1112        2537 :                 assert(otids->tsorted);
    1113        2537 :                 BAT *ui = NULL, *uv = NULL;
    1114             : 
    1115             :                 /* handle updates on just inserted bits */
    1116             :                 /* handle updates on updates (within one transaction) */
    1117        2537 :                 BATiter upi = bat_iterator(oupdates);
    1118        2537 :                 BUN cnt = 0, ucnt = BATcount(otids);
    1119             :                 BAT *b, *ins = NULL;
    1120             :                 int *msk = NULL;
    1121             : 
    1122        2537 :                 if((b = temp_descriptor(cs->bid)) == NULL)
    1123             :                         res = LOG_ERR;
    1124             : 
    1125        2537 :                 if (res == LOG_OK && BATtdense(otids)) {
    1126             :                         oid start = otids->tseqbase, offset = start;
    1127        2434 :                         oid end = start + ucnt;
    1128             : 
    1129       10750 :                         for(segment *seg = s->segs->h; seg && res == LOG_OK ; seg=seg->next) {
    1130        8897 :                                 if (seg->start <= start && seg->end > start) {
    1131             :                                         /* check for delete conflicts */
    1132        2434 :                                         if (seg->ts >= tr->ts && seg->deleted) {
    1133             :                                                 res = LOG_CONFLICT;
    1134           0 :                                                 continue;
    1135             :                                         }
    1136             : 
    1137             :                                         /* check for inplace updates */
    1138        2434 :                                         BUN lend = end < seg->end?end:seg->end;
    1139        2434 :                                         if (seg->ts == tr->tid && !seg->deleted) {
    1140         265 :                                                 if (!ins) {
    1141         265 :                                                         ins = COLnew(0, TYPE_msk, ucnt, TRANSIENT);
    1142         265 :                                                         if (!ins)
    1143             :                                                                 res = LOG_ERR;
    1144             :                                                         else {
    1145         265 :                                                                 BATsetcount(ins, ucnt); /* all full updates  */
    1146         265 :                                                                 msk = (int*)Tloc(ins, 0);
    1147         265 :                                                                 BUN end = (ucnt+31)/32;
    1148         265 :                                                                 memset(msk, 0, end * sizeof(int));
    1149             :                                                         }
    1150             :                                                 }
    1151        1060 :                                                 for (oid i = 0, rid = start; rid < lend && res == LOG_OK; rid++, i++) {
    1152         795 :                                                         ptr upd = BUNtail(upi, rid-offset);
    1153         795 :                                                         if (void_inplace(b, rid, upd, true) != GDK_SUCCEED)
    1154             :                                                                 res = LOG_ERR;
    1155             : 
    1156         795 :                                                         oid word = i/32;
    1157         795 :                                                         int pos = i%32;
    1158         795 :                                                         msk[word] |= 1U<<pos;
    1159         795 :                                                         cnt++;
    1160             :                                                 }
    1161             :                                         }
    1162             :                                 }
    1163        8897 :                                 if (end < seg->end)
    1164             :                                         break;
    1165             :                         }
    1166         103 :                 } else if (res == LOG_OK) {
    1167             :                         BUN i = 0;
    1168         103 :                         oid *rid = Tloc(otids,0);
    1169         103 :                         segment *seg = s->segs->h;
    1170        2503 :                         while ( seg && res == LOG_OK && i < ucnt) {
    1171        2400 :                                 if (seg->end <= rid[i])
    1172        1666 :                                         seg = seg->next;
    1173         734 :                                 else if (seg->start <= rid[i] && seg->end > rid[i]) {
    1174             :                                         /* check for delete conflicts */
    1175         734 :                                         if (seg->ts >= tr->ts && seg->deleted) {
    1176             :                                                 res = LOG_CONFLICT;
    1177           0 :                                                 continue;
    1178             :                                         }
    1179             : 
    1180             :                                         /* check for inplace updates */
    1181         734 :                                         if (seg->ts == tr->tid && !seg->deleted) {
    1182         591 :                                                 if (!ins) {
    1183          75 :                                                         ins = COLnew(0, TYPE_msk, ucnt, TRANSIENT);
    1184          75 :                                                         if (!ins) {
    1185             :                                                                 res = LOG_ERR;
    1186             :                                                                 break;
    1187             :                                                         } else {
    1188          75 :                                                                 BATsetcount(ins, ucnt); /* all full updates  */
    1189          75 :                                                                 msk = (int*)Tloc(ins, 0);
    1190          75 :                                                                 BUN end = (ucnt+31)/32;
    1191          75 :                                                                 memset(msk, 0, end * sizeof(int));
    1192             :                                                         }
    1193             :                                                 }
    1194         591 :                                                 ptr upd = BUNtail(upi, i);
    1195         591 :                                                 if (void_inplace(b, rid[i], upd, true) != GDK_SUCCEED)
    1196             :                                                         res = LOG_ERR;
    1197             : 
    1198         591 :                                                 oid word = i/32;
    1199         591 :                                                 int pos = i%32;
    1200         591 :                                                 msk[word] |= 1U<<pos;
    1201         591 :                                                 cnt++;
    1202             :                                         }
    1203         734 :                                         i++;
    1204             :                                 }
    1205             :                         }
    1206             :                 }
    1207             : 
    1208        2537 :                 if (res == LOG_OK && cnt < ucnt) {   /* now handle real updates */
    1209        2205 :                         if (cs->ucnt == 0) {
    1210        2204 :                                 if (cnt) {
    1211           8 :                                         BAT *nins = BATmaskedcands(0, ucnt, ins, false);
    1212           8 :                                         if (nins) {
    1213           8 :                                                 ui = BATproject(nins, otids);
    1214           8 :                                                 uv = BATproject(nins, oupdates);
    1215           8 :                                                 bat_destroy(nins);
    1216             :                                         }
    1217             :                                 } else {
    1218        2196 :                                         ui = temp_descriptor(otids->batCacheid);
    1219        2196 :                                         uv = temp_descriptor(oupdates->batCacheid);
    1220             :                                 }
    1221        2204 :                                 if (!ui || !uv) {
    1222             :                                         res = LOG_ERR;
    1223             :                                 } else {
    1224        2204 :                                         temp_destroy(cs->uibid);
    1225        2204 :                                         temp_destroy(cs->uvbid);
    1226        2204 :                                         cs->uibid = temp_create(ui);
    1227        2204 :                                         cs->uvbid = temp_create(uv);
    1228        2204 :                                         cs->ucnt = BATcount(ui);
    1229             :                                 }
    1230             :                         } else {
    1231             :                                 BAT *nui = NULL, *nuv = NULL;
    1232             : 
    1233             :                                 /* merge taking msk of inserted into account */
    1234           1 :                                 if (res == LOG_OK && cs_real_update_bats(cs, &ui, &uv) != LOG_OK)
    1235             :                                         res = LOG_ERR;
    1236             : 
    1237             :                                 if (res == LOG_OK) {
    1238             :                                         ptr upd = NULL;
    1239           1 :                                         nui = bat_new(TYPE_oid, cs->ucnt + ucnt - cnt, TRANSIENT);
    1240           1 :                                         nuv = bat_new(uv->ttype, cs->ucnt + ucnt - cnt, TRANSIENT);
    1241             : 
    1242           1 :                                         if (!nui || !nuv) {
    1243             :                                                 res = LOG_ERR;
    1244             :                                         } else {
    1245           1 :                                                 BATiter ovi = bat_iterator(uv);
    1246             : 
    1247             :                                                 /* handle dense (void) cases together as we need to merge updates (which is slower anyway) */
    1248           1 :                                                 BUN uip = 0, uie = BATcount(ui);
    1249           1 :                                                 BUN nip = 0, nie = BATcount(otids);
    1250           1 :                                                 oid uiseqb = ui->tseqbase;
    1251           1 :                                                 oid niseqb = otids->tseqbase;
    1252             :                                                 oid *uipt = NULL, *nipt = NULL;
    1253           1 :                                                 BATiter uii = bat_iterator(ui);
    1254           1 :                                                 BATiter otidsi = bat_iterator(otids);
    1255           1 :                                                 if (!BATtdense(ui))
    1256           0 :                                                         uipt = uii.base;
    1257           1 :                                                 if (!BATtdense(otids))
    1258           0 :                                                         nipt = otidsi.base;
    1259           2 :                                                 while (uip < uie && nip < nie && res == LOG_OK) {
    1260           1 :                                                         oid uiv = (uipt)?uipt[uip]: uiseqb+uip;
    1261           1 :                                                         oid niv = (nipt)?nipt[nip]: niseqb+nip;
    1262             : 
    1263           1 :                                                         if (uiv < niv) {
    1264           0 :                                                                 upd = BUNtail(ovi, uip);
    1265           0 :                                                                 if (BUNappend(nui, (ptr) &uiv, true) != GDK_SUCCEED ||
    1266           0 :                                                                                         BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
    1267             :                                                                         res = LOG_ERR;
    1268           0 :                                                                 uip++;
    1269           1 :                                                         } else if (uiv == niv) {
    1270             :                                                                 /* handle == */
    1271           1 :                                                                 if (!msk || (msk[nip/32] & (1U<<(nip%32))) == 0) {
    1272           1 :                                                                         upd = BUNtail(upi, nip);
    1273           2 :                                                                         if (BUNappend(nui, (ptr) &niv, true) != GDK_SUCCEED ||
    1274           1 :                                                                                                         BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
    1275             :                                                                                 res = LOG_ERR;
    1276             :                                                                 } else {
    1277           0 :                                                                         upd = BUNtail(ovi, uip);
    1278           0 :                                                                         if (BUNappend(nui, (ptr) &uiv, true) != GDK_SUCCEED ||
    1279           0 :                                                                                                         BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
    1280             :                                                                                 res = LOG_ERR;
    1281             :                                                                 }
    1282           1 :                                                                 uip++;
    1283           1 :                                                                 nip++;
    1284             :                                                         } else { /* uiv > niv */
    1285           0 :                                                                 if (!msk || (msk[nip/32] & (1U<<(nip%32))) == 0) {
    1286           0 :                                                                         upd = BUNtail(upi, nip);
    1287           0 :                                                                         if (BUNappend(nui, (ptr) &niv, true) != GDK_SUCCEED ||
    1288           0 :                                                                                                         BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
    1289             :                                                                                 res = LOG_ERR;
    1290             :                                                                 }
    1291           0 :                                                                 nip++;
    1292             :                                                         }
    1293             :                                                 }
    1294           1 :                                                 while (uip < uie && res == LOG_OK) {
    1295           0 :                                                         oid uiv = (uipt)?uipt[uip]: uiseqb+uip;
    1296           0 :                                                         upd = BUNtail(ovi, uip);
    1297           0 :                                                         if (BUNappend(nui, (ptr) &uiv, true) != GDK_SUCCEED ||
    1298           0 :                                                                         BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
    1299             :                                                                 res = LOG_ERR;
    1300           0 :                                                         uip++;
    1301             :                                                 }
    1302           1 :                                                 while (nip < nie && res == LOG_OK) {
    1303           0 :                                                         oid niv = (nipt)?nipt[nip]: niseqb+nip;
    1304           0 :                                                         if (!msk || (msk[nip/32] & (1U<<(nip%32))) == 0) {
    1305           0 :                                                                 upd = BUNtail(upi, nip);
    1306           0 :                                                                 if (BUNappend(nui, (ptr) &niv, true) != GDK_SUCCEED ||
    1307           0 :                                                                                         BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
    1308             :                                                                         res = LOG_ERR;
    1309             :                                                         }
    1310           0 :                                                         nip++;
    1311             :                                                 }
    1312           1 :                                                 bat_iterator_end(&uii);
    1313           1 :                                                 bat_iterator_end(&otidsi);
    1314           1 :                                                 bat_iterator_end(&ovi);
    1315           1 :                                                 if (res == LOG_OK) {
    1316           1 :                                                         temp_destroy(cs->uibid);
    1317           1 :                                                         temp_destroy(cs->uvbid);
    1318           1 :                                                         cs->uibid = temp_create(nui);
    1319           1 :                                                         cs->uvbid = temp_create(nuv);
    1320           1 :                                                         cs->ucnt = BATcount(nui);
    1321             :                                                 }
    1322             :                                         }
    1323           1 :                                         bat_destroy(nui);
    1324           1 :                                         bat_destroy(nuv);
    1325             :                                 }
    1326             :                         }
    1327             :                 }
    1328        2537 :                 bat_iterator_end(&upi);
    1329        2537 :                 bat_destroy(b);
    1330        2537 :                 unlock_table(tr->store, t->base.id);
    1331        2537 :                 bat_destroy(ins);
    1332        2537 :                 bat_destroy(ui);
    1333        2537 :                 bat_destroy(uv);
    1334        2537 :                 if (otids != tids)
    1335           7 :                         bat_destroy(otids);
    1336        2537 :                 if (oupdates != updates)
    1337          10 :                         bat_destroy(oupdates);
    1338        2537 :                 return res;
    1339         322 :         } else if (is_new || cs->cleared) {
    1340         322 :                 BAT *b = temp_descriptor(cs->bid);
    1341             : 
    1342         322 :                 if (b == NULL) {
    1343             :                         res = LOG_ERR;
    1344         322 :                 } else if (BATcount(b)==0) {
    1345           0 :                         if (BATappend(b, updates, NULL, true) != GDK_SUCCEED) /* alter add column */
    1346             :                                 res = LOG_ERR;
    1347         322 :                 } else if (BATreplace(b, otids, updates, true) != GDK_SUCCEED)
    1348             :                         res = LOG_ERR;
    1349         322 :                 BBPcold(b->batCacheid);
    1350         322 :                 bat_destroy(b);
    1351             :         }
    1352         322 :         unlock_table(tr->store, t->base.id);
    1353         322 :         if (otids != tids)
    1354           2 :                 bat_destroy(otids);
    1355         322 :         if (oupdates != updates)
    1356          40 :                 bat_destroy(oupdates);
    1357             :         return res;
    1358             : }
    1359             : 
    1360             : static int
    1361             : delta_update_bat( sql_trans *tr, sql_delta *bat, sql_table *t, BAT *tids, BAT *updates, int is_new)
    1362             : {
    1363        2859 :         return cs_update_bat(tr, &bat->cs, t, tids, updates, is_new);
    1364             : }
    1365             : 
    1366             : static int
    1367        3397 : cs_update_val( sql_trans *tr, column_storage *cs, sql_table *t, oid rid, void *upd, int is_new)
    1368             : {
    1369        3397 :         storage *s = ATOMIC_PTR_GET(&t->data);
    1370        3397 :         assert(!is_oid_nil(rid));
    1371        3397 :         int inplace = is_new || cs->cleared || segments_is_append (s->segs->h, tr, rid);
    1372             : 
    1373             :         /* check if rid is insert ? */
    1374             :         if (!inplace) {
    1375             :                 /* check conflict */
    1376        1435 :                 if (segments_is_deleted(s->segs->h, tr, rid))
    1377           0 :                         return LOG_CONFLICT;
    1378             :                 BAT *ui, *uv;
    1379             : 
    1380             :                 /* When we go to smaller grained update structures we should check for concurrent updates on this column ! */
    1381             :                 /* currently only one update delta is possible */
    1382        1435 :                 if (cs_real_update_bats(cs, &ui, &uv) != LOG_OK)
    1383             :                         return LOG_ERR;
    1384             : 
    1385        1435 :                 assert(uv->ttype);
    1386        1435 :                 assert(BATcount(ui) == BATcount(uv));
    1387        2870 :                 if (BUNappend(ui, (ptr) &rid, true) != GDK_SUCCEED ||
    1388        1435 :                     BUNappend(uv, (ptr) upd, true) != GDK_SUCCEED) {
    1389           0 :                         bat_destroy(ui);
    1390           0 :                         bat_destroy(uv);
    1391           0 :                         return LOG_ERR;
    1392             :                 }
    1393        1435 :                 assert(BATcount(ui) == BATcount(uv));
    1394        1435 :                 bat_destroy(ui);
    1395        1435 :                 bat_destroy(uv);
    1396        1435 :                 cs->ucnt++;
    1397             :         } else {
    1398             :                 BAT *b = NULL;
    1399             : 
    1400        1962 :                 if((b = temp_descriptor(cs->bid)) == NULL)
    1401             :                         return LOG_ERR;
    1402        1962 :                 if (void_inplace(b, rid, upd, true) != GDK_SUCCEED) {
    1403           0 :                         bat_destroy(b);
    1404           0 :                         return LOG_ERR;
    1405             :                 }
    1406        1962 :                 bat_destroy(b);
    1407             :         }
    1408             :         return LOG_OK;
    1409             : }
    1410             : 
    1411             : static int
    1412        3397 : delta_update_val( sql_trans *tr, sql_delta *bat, sql_table *t, oid rid, void *upd, int is_new)
    1413             : {
    1414             :         int res = LOG_OK;
    1415        3397 :         lock_table(tr->store, t->base.id);
    1416        3397 :         res = cs_update_val(tr, &bat->cs, t, rid, upd, is_new);
    1417        3397 :         unlock_table(tr->store, t->base.id);
    1418        3397 :         return res;
    1419             : }
    1420             : 
    1421             : static int
    1422        3581 : dup_cs(sql_trans *tr, column_storage *ocs, column_storage *cs, int type, int temp)
    1423             : {
    1424             :         (void)tr;
    1425        3581 :         if (!ocs)
    1426             :                 return LOG_OK;
    1427             :         (void)type;
    1428        3581 :         cs->bid = ocs->bid;
    1429        3581 :         cs->uibid = ocs->uibid;
    1430        3581 :         cs->uvbid = ocs->uvbid;
    1431        3581 :         cs->ucnt = ocs->ucnt;
    1432             : 
    1433        3581 :         if (temp) {
    1434         143 :                 cs->bid = temp_copy(cs->bid, true, true);
    1435         143 :                 if (cs->bid == BID_NIL)
    1436             :                         return LOG_ERR;
    1437             :         } else {
    1438        3438 :                 temp_dup(cs->bid);
    1439             :         }
    1440        3581 :         cs->ucnt = 0;
    1441        3581 :         cs->uibid = e_bat(TYPE_oid);
    1442        3581 :         cs->uvbid = e_bat(type);
    1443        3581 :         if (cs->uibid == BID_NIL || cs->uvbid == BID_NIL)
    1444           0 :                 return LOG_ERR;
    1445             :         return LOG_OK;
    1446             : }
    1447             : 
    1448             : static int
    1449             : dup_bat(sql_trans *tr, sql_table *t, sql_delta *obat, sql_delta *bat, int type)
    1450             : {
    1451        3438 :         return dup_cs(tr, &obat->cs, &bat->cs, type, isTempTable(t));
    1452             : }
    1453             : 
    1454             : static int
    1455      144993 : destroy_delta(sql_delta *b, bool recursive)
    1456             : {
    1457             :         int ok = LOG_OK;
    1458             : 
    1459      144993 :         if (--b->cs.refcnt > 0)
    1460             :                 return LOG_OK;
    1461      124819 :         if (recursive && b->next)
    1462           2 :                 ok = destroy_delta(b->next, true);
    1463      124819 :         if (b->cs.uibid)
    1464       95322 :                 temp_destroy(b->cs.uibid);
    1465      124819 :         if (b->cs.uvbid)
    1466       95322 :                 temp_destroy(b->cs.uvbid);
    1467      124819 :         if (b->cs.bid)
    1468      124819 :                 temp_destroy(b->cs.bid);
    1469      124819 :         b->cs.bid = b->cs.uibid = b->cs.uvbid = 0;
    1470      124819 :         _DELETE(b);
    1471      124819 :         return ok;
    1472             : }
    1473             : 
    1474             : static sql_delta *
    1475    14055977 : bind_col_data(sql_trans *tr, sql_column *c, bool *update_conflict)
    1476             : {
    1477    14055977 :         sql_delta *obat = ATOMIC_PTR_GET(&c->data);
    1478             : 
    1479    14055977 :         if (isTempTable(c->t) && !(obat = temp_col_timestamp_delta(tr, c)))
    1480             :                 return NULL;
    1481             : 
    1482    14055977 :         if (obat->cs.ts == tr->tid || ((obat->cs.ts < TRANSACTION_ID_BASE || tr_version_of_parent(tr, obat->cs.ts)) && !update_conflict)) /* on append there are no conflicts */
    1483    14052556 :                 return obat;
    1484        3421 :         if ((!tr->parent || !tr_version_of_parent(tr, obat->cs.ts)) && obat->cs.ts >= TRANSACTION_ID_BASE && !isTempTable(c->t)) {
    1485             :                 /* abort */
    1486          12 :                 if (update_conflict)
    1487           4 :                         *update_conflict = true;
    1488             :                 else
    1489           8 :                         return timestamp_delta(tr, ATOMIC_PTR_GET(&c->data));
    1490           4 :                 return NULL;
    1491             :         }
    1492        3409 :         assert(!isTempTable(c->t));
    1493        3409 :         obat = timestamp_delta(tr, ATOMIC_PTR_GET(&c->data));
    1494        3409 :         sql_delta* bat = ZNEW(sql_delta);
    1495        3409 :         if(!bat)
    1496             :                 return NULL;
    1497        3409 :         bat->cs.refcnt = 1;
    1498        3409 :         if(dup_bat(tr, c->t, obat, bat, c->type.type->localtype) != LOG_OK)
    1499             :                 return NULL;
    1500        3409 :         bat->cs.ts = tr->tid;
    1501             :         /* only one writer else abort */
    1502        3409 :         bat->next = obat;
    1503        3409 :         if (!ATOMIC_PTR_CAS(&c->data, (void**)&bat->next, bat)) {
    1504           0 :                 bat->next = NULL;
    1505           0 :                 destroy_delta(bat, false);
    1506           0 :                 return NULL;
    1507             :         }
    1508             :         return bat;
    1509             : }
    1510             : 
    1511             : static int
    1512        6256 : update_col_execute(sql_trans *tr, sql_delta *delta, sql_table *table, bool is_new, void *incoming_tids, void *incoming_values, bool is_bat)
    1513             : {
    1514             :         int ok = LOG_OK;
    1515             : 
    1516        6256 :         if (is_bat) {
    1517             :                 BAT *tids = incoming_tids;
    1518             :                 BAT *values = incoming_values;
    1519        2859 :                 if (BATcount(tids) == 0)
    1520             :                         return LOG_OK;
    1521        2859 :                 ok = delta_update_bat(tr, delta, table, tids, values, is_new);
    1522             :         } else {
    1523        3397 :                 ok = delta_update_val(tr, delta, table, *(oid*)incoming_tids, incoming_values, is_new);
    1524             :         }
    1525             :         return ok;
    1526             : }
    1527             : 
    1528             : static int
    1529        6293 : update_col(sql_trans *tr, sql_column *c, void *tids, void *upd, int tpe)
    1530             : {
    1531        6293 :         bool update_conflict = false;
    1532        6293 :         sql_delta *delta, *odelta = ATOMIC_PTR_GET(&c->data);
    1533             : 
    1534        6293 :         if (tpe == TYPE_bat) {
    1535             :                 BAT *t = tids;
    1536        2893 :                 if (!BATcount(t))
    1537             :                         return LOG_OK;
    1538             :         }
    1539             : 
    1540        5987 :         if ((delta = bind_col_data(tr, c, &update_conflict)) == NULL)
    1541           4 :                 return update_conflict ? LOG_CONFLICT : LOG_ERR;
    1542             : 
    1543        5983 :         assert(delta && delta->cs.ts == tr->tid);
    1544        5983 :         if ((!inTransaction(tr, c->t) && (odelta != delta || isTempTable(c->t)) && isGlobal(c->t)) || (!isNew(c->t) && isLocalTemp(c->t)))
    1545        2975 :                 trans_add(tr, &c->base, delta, &tc_gc_col, &commit_update_col, isTempTable(c->t)?NULL:&log_update_col);
    1546             : 
    1547        5983 :         return update_col_execute(tr, delta, c->t, isNew(c), tids, upd, tpe == TYPE_bat);
    1548             : }
    1549             : 
    1550             : static sql_delta *
    1551        2439 : bind_idx_data(sql_trans *tr, sql_idx *i, bool *update_conflict)
    1552             : {
    1553        2439 :         sql_delta *obat = ATOMIC_PTR_GET(&i->data);
    1554             : 
    1555        2439 :         if (isTempTable(i->t) && !(obat = temp_idx_timestamp_delta(tr, i)))
    1556             :                 return NULL;
    1557             : 
    1558        2439 :         if (obat->cs.ts == tr->tid || ((obat->cs.ts < TRANSACTION_ID_BASE || tr_version_of_parent(tr, obat->cs.ts)) && !update_conflict)) /* on append there are no conflicts */
    1559        2410 :                 return obat;
    1560          29 :         if ((!tr->parent || !tr_version_of_parent(tr, obat->cs.ts)) && obat->cs.ts >= TRANSACTION_ID_BASE && !isTempTable(i->t)) {
    1561             :                 /* abort */
    1562           0 :                 if (update_conflict)
    1563           0 :                         *update_conflict = true;
    1564           0 :                 return NULL;
    1565             :         }
    1566          29 :         assert(!isTempTable(i->t));
    1567          29 :         obat = timestamp_delta(tr, ATOMIC_PTR_GET(&i->data));
    1568          29 :         sql_delta* bat = ZNEW(sql_delta);
    1569          29 :         if(!bat)
    1570             :                 return NULL;
    1571          29 :         bat->cs.refcnt = 1;
    1572          34 :         if(dup_bat(tr, i->t, obat, bat, (oid_index(i->type))?TYPE_oid:TYPE_lng) != LOG_OK)
    1573             :                 return NULL;
    1574          29 :         bat->cs.ts = tr->tid;
    1575             :         /* only one writer else abort */
    1576          29 :         bat->next = obat;
    1577          29 :         if (!ATOMIC_PTR_CAS(&i->data, (void**)&bat->next, bat)) {
    1578           0 :                 bat->next = NULL;
    1579           0 :                 destroy_delta(bat, false);
    1580           0 :                 return NULL;
    1581             :         }
    1582             :         return bat;
    1583             : }
    1584             : 
    1585             : static int
    1586         741 : update_idx(sql_trans *tr, sql_idx * i, void *tids, void *upd, int tpe)
    1587             : {
    1588         741 :         bool update_conflict = false;
    1589         741 :         sql_delta *delta, *odelta = ATOMIC_PTR_GET(&i->data);
    1590             : 
    1591         741 :         if (tpe == TYPE_bat) {
    1592             :                 BAT *t = tids;
    1593         741 :                 if (!BATcount(t))
    1594             :                         return LOG_OK;
    1595             :         }
    1596             : 
    1597         273 :         if ((delta = bind_idx_data(tr, i, &update_conflict)) == NULL)
    1598           0 :                 return update_conflict ? LOG_CONFLICT : LOG_ERR;
    1599             : 
    1600         273 :         assert(delta && delta->cs.ts == tr->tid);
    1601         273 :         if ((!inTransaction(tr, i->t) && (odelta != delta || isTempTable(i->t)) && isGlobal(i->t)) || (!isNew(i->t) && isLocalTemp(i->t)))
    1602          23 :                 trans_add(tr, &i->base, delta, &tc_gc_idx, &commit_update_idx, isTempTable(i->t)?NULL:&log_update_idx);
    1603             : 
    1604         273 :         return update_col_execute(tr, delta, i->t, isNew(i), tids, upd, tpe == TYPE_bat);
    1605             : }
    1606             : 
    1607             : static int
    1608       18704 : delta_append_bat(sql_trans *tr, sql_delta *bat, sqlid id, BUN offset, BAT *offsets, BAT *i)
    1609             : {
    1610             :         BAT *b, *oi = i;
    1611             :         int err = 0;
    1612             : 
    1613       18704 :         assert(!offsets || BATcount(offsets) == BATcount(i));
    1614       18704 :         if (!BATcount(i))
    1615             :                 return LOG_OK;
    1616       18704 :         if ((i->ttype == TYPE_msk || mask_cand(i)) && !(oi = BATunmask(i)))
    1617             :                 return LOG_ERR;
    1618             : 
    1619       18704 :         lock_column(tr->store, id);
    1620       18687 :         b = temp_descriptor(bat->cs.bid);
    1621       18691 :         if (b == NULL) {
    1622           0 :                 unlock_column(tr->store, id);
    1623           0 :                 if (oi != i)
    1624           0 :                         bat_destroy(oi);
    1625           0 :                 return LOG_ERR;
    1626             :         }
    1627       18691 :         if (!offsets && offset == b->hseqbase+BATcount(b)) {
    1628       18488 :                 if (BATappend(b, oi, NULL, true) != GDK_SUCCEED)
    1629             :                         err = 1;
    1630         203 :         } else if (!offsets) {
    1631         189 :                 if (BATupdatepos(b, &offset, oi, true, true) != GDK_SUCCEED)
    1632             :                         err = 1;
    1633          14 :         } else if ((BATtdense(offsets) && offsets->tseqbase == (b->hseqbase+BATcount(b)))) {
    1634           0 :                 if (BATappend(b, oi, NULL, true) != GDK_SUCCEED)
    1635             :                         err = 1;
    1636          14 :         } else if (BATupdate(b, offsets, oi, true) != GDK_SUCCEED) {
    1637             :                         err = 1;
    1638             :         }
    1639       18702 :         bat_destroy(b);
    1640       18703 :         unlock_column(tr->store, id);
    1641             : 
    1642       18706 :         if (oi != i)
    1643           0 :                 bat_destroy(oi);
    1644       18704 :         return (err)?LOG_ERR:LOG_OK;
    1645             : }
    1646             : 
    1647             : static int
    1648    14032083 : delta_append_val(sql_trans *tr, sql_delta *bat, sqlid id, BUN offset, void *i, BUN cnt)
    1649             : {
    1650    14032083 :         lock_column(tr->store, id);
    1651    14031905 :         BAT *b = temp_descriptor(bat->cs.bid);
    1652    14031853 :         if (b == NULL) {
    1653           0 :                 unlock_column(tr->store, id);
    1654           0 :                 return LOG_ERR;
    1655             :         }
    1656    14031853 :         BUN bcnt = BATcount(b);
    1657    14031853 :         if (bcnt > offset){
    1658      498304 :                 size_t ccnt = ((offset+cnt) > bcnt)? (bcnt - offset):cnt;
    1659      498304 :                 if (BUNreplacemultiincr(b, offset, i, ccnt, true) != GDK_SUCCEED) {
    1660           0 :                         bat_destroy(b);
    1661           0 :                         unlock_column(tr->store, id);
    1662           0 :                         return LOG_ERR;
    1663             :                 }
    1664      498075 :                 cnt -= ccnt;
    1665      498075 :                 offset += ccnt;
    1666             :         }
    1667    14031624 :         if (cnt) {
    1668    13533527 :                 if (BATcount(b) < offset) { /* add space */
    1669         126 :                         const void *tv = ATOMnilptr(b->ttype);
    1670         126 :                         lng i, d = offset - BATcount(b);
    1671         442 :                         for(i=0;i<d;i++) {
    1672         316 :                                 if (BUNappend(b, tv, true) != GDK_SUCCEED) {
    1673           0 :                                         bat_destroy(b);
    1674           0 :                                         unlock_column(tr->store, id);
    1675           0 :                                         return LOG_ERR;
    1676             :                                 }
    1677             :                         }
    1678             :                 }
    1679    13533527 :                 if (BUNappendmulti(b, i, cnt, true) != GDK_SUCCEED) {
    1680           0 :                         bat_destroy(b);
    1681           0 :                         unlock_column(tr->store, id);
    1682           0 :                         return LOG_ERR;
    1683             :                 }
    1684             :         }
    1685    14031649 :         bat_destroy(b);
    1686    14031721 :         unlock_column(tr->store, id);
    1687    14031706 :         return LOG_OK;
    1688             : }
    1689             : 
    1690             : static int
    1691         143 : dup_storage( sql_trans *tr, storage *obat, storage *bat, int temp)
    1692             : {
    1693         143 :         if (temp) {
    1694         143 :                 if (!(bat->segs = new_segments(tr, 0)))
    1695             :                         return LOG_ERR;
    1696             :         } else {
    1697           0 :                 bat->segs = dup_segments(obat->segs);
    1698             :         }
    1699         143 :         return dup_cs(tr, &obat->cs, &bat->cs, TYPE_msk, temp);
    1700             : }
    1701             : 
    1702             : static int
    1703    14051614 : append_col_execute(sql_trans *tr, sql_delta *delta, sqlid id, BUN offset, BAT *offsets, void *incoming_data, BUN cnt, bool is_bat)
    1704             : {
    1705             :         int ok = LOG_OK;
    1706             : 
    1707    14051614 :         delta->cs.merged = 0;
    1708    14051614 :         if (is_bat) {
    1709             :                 BAT *bat = incoming_data;
    1710             : 
    1711       19533 :                 if (BATcount(bat))
    1712       18703 :                         ok = delta_append_bat(tr, delta, id, offset, offsets, bat);
    1713             :         } else {
    1714    14032081 :                 ok = delta_append_val(tr, delta, id, offset, incoming_data, cnt);
    1715             :         }
    1716    14051319 :         return ok;
    1717             : }
    1718             : 
    1719             : static int
    1720    14049412 : append_col(sql_trans *tr, sql_column *c, BUN offset, BAT *offsets, void *i, BUN cnt, int tpe)
    1721             : {
    1722    14049412 :         sql_delta *delta, *odelta = ATOMIC_PTR_GET(&c->data);
    1723             : 
    1724    14049412 :         if ((delta = bind_col_data(tr, c, NULL)) == NULL)
    1725             :                 return LOG_ERR;
    1726             : 
    1727    14049532 :         assert(delta && (!isTempTable(c->t) || delta->cs.ts == tr->tid));
    1728    14049532 :         if (isTempTable(c->t))
    1729        1258 :         if ((!inTransaction(tr, c->t) && (odelta != delta || !segments_in_transaction(tr, c->t) || isTempTable(c->t)) && isGlobal(c->t)) || (!isNew(c->t) && isLocalTemp(c->t)))
    1730        2450 :                 trans_add(tr, &c->base, delta, &tc_gc_col, &commit_update_col, isTempTable(c->t)?NULL:&log_update_col);
    1731             : 
    1732    14049532 :         return append_col_execute(tr, delta, c->base.id, offset, offsets, i, cnt, tpe == TYPE_bat);
    1733             : }
    1734             : 
    1735             : static int
    1736        2159 : append_idx(sql_trans *tr, sql_idx * i, BUN offset, BAT *offsets, void *data, BUN cnt, int tpe)
    1737             : {
    1738        2159 :         sql_delta *delta, *odelta = ATOMIC_PTR_GET(&i->data);
    1739             : 
    1740        2159 :         if ((delta = bind_idx_data(tr, i, NULL)) == NULL)
    1741             :                 return LOG_ERR;
    1742             : 
    1743        2159 :         assert(delta && (!isTempTable(i->t) || delta->cs.ts == tr->tid));
    1744        2159 :         if (isTempTable(i->t))
    1745           0 :         if ((!inTransaction(tr, i->t) && (odelta != delta || !segments_in_transaction(tr, i->t) || isTempTable(i->t)) && isGlobal(i->t)) || (!isNew(i->t) && isLocalTemp(i->t)))
    1746           0 :                 trans_add(tr, &i->base, delta, &tc_gc_idx, &commit_update_idx, isTempTable(i->t)?NULL:&log_update_idx);
    1747             : 
    1748        2159 :         return append_col_execute(tr, delta, i->base.id, offset, offsets, data, cnt, tpe == TYPE_bat);
    1749             : }
    1750             : 
    1751             : static int
    1752       74189 : deletes_conflict_updates(sql_trans *tr, sql_table *t, oid rid, size_t cnt)
    1753             : {
    1754             :         int err = 0;
    1755             : 
    1756             :         /* TODO check for conflicting updates */
    1757             :         (void)rid;
    1758             :         (void)cnt;
    1759      542278 :         for(node *n = ol_first_node(t->columns); n && !err; n = n->next) {
    1760      468089 :                 sql_column *c = n->data;
    1761      468089 :                 sql_delta *d = ATOMIC_PTR_GET(&c->data);
    1762             : 
    1763             :                 /* check for active updates */
    1764      468089 :                 if (!VALID_4_READ(d->cs.ts, tr) && d->cs.ucnt)
    1765             :                         return 1;
    1766             :         }
    1767             :         return 0;
    1768             : }
    1769             : 
    1770             : static int
    1771       71577 : storage_delete_val(sql_trans *tr, sql_table *t, storage *s, oid rid)
    1772             : {
    1773       71577 :         int in_transaction = segments_in_transaction(tr, t);
    1774             : 
    1775       71577 :         lock_table(tr->store, t->base.id);
    1776             :         /* find segment of rid, split, mark new segment deleted (for tr->tid) */
    1777       71577 :         segment *seg = s->segs->h, *p = NULL;
    1778    51955932 :         for (; seg; p = seg, seg = seg->next) {
    1779    51955932 :                 if (seg->start <= rid && seg->end > rid) {
    1780       71577 :                         if (!SEG_VALID_4_DELETE(seg,tr)) {
    1781           4 :                                 unlock_table(tr->store, t->base.id);
    1782           4 :                                 return LOG_CONFLICT;
    1783             :                         }
    1784       71573 :                         if (deletes_conflict_updates( tr, t, rid, 1)) {
    1785           0 :                                 unlock_table(tr->store, t->base.id);
    1786           0 :                                 return LOG_CONFLICT;
    1787             :                         }
    1788       71573 :                         if (!split_segment(s->segs, seg, p, tr, rid, 1, true))
    1789             :                                 return LOG_ERR;
    1790             :                         break;
    1791             :                 }
    1792             :         }
    1793       71573 :         unlock_table(tr->store, t->base.id);
    1794       71573 :         if ((!inTransaction(tr, t) && !in_transaction && isGlobal(t)) || (!isNew(t) && isLocalTemp(t)))
    1795       10678 :                 trans_add(tr, &t->base, s, &tc_gc_del, &commit_update_del, isTempTable(t)?NULL:&log_update_del);
    1796             :         return LOG_OK;
    1797             : }
    1798             : 
    1799             : static int
    1800        2613 : seg_delete_range(sql_trans *tr, sql_table *t, storage *s, segment **Seg, size_t start, size_t cnt)
    1801             : {
    1802        2613 :         segment *seg = *Seg, *p = NULL;
    1803       45998 :         for (; seg; p = seg, seg = seg->next) {
    1804       45996 :                 if (seg->start <= start && seg->end > start) {
    1805             :                         size_t lcnt = cnt;
    1806        2654 :                         if (start+lcnt > seg->end)
    1807          44 :                                 lcnt = seg->end-start;
    1808        2654 :                         if (SEG_IS_DELETED(seg, tr)) {
    1809          37 :                                 start += lcnt;
    1810          37 :                                 cnt -= lcnt;
    1811          37 :                                 continue;
    1812        2617 :                         } else if (!SEG_VALID_4_DELETE(seg, tr))
    1813           1 :                                 return LOG_CONFLICT;
    1814        2616 :                         if (deletes_conflict_updates( tr, t, start, lcnt))
    1815             :                                 return LOG_CONFLICT;
    1816        2616 :                         *Seg = seg = split_segment(s->segs, seg, p, tr, start, lcnt, true);
    1817        2616 :                         if (!seg)
    1818             :                                 return LOG_ERR;
    1819        2616 :                         start += lcnt;
    1820        2616 :                         cnt -= lcnt;
    1821             :                 }
    1822       45958 :                 if (start+cnt <= seg->end)
    1823             :                         break;
    1824             :         }
    1825             :         return LOG_OK;
    1826             : }
    1827             : 
    1828             : static int
    1829        2445 : delete_range(sql_trans *tr, sql_table *t, storage *s, size_t start, size_t cnt)
    1830             : {
    1831        2445 :         segment *seg = s->segs->h;
    1832        2445 :         return seg_delete_range(tr, t, s, &seg, start, cnt);
    1833             : }
    1834             : 
    1835             : static int
    1836         295 : storage_delete_bat(sql_trans *tr, sql_table *t, storage *s, BAT *i)
    1837             : {
    1838         295 :         int in_transaction = segments_in_transaction(tr, t);
    1839             :         BAT *oi = i;    /* update ids */
    1840             :         int ok = LOG_OK;
    1841             : 
    1842         295 :         if ((i->ttype == TYPE_msk || mask_cand(i)) && !(i = BATunmask(i)))
    1843             :                 return LOG_ERR;
    1844         295 :         if (BATcount(i)) {
    1845         295 :                 if (BATtdense(i)) {
    1846             :                         size_t start = i->tseqbase;
    1847             :                         size_t cnt = BATcount(i);
    1848             : 
    1849         248 :                         lock_table(tr->store, t->base.id);
    1850         248 :                         ok = delete_range(tr, t, s, start, cnt);
    1851         248 :                         unlock_table(tr->store, t->base.id);
    1852          47 :                 } else if (complex_cand(i)) {
    1853             :                         struct canditer ci;
    1854             :                         oid f = 0, l = 0, cur = 0;
    1855             : 
    1856           0 :                         canditer_init(&ci, NULL, i);
    1857           0 :                         cur = f = canditer_next(&ci);
    1858             : 
    1859           0 :                         lock_table(tr->store, t->base.id);
    1860           0 :                         if (!is_oid_nil(f)) {
    1861           0 :                                 segment *seg = s->segs->h;
    1862           0 :                                 for(l = canditer_next(&ci); !is_oid_nil(l) && ok == LOG_OK; l = canditer_next(&ci)) {
    1863           0 :                                         if (cur+1 == l) {
    1864             :                                                 cur++;
    1865           0 :                                                 continue;
    1866             :                                         }
    1867           0 :                                         ok = seg_delete_range(tr, t, s, &seg, f, cur-f);
    1868             :                                         f = cur = l;
    1869             :                                 }
    1870           0 :                                 if (ok == LOG_OK)
    1871           0 :                                         ok = seg_delete_range(tr, t, s, &seg, f, cur-f);
    1872             :                         }
    1873           0 :                         unlock_table(tr->store, t->base.id);
    1874             :                 } else {
    1875          47 :                         if (!BATtordered(i)) {
    1876           0 :                                 assert(oi == i);
    1877           0 :                                 BAT *ni = NULL;
    1878           0 :                                 if (BATsort(&ni, NULL, NULL, i, NULL, NULL, false, false, false) != GDK_SUCCEED)
    1879             :                                         ok = LOG_ERR;
    1880           0 :                                 if (ni)
    1881             :                                         i = ni;
    1882             :                         }
    1883          47 :                         assert(BATtordered(i));
    1884          47 :                         BUN icnt = BATcount(i);
    1885          47 :                         BATiter ii = bat_iterator(i);
    1886          47 :                         oid *o = ii.base, n = o[0]+1;
    1887             :                         size_t lcnt = 1;
    1888             : 
    1889          47 :                         lock_table(tr->store, t->base.id);
    1890          47 :                         segment *seg = s->segs->h;
    1891        4398 :                         for (size_t i=1; i<icnt && ok == LOG_OK; i++) {
    1892        4351 :                                 if (o[i] == n) {
    1893        4230 :                                         lcnt++;
    1894        4230 :                                         n++;
    1895             :                                 } else {
    1896         121 :                                         ok = seg_delete_range(tr, t, s, &seg, n-lcnt, lcnt);
    1897             :                                         lcnt = 0;
    1898             :                                 }
    1899        4351 :                                 if (!lcnt) {
    1900         121 :                                         n = o[i]+1;
    1901             :                                         lcnt = 1;
    1902             :                                 }
    1903             :                         }
    1904          47 :                         bat_iterator_end(&ii);
    1905          47 :                         if (lcnt && ok == LOG_OK)
    1906          47 :                                 ok = seg_delete_range(tr, t, s, &seg, n-lcnt, lcnt);
    1907          47 :                         unlock_table(tr->store, t->base.id);
    1908             :                 }
    1909             :         }
    1910         295 :         if (i != oi)
    1911           1 :                 bat_destroy(i);
    1912         295 :         if ((!inTransaction(tr, t) && !in_transaction && isGlobal(t)) || (!isNew(t) && isLocalTemp(t)))
    1913         256 :                 trans_add(tr, &t->base, s, &tc_gc_del, &commit_update_del, isTempTable(t)?NULL:&log_update_del);
    1914             :         return ok;
    1915             : }
    1916             : 
    1917             : static void
    1918       18926 : destroy_segments(segments *s)
    1919             : {
    1920       18926 :         if (!s || sql_ref_dec(&s->r) > 0)
    1921           0 :                 return;
    1922       18926 :         segment *seg = s->h;
    1923       45497 :         while(seg) {
    1924       26571 :                 segment *n = seg->next;
    1925       26571 :                 _DELETE(seg);
    1926             :                 seg = n;
    1927             :         }
    1928       18926 :         _DELETE(s);
    1929             : }
    1930             : 
    1931             : static int
    1932       20367 : destroy_storage(storage *bat)
    1933             : {
    1934             :         int ok = LOG_OK;
    1935             : 
    1936       20367 :         if (--bat->cs.refcnt > 0)
    1937             :                 return LOG_OK;
    1938       18761 :         if (bat->next)
    1939           1 :                 ok = destroy_storage(bat->next);
    1940       18761 :         destroy_segments(bat->segs);
    1941       18761 :         if (bat->cs.uibid)
    1942        5428 :                 temp_destroy(bat->cs.uibid);
    1943       18761 :         if (bat->cs.uvbid)
    1944        5428 :                 temp_destroy(bat->cs.uvbid);
    1945       18761 :         if (bat->cs.bid)
    1946       18761 :                 temp_destroy(bat->cs.bid);
    1947       18761 :         bat->cs.bid = bat->cs.uibid = bat->cs.uvbid = 0;
    1948       18761 :         _DELETE(bat);
    1949       18761 :         return ok;
    1950             : }
    1951             : 
    1952             : static int
    1953       86617 : segments_conflict(sql_trans *tr, segments *segs, int uncommitted)
    1954             : {
    1955       86617 :         if (uncommitted) {
    1956      247392 :                 for (segment *s = segs->h; s; s = s->next)
    1957      165611 :                         if (!VALID_4_READ(s->ts,tr))
    1958             :                                 return 1;
    1959             :         } else {
    1960       11590 :                 for (segment *s = segs->h; s; s = s->next)
    1961        6761 :                         if (s->ts < TRANSACTION_ID_BASE && !VALID_4_READ(s->ts,tr))
    1962             :                                 return 1;
    1963             :         }
    1964             : 
    1965             :         return 0;
    1966             : }
    1967             : 
    1968             : static storage *
    1969     1866503 : bind_del_data(sql_trans *tr, sql_table *t, bool *clear)
    1970             : {
    1971     1866503 :         storage *obat = ATOMIC_PTR_GET(&t->data);
    1972             : 
    1973     1866503 :         if (isTempTable(t) && !(obat = temp_tab_timestamp_storage(tr, t)))
    1974             :                 return NULL;
    1975             : 
    1976     1866503 :         if (obat->cs.ts == tr->tid)
    1977             :                 return obat;
    1978     1053027 :         if ((!tr->parent || !tr_version_of_parent(tr, obat->cs.ts)) && obat->cs.ts >= TRANSACTION_ID_BASE && !isTempTable(t)) {
    1979             :                 /* abort */
    1980           0 :                 if (clear)
    1981           0 :                         *clear = true;
    1982           0 :                 return NULL;
    1983             :         }
    1984     1053027 :         if (!isTempTable(t) && !clear)
    1985             :                 return obat;
    1986         144 :         if (!isTempTable(t) && clear && segments_conflict(tr, obat->segs, 1)) {
    1987           1 :                 *clear = true;
    1988           1 :                 return NULL;
    1989             :         }
    1990             : 
    1991         143 :         assert(!isTempTable(t));
    1992         143 :         obat = timestamp_storage(tr, ATOMIC_PTR_GET(&t->data));
    1993         143 :         storage *bat = ZNEW(storage);
    1994         143 :         if(!bat)
    1995             :                 return NULL;
    1996         143 :         bat->cs.refcnt = 1;
    1997         143 :         dup_storage(tr, obat, bat, clear || isTempTable(t) /* for clear and temp create empty storage */);
    1998         143 :         bat->cs.ts = tr->tid;
    1999             :         /* only one writer else abort */
    2000         143 :         bat->next = obat;
    2001         143 :         if (!ATOMIC_PTR_CAS(&t->data, (void**)&bat->next, bat)) {
    2002           0 :                 bat->next = NULL;
    2003           0 :                 destroy_storage(bat);
    2004           0 :                 return NULL;
    2005             :         }
    2006             :         return bat;
    2007             : }
    2008             : 
    2009             : static int
    2010       71906 : delete_tab(sql_trans *tr, sql_table * t, void *ib, int tpe)
    2011             : {
    2012             :         int ok = LOG_OK;
    2013             :         BAT *b = ib;
    2014             :         storage *bat;
    2015             : 
    2016       71906 :         if (tpe == TYPE_bat && !BATcount(b))
    2017             :                 return ok;
    2018             : 
    2019       71872 :         if ((bat = bind_del_data(tr, t, NULL)) == NULL)
    2020             :                 return LOG_ERR;
    2021             : 
    2022       71872 :         if (tpe == TYPE_bat)
    2023         295 :                 ok = storage_delete_bat(tr, t, bat, ib);
    2024             :         else
    2025       71577 :                 ok = storage_delete_val(tr, t, bat, *(oid*)ib);
    2026             :         return ok;
    2027             : }
    2028             : 
    2029             : static size_t
    2030           0 : dcount_col(sql_trans *tr, sql_column *c)
    2031             : {
    2032             :         sql_delta *b;
    2033             : 
    2034           0 :         if (!isTable(c->t))
    2035             :                 return 0;
    2036           0 :         b = col_timestamp_delta(tr, c);
    2037           0 :         if (!b)
    2038             :                 return 1;
    2039             : 
    2040           0 :         storage *s = ATOMIC_PTR_GET(&c->t->data);
    2041           0 :         if (!s || !s->segs->t)
    2042             :                 return 1;
    2043           0 :         size_t cnt = s->segs->t->end;
    2044           0 :         if (cnt) {
    2045           0 :                 BAT *v = cs_bind_bat( &b->cs, QUICK, cnt);
    2046             :                 size_t dcnt = 0;
    2047             : 
    2048           0 :                 if (v)
    2049           0 :                         dcnt = BATguess_uniques(v, NULL);
    2050           0 :                 return dcnt;
    2051             :         }
    2052             :         return cnt;
    2053             : }
    2054             : 
    2055             : static size_t
    2056             : count_segs(segment *s)
    2057             : {
    2058             :         size_t nr = 0;
    2059             : 
    2060           0 :         for( ; s; s = s->next)
    2061           0 :                 nr++;
    2062             :         return nr;
    2063             : }
    2064             : 
    2065             : static size_t
    2066           0 : count_del(sql_trans *tr, sql_table *t, int access)
    2067             : {
    2068             :         storage *d;
    2069             : 
    2070           0 :         if (!isTable(t))
    2071             :                 return 0;
    2072           0 :         d = tab_timestamp_storage(tr, t);
    2073           0 :         if (!d)
    2074             :                 return 0;
    2075           0 :         if (access == 2)
    2076           0 :                 return d->cs.ucnt;
    2077           0 :         if (access == 1)
    2078           0 :                 return count_inserts(d->segs->h, tr);
    2079           0 :         if (access == 10) /* special case for counting the number of segments */
    2080           0 :                 return count_segs(d->segs->h);
    2081           0 :         return count_deletes(d->segs->h, tr);
    2082             : }
    2083             : 
    2084             : static int
    2085      110941 : sorted_col(sql_trans *tr, sql_column *col)
    2086             : {
    2087             :         int sorted = 0;
    2088             : 
    2089      110941 :         assert(tr->active);
    2090      110941 :         if (!isTable(col->t) || !col->t->s)
    2091             :                 return 0;
    2092             : 
    2093      110941 :         if (col && ATOMIC_PTR_GET(&col->data)) {
    2094      110667 :                 BAT *b = bind_col(tr, col, QUICK);
    2095             : 
    2096      110667 :                 if (b)
    2097      110667 :                         sorted = BATtordered(b) || BATtrevordered(b);
    2098             :         }
    2099             :         return sorted;
    2100             : }
    2101             : 
    2102             : static int
    2103       38781 : unique_col(sql_trans *tr, sql_column *col)
    2104             : {
    2105             :         int distinct = 0;
    2106             : 
    2107       38781 :         assert(tr->active);
    2108       38781 :         if (!isTable(col->t) || !col->t->s)
    2109             :                 return 0;
    2110             : 
    2111       38781 :         if (col && ATOMIC_PTR_GET(&col->data)) {
    2112       38507 :                 BAT *b = bind_col(tr, col, QUICK);
    2113             : 
    2114       38507 :                 if (b)
    2115       38507 :                         distinct = b->tkey;
    2116             :         }
    2117             :         return distinct;
    2118             : }
    2119             : 
    2120             : static int /* double_elim_col: for a column with string storage, return 0 or a value derived from the var-heap size (asserted to be 0..16) */
    2121        7972 : double_elim_col(sql_trans *tr, sql_column *col)
    2122             : {
    2123             :         int de = 0;
    2124             : 
    2125        7972 :         assert(tr->active);
    2126        7972 :         if (!isTable(col->t) || !col->t->s) /* not a table, or col->t->s unset: answer 0 */
    2127             :                 return 0;
    2128             : 
    2129        7972 :         if (col && ATOMIC_PTR_GET(&col->data)) { /* NOTE(review): col was already dereferenced above, so the col test here cannot protect against NULL */
    2130        7706 :                 BAT *b = bind_col(tr, col, QUICK);
    2131             : 
    2132        7706 :                 if (b && ATOMstorage(b->ttype) == TYPE_str) { /* check double elimination */
    2133        7706 :                         de = GDK_ELIMDOUBLES(b->tvheap);
    2134        7706 :                         if (de) /* when the macro is non-zero, replace it by ceil(heap free bytes / GDK_VAROFFSET) */
    2135        7049 :                                 de = (int) ceil(b->tvheap->free / (double) GDK_VAROFFSET);
    2136             :                 }
    2137        7706 :                 assert(de >= 0 && de <= 16);
    2138             :         }
    2139             :         return de;
    2140             : }
    2141             : 
    2142             : static int /* load_cs: fill cs from the BAT the logger knows under id; returns LOG_OK or LOG_ERR */
    2143       20529 : load_cs(sql_trans *tr, column_storage *cs, int type, sqlid id)
    2144             : {
    2145       20529 :         sqlstore *store = tr->store;
    2146       20529 :         int bid = logger_find_bat(store->logger, id);
    2147       20529 :         if (!bid) /* logger has no BAT for this id */
    2148             :                 return LOG_ERR;
    2149       20529 :         cs->bid = temp_dup(bid);
    2150       20529 :         cs->ucnt = 0;
    2151       20529 :         cs->uibid = e_bat(TYPE_oid); /* update-id BAT is always oid typed */
    2152       20529 :         cs->uvbid = e_bat(type); /* update-value BAT uses the caller supplied type */
    2153       20529 :         if (cs->uibid == BID_NIL || cs->uvbid == BID_NIL) /* NOTE(review): cs->bid (and possibly one of the two e_bat results) stays set on this error path; release is presumably left to the caller - confirm */
    2154           0 :                 return LOG_ERR;
    2155             :         return LOG_OK;
    2156             : }
    2157             : 
    2158             : static int /* log_create_delta: make sure the delta has update BATs and, unless GDKinmemory(0), register its base BAT with the logger under id */
    2159       57763 : log_create_delta(sql_trans *tr, sql_delta *bat, sqlid id)
    2160             : {
    2161             :         int res = LOG_OK;
    2162             :         gdk_return ok;
    2163       57763 :         BAT *b = temp_descriptor(bat->cs.bid);
    2164             : 
    2165       57763 :         if (b == NULL)
    2166             :                 return LOG_ERR;
    2167             : 
    2168       57763 :         if (!bat->cs.uibid) /* update BATs are created lazily here when still 0 */
    2169       57747 :                 bat->cs.uibid = e_bat(TYPE_oid);
    2170       57763 :         if (!bat->cs.uvbid)
    2171       57747 :                 bat->cs.uvbid = e_bat(b->ttype);
    2172       57763 :         if (bat->cs.uibid == BID_NIL || bat->cs.uvbid == BID_NIL)
    2173             :                 res = LOG_ERR;
    2174       57763 :         if (GDKinmemory(0)) { /* in-memory: nothing is written to the logger */
    2175         175 :                 bat_destroy(b);
    2176         175 :                 return res;
    2177             :         }
    2178             : 
    2179       57588 :         bat_set_access(b, BAT_READ);
    2180       57588 :         sqlstore *store = tr->store;
    2181       57588 :         ok = log_bat_persists(store->logger, b, id); /* NOTE(review): still called when res is already LOG_ERR; res is only examined afterwards */
    2182       57588 :         bat_destroy(b);
    2183       57588 :         if(res != LOG_OK)
    2184             :                 return res;
    2185       57588 :         return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
    2186             : }
    2187             : 
    2188             : static int /* new_persistent_delta: reset the delta's update count; always returns LOG_OK */
    2189             : new_persistent_delta( sql_delta *bat)
    2190             : {
    2191           0 :         bat->cs.ucnt = 0;
    2192             :         return LOG_OK;
    2193             : }
    2194             : 
    2195             : static void /* create_delta: install b as the base BAT of d and clear d's update state; b itself is not released here */
    2196             : create_delta( sql_delta *d, BAT *b)
    2197             : {
    2198       81269 :         bat_set_access(b, BAT_READ);
    2199      162538 :         d->cs.bid = temp_create(b);
    2200       81269 :         d->cs.uibid = d->cs.uvbid = 0; /* 0 means "no update BATs yet" (log_create_delta tests for this) */
    2201       81269 :         d->cs.ucnt = 0;
    2202             : }
    2203             : 
    2204             : static bat /* copyBat: build a BAT of the given type holding as many nil values as BAT i has rows; returns its id, or 0 on failure or when i is 0 */
    2205        5991 : copyBat (bat i, int type, oid seq)
    2206             : {
    2207             :         BAT *b, *tb;
    2208             :         bat res;
    2209             : 
    2210        5991 :         if (!i) /* no source BAT: hand back the 0 unchanged */
    2211             :                 return i;
    2212        5991 :         tb = quick_descriptor(i);
    2213        5991 :         if (tb == NULL)
    2214             :                 return 0; /* NOTE(review): failures return 0, while callers in this file compare the result with BID_NIL; if BID_NIL is not 0 those checks never fire - confirm */
    2215        5991 :         b = BATconstant(seq, type, ATOMnilptr(type), BATcount(tb), PERSISTENT); /* only the row count of tb is used, not its values */
    2216        5991 :         if (b == NULL)
    2217             :                 return 0;
    2218             : 
    2219        5991 :         bat_set_access(b, BAT_READ);
    2220             : 
    2221        5991 :         res = temp_create(b);
    2222        5991 :         bat_destroy(b);
    2223        5991 :         return res;
    2224             : }
    2225             : 
    2226             : static int /* create_col: set up the sql_delta of column c, either loaded through load_cs or freshly created; returns LOG_OK or LOG_ERR */
    2227       97171 : create_col(sql_trans *tr, sql_column *c)
    2228             : {
    2229             :         int ok = LOG_OK, new = 0;
    2230       97171 :         int type = c->type.type->localtype;
    2231       97171 :         sql_delta *bat = ATOMIC_PTR_GET(&c->data);
    2232             : 
    2233       97171 :         if (!bat) { /* no delta attached yet: allocate one and remember that we did (new) */
    2234             :                 new = 1;
    2235       97171 :                 bat = ZNEW(sql_delta);
    2236       97171 :                 ATOMIC_PTR_SET(&c->data, bat); /* NOTE(review): pointer is stored before the NULL check below; on allocation failure this stores NULL */
    2237       97171 :                 if(!bat)
    2238             :                         return LOG_ERR;
    2239       97171 :                 bat->cs.refcnt = 1;
    2240             :         }
    2241             : 
    2242             :         if (new)
    2243       97171 :                 bat->cs.ts = tr->tid;
    2244             : 
    2245       97171 :         if (!isNew(c) && !isTempTable(c->t)){ /* column is not new and its table is not temporary: load through load_cs */
    2246       15875 :                 bat->cs.ts = tr->ts;
    2247       15875 :                 return load_cs(tr, &bat->cs, type, c->base.id);
    2248       81296 :         } else if (bat && bat->cs.bid && !isTempTable(c->t)) { /* bat cannot be NULL here; branch taken when a base BAT already exists */
    2249           0 :                 return new_persistent_delta(ATOMIC_PTR_GET(&c->data));
    2250             :         } else {
    2251             :                 sql_column *fc = NULL;
    2252             :                 size_t cnt = 0;
    2253             : 
    2254             :                 /* alter ? */
    2255       81296 :                 if (ol_first_node(c->t->columns) && (fc = ol_first_node(c->t->columns)->data) != NULL) {
    2256       81295 :                         storage *s = ATOMIC_PTR_GET(&fc->t->data);
    2257       81295 :                         cnt = segs_end(s->segs, tr, c->t); /* cnt comes from the table's segments, reached via the first column */
    2258             :                 }
    2259       81296 :                 if (cnt && fc != c) { /* cnt is non-zero and c is not the first column: mirror the first column's delta layout */
    2260          27 :                         sql_delta *d = ATOMIC_PTR_GET(&fc->data);
    2261             : 
    2262          27 :                         if (d->cs.bid) {
    2263          27 :                                 bat->cs.bid = copyBat(d->cs.bid, type, 0); /* nil-filled BAT with the same row count as the first column's base BAT */
    2264          27 :                                 if(bat->cs.bid == BID_NIL) /* NOTE(review): copyBat returns 0 on failure; verify BID_NIL covers that */
    2265             :                                         ok = LOG_ERR;
    2266             :                         }
    2267          27 :                         if (d->cs.uibid) {
    2268          17 :                                 bat->cs.uibid = e_bat(TYPE_oid);
    2269          17 :                                 if (bat->cs.uibid == BID_NIL)
    2270             :                                         ok = LOG_ERR;
    2271             :                         }
    2272          27 :                         if (d->cs.uvbid) {
    2273          17 :                                 bat->cs.uvbid = e_bat(type);
    2274          17 :                                 if(bat->cs.uvbid == BID_NIL)
    2275             :                                         ok = LOG_ERR;
    2276             :                         }
    2277          27 :                         bat->cs.alter = 1; /* commit_create_col_ skips merge_delta while this flag is set */
    2278             :                 } else {
    2279       81269 :                         BAT *b = bat_new(type, c->t->sz, PERSISTENT);
    2280       81269 :                         if (!b) {
    2281             :                                 ok = LOG_ERR;
    2282             :                         } else {
    2283       81269 :                                 create_delta(ATOMIC_PTR_GET(&c->data), b);
    2284       81269 :                                 bat_destroy(b);
    2285             :                         }
    2286             : 
    2287       81269 :                         if (!new) { /* delta existed before this call: give it fresh update BATs (create_delta zeroed them) */
    2288           0 :                                 bat->cs.uibid = e_bat(TYPE_oid);
    2289           0 :                                 if (bat->cs.uibid == BID_NIL)
    2290             :                                         ok = LOG_ERR;
    2291           0 :                                 bat->cs.uvbid = e_bat(type);
    2292           0 :                                 if(bat->cs.uvbid == BID_NIL)
    2293             :                                         ok = LOG_ERR;
    2294             :                         }
    2295             :                 }
    2296       81296 :                 bat->cs.ucnt = 0;
    2297             : 
    2298       81296 :                 if (new /*&& !isTempTable(c->t)*/ && !isNew(c->t) /* alter */)
    2299          94 :                         trans_add(tr, &c->base, bat, &tc_gc_col, &commit_create_col, isTempTable(c->t)?NULL:&log_create_col); /* temporary tables get no log callback */
    2300             :         }
    2301             :         return ok;
    2302             : }
    2303             : 
    2304             : static int /* log_create_col_: log the creation of column c's delta under the column id; c must not belong to a temporary table */
    2305       52504 : log_create_col_(sql_trans *tr, sql_column *c)
    2306             : {
    2307       52504 :         assert(!isTempTable(c->t));
    2308       52504 :         return log_create_delta(tr,  ATOMIC_PTR_GET(&c->data), c->base.id);
    2309             : }
    2310             : 
    2311             : static int /* log_create_col: sql_change adapter; change->obj is treated as the sql_column to log */
    2312          90 : log_create_col(sql_trans *tr, sql_change *change)
    2313             : {
    2314          90 :         return log_create_col_(tr, (sql_column*)change->obj);
    2315             : }
    2316             : 
    2317             : static int /* commit_create_col_: stamp a non-temporary column's delta with commit_ts and merge it; temporary tables are left untouched */
    2318       52527 : commit_create_col_( sql_trans *tr, sql_column *c, ulng commit_ts, ulng oldest)
    2319             : {
    2320             :         int ok = LOG_OK;
    2321             :         (void)oldest;
    2322             : 
    2323       52527 :         if(!isTempTable(c->t)) {
    2324       52526 :                 sql_delta *delta = ATOMIC_PTR_GET(&c->data);
    2325       52526 :                 assert(delta->cs.ts == tr->tid); /* delta must still carry this transaction's id */
    2326       52526 :                 delta->cs.ts = commit_ts;
    2327             : 
    2328       52526 :                 assert(delta->next == NULL);
    2329       52526 :                 if (!delta->cs.alter && !delta->cs.merged) /* merge only when neither the alter nor the merged flag is set */
    2330       52507 :                         ok = merge_delta(delta);
    2331       52526 :                 delta->cs.alter = 0;
    2332       52526 :                 if (!tr->parent) /* the new flag is only cleared when the transaction has no parent */
    2333       52523 :                         c->base.new = 0;
    2334             :         }
    2335       52527 :         return ok;
    2336             : }
    2337             : 
    2338             : static int /* commit_create_col: sql_change adapter around commit_create_col_ */
    2339          94 : commit_create_col( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
    2340             : {
    2341          94 :         sql_column *c = (sql_column*)change->obj;
    2342          94 :         if (!tr->parent) /* cleared here for every table kind; commit_create_col_ repeats it for non-temporary tables */
    2343          93 :                 c->base.new = 0;
    2344          94 :         return commit_create_col_( tr, c, commit_ts, oldest);
    2345             : }
    2346             : 
    2347             : /* will be called for new idx's and when new index columns are created */
    2348             : static int /* create_idx: set up the sql_delta of index ni, loaded through load_cs or created as a nil-filled BAT; returns LOG_OK or LOG_ERR */
    2349        7443 : create_idx(sql_trans *tr, sql_idx *ni)
    2350             : {
    2351             :         int ok = LOG_OK, new = 0;
    2352        7443 :         sql_delta *bat = ATOMIC_PTR_GET(&ni->data);
    2353             :         int type = TYPE_lng; /* default storage type; oid_index() types use TYPE_oid instead */
    2354             : 
    2355        7443 :         if (oid_index(ni->type))
    2356             :                 type = TYPE_oid;
    2357             : 
    2358        7443 :         if (!bat) { /* no delta attached yet: allocate one and remember that we did (new) */
    2359             :                 new = 1;
    2360        7443 :                 bat = ZNEW(sql_delta);
    2361        7443 :                 ATOMIC_PTR_SET(&ni->data, bat); /* NOTE(review): pointer is stored before the NULL check below; on allocation failure this stores NULL */
    2362        7443 :                 if(!bat)
    2363             :                         return LOG_ERR;
    2364        7443 :                 bat->cs.refcnt = 1;
    2365             :         }
    2366             : 
    2367             :         if (new)
    2368        7443 :                 bat->cs.ts = tr->tid;
    2369             : 
    2370        7443 :         if (!isNew(ni) && !isTempTable(ni->t)){ /* index is not new and its table is not temporary: load through load_cs */
    2371        1479 :                 bat->cs.ts = 1; /* NOTE(review): create_col and create_del use tr->ts at this point; confirm the literal 1 is intended */
    2372        1479 :                 return load_cs(tr, &bat->cs, type, ni->base.id);
    2373        5964 :         } else if (bat && bat->cs.bid && !isTempTable(ni->t)) { /* bat cannot be NULL here; branch taken when a base BAT already exists */
    2374           0 :                 return new_persistent_delta(ATOMIC_PTR_GET(&ni->data));
    2375             :         } else {
    2376        5964 :                 sql_column *c = ol_first_node(ni->t->columns)->data; /* NOTE(review): first column node is dereferenced without a NULL check (create_col checks it) */
    2377        5964 :                 sql_delta *d = col_timestamp_delta(tr, c);
    2378             : 
    2379        5964 :                 if (d) {
    2380             :                         /* Here we also handle indices created through alter stmts */
    2381             :                         /* These need to be created aligned to the existing data */
    2382        5964 :                         if (d->cs.bid) {
    2383        5964 :                                 bat->cs.bid = copyBat(d->cs.bid, type, 0); /* nil-filled BAT with the same row count as the first column's base BAT */
    2384        5964 :                                 if(bat->cs.bid == BID_NIL) /* NOTE(review): copyBat returns 0 on failure; verify BID_NIL covers that */
    2385             :                                         ok = LOG_ERR;
    2386             :                         }
    2387             :                 } else {
    2388             :                         ok = LOG_ERR;
    2389             :                 }
    2390             : 
    2391        5964 :                 bat->cs.ucnt = 0;
    2392        5964 :                 if (!isNew(c)) /* first column is not new: flag the delta as alter so commit_create_idx_ skips merge_delta */
    2393         614 :                         bat->cs.alter = 1;
    2394             : 
    2395        5964 :                 if (!new) { /* delta existed before this call: give it fresh update BATs */
    2396           0 :                         bat->cs.uibid = e_bat(TYPE_oid);
    2397           0 :                         if (bat->cs.uibid == BID_NIL)
    2398             :                                 ok = LOG_ERR;
    2399           0 :                         bat->cs.uvbid = e_bat(type);
    2400           0 :                         if(bat->cs.uvbid == BID_NIL)
    2401             :                                 ok = LOG_ERR;
    2402             :                 }
    2403        5964 :                 bat->cs.ucnt = 0; /* repeats the reset done a few lines above */
    2404        5964 :                 if (new && !isNew(ni->t) /* alter */)
    2405         614 :                         trans_add(tr, &ni->base, bat, &tc_gc_idx, &commit_create_idx, isTempTable(ni->t)?NULL:&log_create_idx); /* temporary tables get no log callback */
    2406             :         }
    2407             :         return ok;
    2408             : }
    2409             : 
    2410             : static int /* log_create_idx_: log the creation of index i's delta under the index id; i must not belong to a temporary table */
    2411        5259 : log_create_idx_(sql_trans *tr, sql_idx *i)
    2412             : {
    2413        5259 :         assert(!isTempTable(i->t));
    2414        5259 :         return log_create_delta(tr, ATOMIC_PTR_GET(&i->data), i->base.id);
    2415             : }
    2416             : 
    2417             : static int /* log_create_idx: sql_change adapter; change->obj is treated as the sql_idx to log */
    2418         600 : log_create_idx(sql_trans *tr, sql_change *change)
    2419             : {
    2420         600 :         return log_create_idx_(tr, (sql_idx*)change->obj);
    2421             : }
    2422             : 
    2423             : static int /* commit_create_idx_: stamp a non-temporary index's delta with commit_ts and merge it; temporary tables are left untouched */
    2424        5281 : commit_create_idx_( sql_trans *tr, sql_idx *i, ulng commit_ts, ulng oldest)
    2425             : {
    2426             :         int ok = LOG_OK;
    2427             :         (void)oldest;
    2428             : 
    2429        5281 :         if(!isTempTable(i->t)) {
    2430        5281 :                 sql_delta *delta = ATOMIC_PTR_GET(&i->data);
    2431        5281 :                 assert(delta->cs.ts == tr->tid); /* delta must still carry this transaction's id */
    2432        5281 :                 delta->cs.ts = commit_ts;
    2433             : 
    2434        5281 :                 assert(delta->next == NULL);
    2435        5281 :                 if (!delta->cs.alter && !delta->cs.merged) /* NOTE(review): cs.alter is tested but never reset here, whereas commit_create_col_ resets it - confirm intended */
    2436        4667 :                         ok = merge_delta(delta);
    2437        5281 :                 if (!tr->parent) /* the new flag is only cleared when the transaction has no parent */
    2438        5280 :                         i->base.new = 0;
    2439             :         }
    2440        5281 :         return ok;
    2441             : }
    2442             : 
    2443             : static int /* commit_create_idx: sql_change adapter around commit_create_idx_ */
    2444         614 : commit_create_idx( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
    2445             : {
    2446         614 :         sql_idx *i = (sql_idx*)change->obj;
    2447         614 :         if (!tr->parent) /* cleared here for every table kind; commit_create_idx_ repeats it for non-temporary tables */
    2448         614 :                 i->base.new = 0;
    2449         614 :         return commit_create_idx_(tr, i, commit_ts, oldest);
    2450             : }
    2451             : 
    2452             : static int /* load_storage: load the TYPE_msk column storage of table t, build its segments and replay the oids found in the BAT as delete_range calls */
    2453        3175 : load_storage(sql_trans *tr, sql_table *t, storage *s, sqlid id)
    2454             : {
    2455        3175 :         int ok = load_cs(tr, &s->cs, TYPE_msk, id);
    2456             :         BAT *b = NULL, *ib = NULL;
    2457             : 
    2458        3175 :         if (ok != LOG_OK)
    2459             :                 return ok;
    2460        3175 :         if (!(b = temp_descriptor(s->cs.bid)))
    2461             :                 return LOG_ERR;
    2462             :         ib = b; /* ib keeps the descriptor as loaded; b may be replaced by its unmasked form below */
    2463             : 
    2464        3175 :         if ((b->ttype == TYPE_msk || mask_cand(b)) && !(b = BATunmask(b))) { /* convert mask form via BATunmask; failure releases ib and aborts */
    2465           0 :                 bat_destroy(ib);
    2466           0 :                 return LOG_ERR;
    2467             :         }
    2468             : 
    2469        3175 :         if (BATcount(b)) { /* b has entries, so o[0] below is valid */
    2470         287 :                 if (ok == LOG_OK && !(s->segs = new_segments(tr, BATcount(ib)))) /* segments are sized from ib, not from the unmasked b */
    2471             :                         ok = LOG_ERR; /* NOTE(review): execution continues into delete_range below with s->segs NULL; whether delete_range tolerates that is not visible here */
    2472         435 :                 if (BATtdense(b)) { /* dense tail: all entries form one contiguous range */
    2473             :                         size_t start = b->tseqbase;
    2474         148 :                         size_t cnt = BATcount(b);
    2475         148 :                         ok = delete_range(tr, t, s, start, cnt);
    2476             :                 } else {
    2477         139 :                         assert(BATtordered(b));
    2478         139 :                         BUN icnt = BATcount(b);
    2479         139 :                         BATiter bi = bat_iterator(b);
    2480         139 :                         oid *o = bi.base, n = o[0]+1; /* n: oid expected next if the current run continues */
    2481             :                         size_t lcnt = 1; /* lcnt: length of the current run of consecutive oids */
    2482      278498 :                         for (size_t i=1; i<icnt; i++) {
    2483      278359 :                                 if (o[i] == n) { /* run continues */
    2484      276485 :                                         lcnt++;
    2485      276485 :                                         n++;
    2486             :                                 } else { /* run ended: emit it as the range starting at n-lcnt */
    2487        1874 :                                         if ((ok = delete_range(tr, t, s, n-lcnt, lcnt)) != LOG_OK)
    2488             :                                                 break;
    2489             :                                         lcnt = 0;
    2490             :                                 }
    2491      278359 :                                 if (!lcnt) { /* start a new run at o[i] */
    2492        1874 :                                         n = o[i]+1;
    2493             :                                         lcnt = 1;
    2494             :                                 }
    2495             :                         }
    2496         139 :                         if (lcnt && ok == LOG_OK) /* emit the final run */
    2497         139 :                                 ok = delete_range(tr, t, s, n-lcnt, lcnt);
    2498         139 :                         bat_iterator_end(&bi);
    2499             :                 }
    2500         287 :                 if (ok == LOG_OK)
    2501        4826 :                         for (segment *seg = s->segs->h; seg; seg = seg->next) /* re-stamp segments carrying this transaction's id with ts 1 */
    2502        4539 :                                 if (seg->ts == tr->tid)
    2503        2381 :                                         seg->ts = 1;
    2504             :         } else { /* no entries: only create the segments from the stored BAT's count */
    2505             :                 if (ok == LOG_OK) {
    2506        2888 :                         BAT *bb = quick_descriptor(s->cs.bid);
    2507             : 
    2508        2888 :                         if (!bb || !(s->segs = new_segments(tr, BATcount(bb)))) {
    2509             :                                 ok = LOG_ERR;
    2510             :                         } else {
    2511        2888 :                                 segment *seg = s->segs->h;
    2512        2888 :                                 if (seg->ts == tr->tid)
    2513        2888 :                                         seg->ts = 1;
    2514             :                         }
    2515             :                 }
    2516             :         }
    2517        3175 :         if (b != ib) /* release the unmasked BAT separately when BATunmask produced one */
    2518        3175 :                 bat_destroy(b);
    2519        3175 :         bat_destroy(ib);
    2520             : 
    2521        3175 :         return ok;
    2522             : }
    2523             : 
    2524             : static int /* create_del: set up the storage object of table t, loaded through load_storage or created with empty segments and a new TYPE_msk BAT */
    2525       16509 : create_del(sql_trans *tr, sql_table *t)
    2526             : {
    2527             :         int ok = LOG_OK, new = 0;
    2528             :         BAT *b;
    2529       16509 :         storage *bat = ATOMIC_PTR_GET(&t->data);
    2530             : 
    2531       16509 :         if (!bat) { /* no storage attached yet: allocate one and remember that we did (new) */
    2532             :                 new = 1;
    2533       16509 :                 bat = ZNEW(storage);
    2534       16509 :                 if(!bat)
    2535             :                         return LOG_ERR;
    2536       16509 :                 ATOMIC_PTR_SET(&t->data, bat);
    2537       16509 :                 bat->cs.refcnt = 1;
    2538       16509 :                 bat->cs.ts = tr->tid;
    2539             :         }
    2540             : 
    2541       16509 :         if (!isNew(t) && !isTempTable(t)) { /* table is not new and not temporary: load through load_storage */
    2542        3175 :                 bat->cs.ts = tr->ts;
    2543        3175 :                 return load_storage(tr, t, bat, t->base.id);
    2544       13334 :         } else if (bat->cs.bid && !isTempTable(t)) { /* base BAT already present: nothing to do */
    2545             :                 return ok;
    2546       13334 :         } else if (!bat->cs.bid) {
    2547       13334 :                 assert(!bat->segs);
    2548       13334 :                 if (!(bat->segs = new_segments(tr, 0)))
    2549             :                         ok = LOG_ERR;
    2550             : 
    2551       13334 :                 b = bat_new(TYPE_msk, t->sz, PERSISTENT);
    2552       13334 :                 if(b != NULL) {
    2553       13334 :                         bat_set_access(b, BAT_READ);
    2554       13334 :                         bat->cs.bid = temp_create(b);
    2555       13334 :                         bat_destroy(b);
    2556             :                 } else {
    2557             :                         ok = LOG_ERR;
    2558             :                 }
    2559       13334 :                 if (new) /* NOTE(review): the change is registered even when ok is LOG_ERR at this point */
    2560       14980 :                         trans_add(tr, &t->base, bat, &tc_gc_del, &commit_create_del, isTempTable(t)?NULL:&log_create_del); /* temporary tables get no log callback */
    2561             :         }
    2562             :         return ok;
    2563             : }
    2564             : 
    2565             : static int /* log_segment: write segment s to the logger as a constant msk value (its deleted flag) over s->start .. s->end */
    2566      105263 : log_segment(sql_trans *tr, segment *s, sqlid id)
    2567             : {
    2568      105263 :         sqlstore *store = tr->store;
    2569      105263 :         msk m = s->deleted;
    2570      105263 :         return log_constant(store->logger, TYPE_msk, &m, id, s->start, s->end-s->start)==GDK_SUCCEED?LOG_OK:LOG_ERR; /* last two arguments: start offset and number of rows */
    2571             : }
    2572             : 
    2573             : static int /* log_segments: log every segment in the list whose ts equals this transaction's id; stops at the first failure */
    2574       45315 : log_segments(sql_trans *tr, segments *segs, sqlid id)
    2575             : {
    2576             :         /* log segments */
    2577      224789 :         for (segment *seg = segs->h; seg; seg=seg->next) {
    2578      179474 :                 if (seg->ts == tr->tid) { /* segments stamped with any other ts are skipped */
    2579      105263 :                         if (log_segment(tr, seg, id) != LOG_OK)
    2580             :                                 return LOG_ERR;
    2581             :                 }
    2582             :         }
    2583             :         return LOG_OK;
    2584             : }
    2585             : 
    2586             : static int /* log_create_storage: register the storage's base BAT with the logger under the table id, then log its segments; no-op when GDKinmemory(0) */
    2587       10108 : log_create_storage(sql_trans *tr, storage *bat, sql_table *t)
    2588             : {
    2589             :         BAT *b;
    2590             :         int ok = LOG_OK;
    2591             : 
    2592       10108 :         if (GDKinmemory(0))
    2593             :                 return LOG_OK;
    2594             : 
    2595       10077 :         b = temp_descriptor(bat->cs.bid);
    2596       10077 :         if (b == NULL)
    2597             :                 return LOG_ERR;
    2598             : 
    2599       10077 :         sqlstore *store = tr->store;
    2600       10077 :         bat_set_access(b, BAT_READ);
    2601             :         if (ok == LOG_OK) /* always true here: ok has not been changed since its initialisation */
    2602       10077 :                 ok = (log_bat_persists(store->logger, b, t->base.id) == GDK_SUCCEED)?LOG_OK:LOG_ERR;
    2603             :         if (ok == LOG_OK)
    2604       10077 :                 ok = log_segments(tr, bat->segs, t->base.id);
    2605       10077 :         bat_destroy(b);
    2606       10077 :         return ok;
    2607             : }
    2608             : 
    2609             : static int /* log_create_del: log a created table: its storage, then each column, then each index that has data; stops at the first failure */
    2610       10121 : log_create_del(sql_trans *tr, sql_change *change)
    2611             : {
    2612             :         int ok = LOG_OK;
    2613       10121 :         sql_table *t = (sql_table*)change->obj;
    2614             : 
    2615       10121 :         if (t->base.deleted) /* table already flagged deleted: nothing is logged */
    2616             :                 return ok;
    2617       10108 :         assert(!isTempTable(t));
    2618       10108 :         ok = log_create_storage(tr, ATOMIC_PTR_GET(&t->data), t);
    2619       10108 :         if (ok == LOG_OK) {
    2620       62522 :                 for(node *n = ol_first_node(t->columns); n && ok == LOG_OK; n = n->next) {
    2621       52414 :                         sql_column *c = n->data;
    2622             : 
    2623       52414 :                         ok = log_create_col_(tr, c);
    2624             :                 }
    2625       10108 :                 if (t->idxs) {
    2626       14775 :                         for(node *n = ol_first_node(t->idxs); n && ok == LOG_OK; n = n->next) {
    2627        4667 :                                 sql_idx *i = n->data;
    2628             : 
    2629        4667 :                                 if (ATOMIC_PTR_GET(&i->data)) /* indexes without a data pointer are skipped */
    2630        4659 :                                         ok = log_create_idx_(tr, i);
    2631             :                         }
    2632             :                 }
    2633             :         }
    2634             :         return ok;
    2635             : }
    2636             : 
    2637             : static int /* commit_create_del: commit a created table: convert and merge its segments, stamp the storage with commit_ts, then commit its columns and indexes */
    2638       13336 : commit_create_del( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
    2639             : {
    2640             :         int ok = LOG_OK;
    2641       13336 :         sql_table *t = (sql_table*)change->obj;
    2642             : 
    2643       13336 :         if (!commit_ts) /* rollback handled by ? */
    2644             :                 return ok;
    2645       11752 :         if(!isTempTable(t)) {
    2646       10123 :                 storage *dbat = ATOMIC_PTR_GET(&t->data);
    2647       10123 :                 ok = segments2cs(tr, dbat->segs, &dbat->cs);
    2648       10123 :                 assert(ok == LOG_OK);
    2649             :                 if (ok != LOG_OK) /* only reachable when asserts are compiled out */
    2650             :                         return ok;
    2651       10123 :                 merge_segments(dbat, tr, change, commit_ts, commit_ts/* create is we are alone */ /*oldest*/);
    2652       10123 :                 assert(dbat->cs.ts == tr->tid);
    2653       10123 :                 dbat->cs.ts = commit_ts;
    2654             :                 if (ok == LOG_OK) { /* always true here because of the early return above */
    2655       62556 :                         for(node *n = ol_first_node(t->columns); n && ok == LOG_OK; n = n->next) {
    2656       52433 :                                 sql_column *c = n->data;
    2657             : 
    2658       52433 :                                 ok = commit_create_col_(tr, c, commit_ts, oldest);
    2659             :                         }
    2660       10123 :                         if (t->idxs) {
    2661       14798 :                                 for(node *n = ol_first_node(t->idxs); n && ok == LOG_OK; n = n->next) {
    2662        4675 :                                         sql_idx *i = n->data;
    2663             : 
    2664        4675 :                                         if (ATOMIC_PTR_GET(&i->data)) /* indexes without a data pointer are skipped */
    2665        4667 :                                                 ok = commit_create_idx_(tr, i, commit_ts, oldest);
    2666             :                                 }
    2667             :                         }
    2668       10123 :                         if (!tr->parent)
    2669       10121 :                                 t->base.new = 0;
    2670             :                 }
    2671             :         }
    2672       11752 :         if (!tr->parent) /* also covers temporary tables; for non-temporary ones this repeats the assignment above */
    2673       11750 :                 t->base.new = 0;
    2674             :         return ok;
    2675             : }
    2676             : 
    2677             : static int /* log_destroy_delta: tell the logger that the BAT registered under id is transient; no-op in memory or when b has no base BAT */
    2678       16999 : log_destroy_delta(sql_trans *tr, sql_delta *b, sqlid id)
    2679             : {
    2680             :         gdk_return ok = GDK_SUCCEED;
    2681             : 
    2682       16999 :         sqlstore *store = tr->store;
    2683       16999 :         if (!GDKinmemory(0) && b && b->cs.bid) /* b may be NULL; that case counts as success */
    2684       16996 :                 ok = log_bat_transient(store->logger, id);
    2685       16996 :         return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
    2686             : }
    2687             : 
    2688             : static int /* destroy_col: destroy the delta attached to column c (if any) and clear the column's data pointer; store is unused */
    2689      104710 : destroy_col(sqlstore *store, sql_column *c)
    2690             : {
    2691             :         (void)store;
    2692             :         int ok = LOG_OK;
    2693      104710 :         if (ATOMIC_PTR_GET(&c->data))
    2694      104710 :                 ok = destroy_delta(ATOMIC_PTR_GET(&c->data), true);
    2695      104710 :         ATOMIC_PTR_SET(&c->data, NULL); /* pointer is cleared regardless of destroy_delta's result */
    2696      104710 :         return ok;
    2697             : }
    2698             : 
    2699             : static int /* log_destroy_col_: log the destruction of column c's delta; c must not belong to a temporary table */
    2700       15297 : log_destroy_col_(sql_trans *tr, sql_column *c)
    2701             : {
    2702             :         int ok = LOG_OK;
    2703       15297 :         assert(!isTempTable(c->t));
    2704       15297 :         if (!tr->parent) /* don't write save point commits */
    2705       15297 :                 ok = log_destroy_delta(tr, ATOMIC_PTR_GET(&c->data), c->base.id);
    2706       15297 :         return ok;
    2707             : }
    2708             : 
    2709             : static int
    2710          50 : log_destroy_col(sql_trans *tr, sql_change *change)
    2711             : {
    2712          50 :         sql_column *c = (sql_column*)change->obj;
    2713          50 :         int res = log_destroy_col_(tr, c);
    2714          50 :         change->obj = NULL;
    2715          50 :         column_destroy(tr->store, c);
    2716          50 :         return res;
    2717             : }
    2718             : 
    2719             : static int
    2720        8771 : destroy_idx(sqlstore *store, sql_idx *i)
    2721             : {
    2722             :         (void)store;
    2723             :         int ok = LOG_OK;
    2724        8771 :         if (ATOMIC_PTR_GET(&i->data))
    2725        8771 :                 ok = destroy_delta(ATOMIC_PTR_GET(&i->data), true);
    2726        8771 :         ATOMIC_PTR_SET(&i->data, NULL);
    2727        8771 :         return ok;
    2728             : }
    2729             : 
    2730             : static int
    2731        1762 : log_destroy_idx_(sql_trans *tr, sql_idx *i)
    2732             : {
    2733             :         int ok = LOG_OK;
    2734        1762 :         assert(!isTempTable(i->t));
    2735        1762 :         if (ATOMIC_PTR_GET(&i->data)) {
    2736        1702 :                 if (!tr->parent) /* don't write save point commits */
    2737        1702 :                         ok = log_destroy_delta(tr, ATOMIC_PTR_GET(&i->data), i->base.id);
    2738             :         }
    2739        1762 :         return ok;
    2740             : }
    2741             : 
    2742             : static int
    2743         493 : log_destroy_idx(sql_trans *tr, sql_change *change)
    2744             : {
    2745         493 :         sql_idx *i = (sql_idx*)change->obj;
    2746         493 :         int res = log_destroy_idx_(tr, i);
    2747         493 :         change->obj = NULL;
    2748         493 :         idx_destroy(tr->store, i);
    2749         493 :         return res;
    2750             : }
    2751             : 
    2752             : static int
    2753       18085 : destroy_del(sqlstore *store, sql_table *t)
    2754             : {
    2755             :         (void)store;
    2756             :         int ok = LOG_OK;
    2757       18085 :         if (ATOMIC_PTR_GET(&t->data))
    2758       16492 :                 ok = destroy_storage(ATOMIC_PTR_GET(&t->data));
    2759       18085 :         ATOMIC_PTR_SET(&t->data, NULL);
    2760       18085 :         return ok;
    2761             : }
    2762             : 
    2763             : static int
    2764        2793 : log_destroy_storage(sql_trans *tr, storage *bat, sqlid id)
    2765             : {
    2766             :         gdk_return ok = GDK_SUCCEED;
    2767             : 
    2768        2793 :         sqlstore *store = tr->store;
    2769        2793 :         if (!GDKinmemory(0) && !tr->parent && /* don't write save point commits */
    2770        2793 :             bat && bat->cs.bid)
    2771        2793 :                 ok = log_bat_transient(store->logger, id);
    2772        2793 :         return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
    2773             : }
    2774             : 
    2775             : static int
    2776        2793 : log_destroy_del(sql_trans *tr, sql_change *change)
    2777             : {
    2778             :         int ok = LOG_OK;
    2779        2793 :         sql_table *t = (sql_table*)change->obj;
    2780             : 
    2781        2793 :         assert(!isTempTable(t));
    2782        2793 :         ok = log_destroy_storage(tr, ATOMIC_PTR_GET(&t->data), t->base.id);
    2783        2793 :         if (ok == LOG_OK) {
    2784       18040 :                 for(node *n = ol_first_node(t->columns); n && ok == LOG_OK; n = n->next) {
    2785       15247 :                         sql_column *c = n->data;
    2786             : 
    2787       15247 :                         ok = log_destroy_col_(tr, c);
    2788             :                 }
    2789        2793 :                 if (t->idxs) {
    2790        4062 :                         for(node *n = ol_first_node(t->idxs); n && ok == LOG_OK; n = n->next) {
    2791        1269 :                                 sql_idx *i = n->data;
    2792             : 
    2793        1269 :                                 ok = log_destroy_idx_(tr, i);
    2794             :                         }
    2795             :                 }
    2796             :         }
    2797        2793 :         return ok;
    2798             : }
    2799             : 
    2800             : static int
    2801        3348 : commit_destroy_del( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
    2802             : {
    2803             :         (void)tr;
    2804             :         (void)change;
    2805             :         (void)commit_ts;
    2806             :         (void)oldest;
    2807        3348 :         return 0;
    2808             : }
    2809             : 
    2810             : static int
    2811        2826 : drop_del(sql_trans *tr, sql_table *t)
    2812             : {
    2813             :         int ok = LOG_OK;
    2814             : 
    2815        2826 :         if (!isNew(t) && !isTempTable(t)) {
    2816        2799 :                 storage *bat = ATOMIC_PTR_GET(&t->data);
    2817        2799 :                 trans_add(tr, &t->base, bat, &tc_gc_del, &commit_destroy_del, &log_destroy_del);
    2818             :         }
    2819        2826 :         return ok;
    2820             : }
    2821             : 
    2822             : static int
    2823          55 : drop_col(sql_trans *tr, sql_column *c)
    2824             : {
    2825          55 :         assert(!isNew(c) && !isTempTable(c->t));
    2826          55 :         sql_delta *d = ATOMIC_PTR_GET(&c->data);
    2827          55 :         trans_add(tr, &c->base, d, &tc_gc_drop_col, &commit_destroy_del, &log_destroy_col);
    2828          55 :         return LOG_OK;
    2829             : }
    2830             : 
    2831             : static int
    2832         494 : drop_idx(sql_trans *tr, sql_idx *i)
    2833             : {
    2834         494 :         assert(!isNew(i) && !isTempTable(i->t));
    2835         494 :         sql_delta *d = ATOMIC_PTR_GET(&i->data);
    2836         494 :         trans_add(tr, &i->base, d, &tc_gc_drop_idx, &commit_destroy_del, &log_destroy_idx);
    2837         494 :         return LOG_OK;
    2838             : }
    2839             : 
    2840             : 
    2841             : static BUN
    2842        1092 : clear_cs(sql_trans *tr, column_storage *cs, bool renew, bool temp)
    2843             : {
    2844             :         BAT *b;
    2845             :         BUN sz = 0;
    2846             : 
    2847             :         (void)tr;
    2848        1092 :         if (cs->bid && renew) {
    2849        1024 :                 b = quick_descriptor(cs->bid);
    2850        1024 :                 if (b) {
    2851        1024 :                         sz += BATcount(b);
    2852        1024 :                         bat bid = cs->bid;
    2853        1024 :                         cs->bid = temp_copy(bid, true, temp); /* create empty copy */
    2854        1024 :                         temp_destroy(bid);
    2855             :                 }
    2856             :         }
    2857        1092 :         if (cs->uibid) {
    2858         957 :                 b = temp_descriptor(cs->uibid);
    2859         957 :                 if (b && !isEbat(b)) {
    2860           1 :                         bat_clear(b);
    2861           1 :                         BATcommit(b, BUN_NONE);
    2862             :                 }
    2863         957 :                 bat_destroy(b);
    2864             :         }
    2865        1092 :         if (cs->uvbid) {
    2866         957 :                 b = temp_descriptor(cs->uvbid);
    2867         957 :                 if(b && !isEbat(b)) {
    2868           1 :                         bat_clear(b);
    2869           1 :                         BATcommit(b, BUN_NONE);
    2870             :                 }
    2871         957 :                 bat_destroy(b);
    2872             :         }
    2873        1092 :         cs->cleared = 1;
    2874        1092 :         cs->ucnt = 0;
    2875        1092 :         return sz;
    2876             : }
    2877             : 
    2878             : static BUN
    2879         505 : clear_col(sql_trans *tr, sql_column *c, bool renew)
    2880             : {
    2881         505 :         bool update_conflict = false;
    2882         505 :         sql_delta *delta, *odelta = ATOMIC_PTR_GET(&c->data);
    2883             : 
    2884         572 :         if ((delta = bind_col_data(tr, c, renew?&update_conflict:NULL)) == NULL)
    2885           0 :                 return update_conflict ? LOG_CONFLICT : LOG_ERR;
    2886         505 :         if ((!inTransaction(tr, c->t) && (odelta != delta || isTempTable(c->t)) && isGlobal(c->t)) || (!isNew(c->t) && isLocalTemp(c->t)))
    2887         439 :                 trans_add(tr, &c->base, delta, &tc_gc_col, &commit_update_col, isTempTable(c->t)?NULL:&log_update_col);
    2888             :         if (delta)
    2889         505 :                 return clear_cs(tr, &delta->cs, renew, isTempTable(c->t));
    2890             :         return 0;
    2891             : }
    2892             : 
    2893             : static BUN
    2894          25 : clear_idx(sql_trans *tr, sql_idx *i, bool renew)
    2895             : {
    2896          25 :         bool update_conflict = false;
    2897          25 :         sql_delta *delta, *odelta = ATOMIC_PTR_GET(&i->data);
    2898             : 
    2899          25 :         if (!isTable(i->t) || (hash_index(i->type) && list_length(i->columns) <= 1) || !idx_has_column(i->type))
    2900          18 :                 return 0;
    2901           8 :         if ((delta = bind_idx_data(tr, i, renew?&update_conflict:NULL)) == NULL)
    2902           0 :                 return update_conflict ? LOG_CONFLICT : LOG_ERR;
    2903           7 :         if ((!inTransaction(tr, i->t) && (odelta != delta || isTempTable(i->t)) && isGlobal(i->t)) || (!isNew(i->t) && isLocalTemp(i->t)))
    2904           6 :                 trans_add(tr, &i->base, delta, &tc_gc_idx, &commit_update_idx, isTempTable(i->t)?NULL:&log_update_idx);
    2905             :         if (delta)
    2906           7 :                 return clear_cs(tr, &delta->cs, renew, isTempTable(i->t));
    2907             :         return 0;
    2908             : }
    2909             : 
    2910             : static int
    2911         165 : clear_storage(sql_trans *tr, sql_table *t, storage *s)
    2912             : {
    2913         165 :         clear_cs(tr, &s->cs, true, isTempTable(t));
    2914         165 :         s->cs.cleared = 1;
    2915         165 :         if (s->segs)
    2916         165 :                 destroy_segments(s->segs);
    2917         165 :         if (!(s->segs = new_segments(tr, 0)))
    2918           0 :                 return LOG_ERR;
    2919             :         return LOG_OK;
    2920             : }
    2921             : 
    2922             : 
    2923             : /*
    2924             :  * Clear the table, in general this means replacing the storage,
    2925             :  * but in case of earlier deletes (or inserts by this transaction), we only mark
    2926             :  * all segments as deleted.
    2927             :  * this function returns BUN_NONE on LOG_ERR and BUN_NONE - 1 on LOG_CONFLICT
    2928             :  */
    2929             : static BUN
    2930         181 : clear_del(sql_trans *tr, sql_table *t, int in_transaction)
    2931             : {
    2932         181 :         int clear = !in_transaction || isTempTable(t), ok = LOG_OK;
    2933         181 :         bool conflict = false;
    2934             :         storage *bat;
    2935             : 
    2936         181 :         if ((bat = bind_del_data(tr, t, clear?&conflict:NULL)) == NULL)
    2937           1 :                 return conflict?BUN_NONE-1:BUN_NONE;
    2938             : 
    2939         180 :         if (!clear) {
    2940          36 :                 lock_table(tr->store, t->base.id);
    2941          36 :                 ok = delete_range(tr, t, bat, 0, bat->segs->t->end);
    2942          36 :                 unlock_table(tr->store, t->base.id);
    2943             :         }
    2944         180 :         if ((!inTransaction(tr, t) && !in_transaction && isGlobal(t)) || (!isNew(t) && isLocalTemp(t)))
    2945         145 :                 trans_add(tr, &t->base, bat, &tc_gc_del, &commit_update_del, isTempTable(t)?NULL:&log_update_del);
    2946         180 :         if (clear && ok == LOG_OK)
    2947         144 :                 return clear_storage(tr, t, bat);
    2948          36 :         if (ok == LOG_ERR)
    2949             :                 return BUN_NONE;
    2950          36 :         if (ok == LOG_CONFLICT)
    2951           0 :                 return BUN_NONE - 1;
    2952             :         return LOG_OK;
    2953             : }
    2954             : 
    2955             : /* this function returns BUN_NONE on LOG_ERR and BUN_NONE - 1 on LOG_CONFLICT */
    2956             : static BUN
    2957         181 : clear_table(sql_trans *tr, sql_table *t)
    2958             : {
    2959         181 :         int in_transaction = segments_in_transaction(tr, t);
    2960         181 :         int clear = !in_transaction || isTempTable(t);
    2961             : 
    2962         181 :         node *n = ol_first_node(t->columns);
    2963         181 :         sql_column *c = n->data;
    2964         181 :         storage *d = tab_timestamp_storage(tr, t);
    2965             : 
    2966         181 :         if (!d)
    2967             :                 return BUN_NONE;
    2968         181 :         BUN sz = count_col(tr, c, CNT_ACTIVE), clear_ok;
    2969         181 :         if ((clear_ok = clear_del(tr, t, in_transaction)) >= BUN_NONE - 1)
    2970             :                 return clear_ok;
    2971             : 
    2972         685 :         for (; n; n = n->next) {
    2973         505 :                 c = n->data;
    2974             : 
    2975         505 :                 if ((clear_ok = clear_col(tr, c, clear)) >= BUN_NONE - 1)
    2976           0 :                         return clear_ok;
    2977             :         }
    2978         180 :         if (t->idxs) {
    2979         207 :                 for (n = ol_first_node(t->idxs); n; n = n->next) {
    2980          27 :                         sql_idx *ci = n->data;
    2981             : 
    2982          52 :                         if (isTable(ci->t) && idx_has_column(ci->type) &&
    2983          25 :                                 (clear_ok = clear_idx(tr, ci, clear)) >= BUN_NONE - 1)
    2984           0 :                                 return clear_ok;
    2985             :                 }
    2986             :         }
    2987             :         return sz;
    2988             : }
    2989             : 
    2990             : static int
    2991        3500 : tr_log_cs( sql_trans *tr, sql_table *t, column_storage *cs, segment *segs, sqlid id)
    2992             : {
    2993        3500 :         sqlstore *store = tr->store;
    2994             :         gdk_return ok = GDK_SUCCEED;
    2995             : 
    2996             :         (void) t;
    2997             :         (void) segs;
    2998        3500 :         if (GDKinmemory(0))
    2999             :                 return LOG_OK;
    3000             : 
    3001             :         /*
    3002             :         if (cs->cleared && log_bat_clear(store->logger, id) != GDK_SUCCEED)
    3003             :                 return LOG_ERR;
    3004             :                 */
    3005             : 
    3006        3493 :         if (cs->cleared) {
    3007         586 :                 assert(cs->ucnt == 0);
    3008         586 :                 BAT *ins = temp_descriptor(cs->bid);
    3009         586 :                 if (!ins)
    3010             :                         return LOG_ERR;
    3011         586 :                 assert(!isEbat(ins));
    3012         586 :                 bat_set_access(ins, BAT_READ);
    3013         586 :                 ok = log_bat_persists(store->logger, ins, id);
    3014         586 :                 bat_destroy(ins);
    3015         586 :                 return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
    3016             :         }
    3017             : 
    3018        2907 :         assert(!isTempTable(t));
    3019             : 
    3020        2907 :         if (ok == GDK_SUCCEED && cs->ucnt && cs->uibid) {
    3021        2452 :                 BAT *ui = temp_descriptor(cs->uibid);
    3022        2452 :                 BAT *uv = temp_descriptor(cs->uvbid);
    3023             :                 /* any updates */
    3024        2452 :                 if (ui == NULL || uv == NULL) {
    3025             :                         ok = GDK_FAIL;
    3026        2452 :                 } else if (BUNlast(uv) > uv->batInserted || BATdirty(uv))
    3027        2452 :                         ok = log_delta(store->logger, ui, uv, id);
    3028        2452 :                 bat_destroy(ui);
    3029        2452 :                 bat_destroy(uv);
    3030             :         }
    3031        2452 :         return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
    3032             : }
    3033             : 
    3034             : static int
    3035             : tr_log_delta( sql_trans *tr, sql_table *t, sql_delta *cbat, segment *segs, sqlid id)
    3036             : {
    3037        3357 :         return tr_log_cs( tr, t, &cbat->cs, segs, id);
    3038             : }
    3039             : 
    3040             : static int
    3041       35095 : log_table_append(sql_trans *tr, sql_table *t, segments *segs)
    3042             : {
    3043       35095 :         sqlstore *store = tr->store;
    3044             :         gdk_return ok = GDK_SUCCEED;
    3045             : 
    3046       35095 :         if (isTempTable(t))
    3047             :                 return LOG_OK;
    3048       35095 :         size_t end = segs_end(segs, tr, t);
    3049      204333 :         for (segment *cur = segs->h; cur && ok; cur = cur->next) {
    3050      169238 :                 if (cur->ts == tr->tid && !cur->deleted && cur->start < end) {
    3051      176660 :                         for (node *n = ol_first_node(t->columns); n && ok; n = n->next) {
    3052      151409 :                                 sql_column *c = n->data;
    3053      151409 :                                 column_storage *cs = ATOMIC_PTR_GET(&c->data);
    3054             : 
    3055             :                                 /* append col*/
    3056      151409 :                                 BAT *ins = temp_descriptor(cs->bid);
    3057      151409 :                                 assert(ins);
    3058      151409 :                                 assert(BATcount(ins) >= cur->end);
    3059      151409 :                                 ok = log_bat(store->logger, ins, c->base.id, cur->start, cur->end-cur->start);
    3060      151409 :                                 bat_destroy(ins);
    3061             :                         }
    3062       25251 :                         if (t->idxs) {
    3063       28362 :                                 for (node *n = ol_first_node(t->idxs); n && ok; n = n->next) {
    3064        3111 :                                         sql_idx *i = n->data;
    3065             : 
    3066        3111 :                                         if ((hash_index(i->type) && list_length(i->columns) <= 1) || !idx_has_column(i->type))
    3067        2391 :                                                 continue;
    3068         720 :                                         column_storage *cs = ATOMIC_PTR_GET(&i->data);
    3069             : 
    3070         720 :                                         if (cs) {
    3071             :                                                 /* append idx */
    3072         720 :                                                 BAT *ins = temp_descriptor(cs->bid);
    3073         720 :                                                 assert(ins);
    3074         720 :                                                 assert(BATcount(ins) >= cur->end);
    3075         720 :                                                 ok = log_bat(store->logger, ins, i->base.id, cur->start, cur->end-cur->start);
    3076         720 :                                                 bat_destroy(ins);
    3077             :                                         }
    3078             :                                 }
    3079             :                         }
    3080             :                 }
    3081             :         }
    3082       35095 :         return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
    3083             : }
    3084             : 
    3085             : static int
    3086       35238 : log_storage(sql_trans *tr, sql_table *t, storage *s, sqlid id)
    3087             : {
    3088       35238 :         int ok = LOG_OK, cleared = s->cs.cleared;
    3089       35238 :         if (ok == LOG_OK && cleared)
    3090         143 :                 ok =  tr_log_cs(tr, t, &s->cs, s->segs->h, t->base.id);
    3091         143 :         if (ok == LOG_OK)
    3092       35238 :                 ok = log_segments(tr, s->segs, id);
    3093       35238 :         if (ok == LOG_OK && !cleared)
    3094       35095 :                 ok = log_table_append(tr, t, s->segs);
    3095       35238 :         return ok;
    3096             : }
    3097             : 
    3098             : static int
    3099       99626 : merge_cs( column_storage *cs)
    3100             : {
    3101             :         int ok = LOG_OK;
    3102             :         BAT *cur = NULL;
    3103             : 
    3104       99626 :         if (cs->bid) {
    3105       99626 :                 cur = temp_descriptor(cs->bid);
    3106       99626 :                 if(!cur)
    3107             :                         return LOG_ERR;
    3108             :         }
    3109             : 
    3110       99626 :         if (cs->ucnt) {
    3111        2457 :                 BAT *ui = temp_descriptor(cs->uibid);
    3112        2457 :                 BAT *uv = temp_descriptor(cs->uvbid);
    3113             : 
    3114        2457 :                 if(!ui || !uv) {
    3115           0 :                         bat_destroy(ui);
    3116           0 :                         bat_destroy(uv);
    3117           0 :                         bat_destroy(cur);
    3118           0 :                         return LOG_ERR;
    3119             :                 }
    3120        2457 :                 assert(BATcount(ui) == BATcount(uv));
    3121             : 
    3122             :                 /* any updates */
    3123        2457 :                 assert(!isEbat(cur));
    3124        2457 :                 if (BATreplace(cur, ui, uv, true) != GDK_SUCCEED) {
    3125           0 :                         bat_destroy(ui);
    3126           0 :                         bat_destroy(uv);
    3127           0 :                         bat_destroy(cur);
    3128           0 :                         return LOG_ERR;
    3129             :                 }
    3130             :                 /* cleanup the old deltas */
    3131        2457 :                 temp_destroy(cs->uibid);
    3132        2457 :                 temp_destroy(cs->uvbid);
    3133        2457 :                 cs->uibid = e_bat(TYPE_oid);
    3134        2457 :                 cs->uvbid = e_bat(cur->ttype);
    3135        2457 :                 if(cs->uibid == BID_NIL || cs->uvbid == BID_NIL)
    3136             :                         ok = LOG_ERR;
    3137        2457 :                 cs->ucnt = 0;
    3138        2457 :                 bat_destroy(ui);
    3139        2457 :                 bat_destroy(uv);
    3140             :         }
    3141       99626 :         cs->cleared = 0;
    3142       99626 :         cs->merged = 1;
    3143       99626 :         bat_destroy(cur);
    3144       99626 :         return ok;
    3145             : }
    3146             : 
    3147             : static int
    3148       64576 : merge_delta( sql_delta *obat)
    3149             : {
    3150             :         int ok = LOG_OK;
    3151             : 
    3152       64576 :         if (obat && obat->next && !obat->cs.merged && (ok = merge_delta(obat->next)) != LOG_OK)
    3153             :                 return ok;
    3154       64576 :         return merge_cs(&obat->cs);
    3155             : }
    3156             : 
    3157             : static int
    3158       35050 : merge_storage(storage *tdb)
    3159             : {
    3160       35050 :         int ok = merge_cs(&tdb->cs);
    3161             : 
    3162       35050 :         if (tdb->next) {
    3163         141 :                 ok = destroy_storage(tdb->next);
    3164         141 :                 tdb->next = NULL;
    3165             :         }
    3166       35050 :         return ok;
    3167             : }
    3168             : 
    3169             : static sql_delta *
    3170           1 : savepoint_commit_delta( sql_delta *delta, ulng commit_ts)
    3171             : {
    3172             :         /* commit ie copy back to the parent transaction */
    3173           1 :         if (delta && delta->cs.ts == commit_ts && delta->next) {
    3174             :                 sql_delta *od = delta->next;
    3175           1 :                 if (od->cs.ts == commit_ts) {
    3176           0 :                         sql_delta t = *od, *n = od->next;
    3177           0 :                         *od = *delta;
    3178           0 :                         od->next = n;
    3179           0 :                         *delta = t;
    3180           0 :                         delta->next = NULL;
    3181           0 :                         destroy_delta(delta, true);
    3182             :                         return od;
    3183             :                 }
    3184             :         }
    3185             :         return delta;
    3186             : }
    3187             : 
    3188             : static int
    3189           0 : rollback_delta(sql_trans *tr, sql_delta *delta, int type)
    3190             : {
    3191             :         (void)tr;
    3192           0 :         if (delta->cs.ucnt) {
    3193           0 :                 delta->cs.ucnt = 0;
    3194           0 :                 temp_destroy(delta->cs.uibid);
    3195           0 :                 temp_destroy(delta->cs.uvbid);
    3196           0 :                 delta->cs.uibid = e_bat(TYPE_oid);
    3197           0 :                 delta->cs.uvbid = e_bat(type);
    3198           0 :                 if (delta->cs.uibid == BID_NIL || delta->cs.uvbid == BID_NIL)
    3199           0 :                         return LOG_ERR;
    3200             :         }
    3201             :         return LOG_OK;
    3202             : }
    3203             : 
    3204             : static int
    3205        3330 : log_update_col( sql_trans *tr, sql_change *change)
    3206             : {
    3207        3330 :         sql_column *c = (sql_column*)change->obj;
    3208             : 
    3209        3330 :         if (!isTempTable(c->t) && !tr->parent) {/* don't write save point commits */
    3210        3330 :                 storage *s = ATOMIC_PTR_GET(&c->t->data);
    3211        3330 :                 return tr_log_delta(tr, c->t, ATOMIC_PTR_GET(&c->data), s->segs->h, c->base.id);
    3212             :         }
    3213             :         return LOG_OK;
    3214             : }
    3215             : 
    3216             : static int
    3217        1228 : commit_update_col_( sql_trans *tr, sql_column *c, ulng commit_ts, ulng oldest)
    3218             : {
    3219             :         int ok = LOG_OK;
    3220        1228 :         sql_delta *delta = ATOMIC_PTR_GET(&c->data);
    3221             : 
    3222             :         (void)oldest;
    3223        1228 :         if (isTempTable(c->t)) {
    3224        1228 :                 if (commit_ts) { /* commit */
    3225         870 :                         if (c->t->commit_action == CA_COMMIT || c->t->commit_action == CA_PRESERVE) {
    3226         813 :                                 if (!delta->cs.merged)
    3227         678 :                                         ok = merge_delta(delta);
    3228             :                         } else /* CA_DELETE as CA_DROP's are gone already (or for globals are equal to a CA_DELETE) */
    3229          57 :                                 clear_cs(tr, &delta->cs, true, isTempTable(c->t));
    3230             :                 } else { /* rollback */
    3231         358 :                         if (c->t->commit_action == CA_COMMIT/* || c->t->commit_action == CA_PRESERVE*/)
    3232           0 :                                 ok = rollback_delta(tr, delta, c->type.type->localtype);
    3233             :                         else /* CA_DELETE as CA_DROP's are gone already (or for globals are equal to a CA_DELETE) */
    3234         358 :                                 clear_cs(tr, &delta->cs, true, isTempTable(c->t));
    3235             :                 }
    3236        1228 :                 if (!tr->parent)
    3237        1228 :                         c->t->base.new = c->base.new = 0;
    3238             :         }
    3239        1228 :         return ok;
    3240             : }
    3241             : 
    3242             : static int
    3243         170 : tc_gc_rollbacked( sql_store Store, sql_change *change, ulng oldest)
    3244             : {
    3245             :         sqlstore *store = Store;
    3246             : 
    3247         170 :         sql_delta *d = (sql_delta*)change->data;
    3248         170 :         if (d->cs.ts < oldest) {
    3249          81 :                 destroy_delta(d, false);
    3250          81 :                 return 1;
    3251             :         }
    3252          89 :         if (d->cs.ts > TRANSACTION_ID_BASE)
    3253          81 :                 d->cs.ts = store_get_timestamp(store) + 1;
    3254             :         return 0;
    3255             : }
    3256             : 
    3257             : static int
    3258           2 : tc_gc_rollbacked_storage( sql_store Store, sql_change *change, ulng oldest)
    3259             : {
    3260             :         sqlstore *store = Store;
    3261             : 
    3262           2 :         storage *d = (storage*)change->data;
    3263           2 :         if (d->cs.ts < oldest) {
    3264           1 :                 destroy_storage(d);
    3265           1 :                 return 1;
    3266             :         }
    3267           1 :         if (d->cs.ts > TRANSACTION_ID_BASE)
    3268           1 :                 d->cs.ts = store_get_timestamp(store) + 1;
    3269             :         return 0;
    3270             : }
    3271             : 
    3272             : 
    3273             : static int
    3274        4638 : commit_update_col( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
    3275             : {
    3276             :         int ok = LOG_OK;
    3277        4638 :         sql_column *c = (sql_column*)change->obj;
    3278        4638 :         sql_delta *delta = ATOMIC_PTR_GET(&c->data);
    3279             : 
    3280        4638 :         if (isTempTable(c->t))
    3281        1228 :                 return commit_update_col_(tr, c, commit_ts, oldest);
    3282        3410 :         if (commit_ts)
    3283        3331 :                 delta->cs.ts = commit_ts;
    3284        3410 :         if (!commit_ts) { /* rollback */
    3285          79 :                 sql_delta *d = change->data, *o = ATOMIC_PTR_GET(&c->data);
    3286             : 
    3287          79 :                 if (change->ts && c->t->base.new) /* handled by create col */
    3288             :                         return ok;
    3289          79 :                 if (o != d) {
    3290           0 :                         while(o && o->next != d)
    3291             :                                 o = o->next;
    3292             :                 }
    3293          79 :                 if (o == ATOMIC_PTR_GET(&c->data))
    3294          79 :                         ATOMIC_PTR_SET(&c->data, d->next);
    3295             :                 else
    3296           0 :                         o->next = d->next;
    3297          79 :                 change->cleanup = &tc_gc_rollbacked;
    3298        3331 :         } else if (ok == LOG_OK && !tr->parent) {
    3299             :                 /* merge deltas */
    3300        3341 :                 while (delta && delta->cs.ts > oldest)
    3301          11 :                         delta = delta->next;
    3302        3330 :                 if (ok == LOG_OK && delta && !delta->cs.merged && delta->cs.ts <= oldest) {
    3303        3329 :                         lock_column(tr->store, c->base.id); /* lock for concurrent updates (appends) */
    3304        3329 :                         ok = merge_delta(delta);
    3305        3329 :                         unlock_column(tr->store, c->base.id);
    3306             :                 }
    3307             :         } else if (ok == LOG_OK && tr->parent) /* move delta into older and cleanup current save points */
    3308           1 :                 ATOMIC_PTR_SET(&c->data, savepoint_commit_delta(delta, commit_ts));
    3309             :         return ok;
    3310             : }
    3311             : 
    3312             : static int
    3313          27 : log_update_idx( sql_trans *tr, sql_change *change)
    3314             : {
    3315          27 :         sql_idx *i = (sql_idx*)change->obj;
    3316             : 
    3317          27 :         if (!isTempTable(i->t) && !tr->parent) { /* don't write save point commits */
    3318          27 :                 storage *s = ATOMIC_PTR_GET(&i->t->data);
    3319          27 :                 return tr_log_delta(tr, i->t, ATOMIC_PTR_GET(&i->data), s->segs->h, i->base.id);
    3320             :         }
    3321             :         return LOG_OK;
    3322             : }
    3323             : 
    3324             : static int
    3325           0 : commit_update_idx_( sql_trans *tr, sql_idx *i, ulng commit_ts, ulng oldest)
    3326             : {
    3327             :         int ok = LOG_OK;
    3328           0 :         sql_delta *delta = ATOMIC_PTR_GET(&i->data);
    3329           0 :         int type = (oid_index(i->type))?TYPE_oid:TYPE_lng;
    3330             : 
    3331             :         (void)oldest;
    3332           0 :         if (isTempTable(i->t)) {
    3333           0 :                 if (commit_ts) { /* commit */
    3334           0 :                         if (i->t->commit_action == CA_COMMIT || i->t->commit_action == CA_PRESERVE) {
    3335           0 :                                 if (!delta->cs.merged)
    3336           0 :                                         ok = merge_delta(delta);
    3337             :                         } else /* CA_DELETE as CA_DROP's are gone already */
    3338           0 :                                 clear_cs(tr, &delta->cs, true, isTempTable(i->t));
    3339             :                 } else { /* rollback */
    3340           0 :                         if (i->t->commit_action == CA_COMMIT/* || i->t->commit_action == CA_PRESERVE*/)
    3341           0 :                                 ok = rollback_delta(tr, delta, type);
    3342             :                         else /* CA_DELETE as CA_DROP's are gone already */
    3343           0 :                                 clear_cs(tr, &delta->cs, true, isTempTable(i->t));
    3344             :                 }
    3345           0 :                 if (!tr->parent)
    3346           0 :                         i->t->base.new = i->base.new = 0;
    3347             :         }
    3348           0 :         return ok;
    3349             : }
    3350             : 
    3351             : static int
    3352          29 : commit_update_idx( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
    3353             : {
    3354             :         int ok = LOG_OK;
    3355          29 :         sql_idx *i = (sql_idx*)change->obj;
    3356          29 :         sql_delta *delta = ATOMIC_PTR_GET(&i->data);
    3357             : 
    3358          29 :         if (isTempTable(i->t))
    3359           0 :                 return commit_update_idx_( tr, i, commit_ts, oldest);
    3360          29 :         if (commit_ts)
    3361          27 :                 delta->cs.ts = commit_ts;
    3362          29 :         if (!commit_ts) { /* rollback */
    3363           2 :                 sql_delta *d = change->data, *o = ATOMIC_PTR_GET(&i->data);
    3364             : 
    3365           2 :                 if (change->ts && i->t->base.new) /* handled by create col */
    3366             :                         return ok;
    3367           2 :                 if (o != d) {
    3368           0 :                         while(o && o->next != d)
    3369             :                                 o = o->next;
    3370             :                 }
    3371           2 :                 if (o == ATOMIC_PTR_GET(&i->data))
    3372           2 :                         ATOMIC_PTR_SET(&i->data, d->next);
    3373             :                 else
    3374           0 :                         o->next = d->next;
    3375           2 :                 change->cleanup = &tc_gc_rollbacked;
    3376          27 :         } else if (ok == LOG_OK && !tr->parent) {
    3377             :                 /* merge deltas */
    3378          27 :                 while (delta && delta->cs.ts > oldest)
    3379           0 :                         delta = delta->next;
    3380          27 :                 if (ok == LOG_OK && delta && !delta->cs.merged && delta->cs.ts <= oldest) {
    3381          27 :                         lock_column(tr->store, i->base.id); /* lock for concurrent updates (appends) */
    3382          27 :                         ok = merge_delta(delta);
    3383          27 :                         unlock_column(tr->store, i->base.id);
    3384             :                 }
    3385             :         } else if (ok == LOG_OK && tr->parent) /* cleanup older save points */
    3386           0 :                 ATOMIC_PTR_SET(&i->data, savepoint_commit_delta(delta, commit_ts));
    3387             :         return ok;
    3388             : }
    3389             : 
    3390             : static storage *
    3391          26 : savepoint_commit_storage( storage *dbat, ulng commit_ts)
    3392             : {
    3393          26 :         if (dbat && dbat->cs.ts == commit_ts && dbat->next) {
    3394           0 :                 assert(0);
    3395             :                 storage *od = dbat->next;
    3396             :                 if (od->cs.ts == commit_ts) {
    3397             :                         storage t = *od, *n = od->next;
    3398             :                         *od = *dbat;
    3399             :                         od->next = n;
    3400             :                         *dbat = t;
    3401             :                         dbat->next = NULL;
    3402             :                         destroy_storage(dbat);
    3403             :                         return od;
    3404             :                 }
    3405             :         }
    3406             :         return dbat;
    3407             : }
    3408             : 
    3409             : static int
    3410       35238 : log_update_del( sql_trans *tr, sql_change *change)
    3411             : {
    3412       35238 :         sql_table *t = (sql_table*)change->obj;
    3413             : 
    3414       35238 :         if (!isTempTable(t) && !tr->parent) /* don't write save point commits */
    3415       35238 :                 return log_storage(tr, t, ATOMIC_PTR_GET(&t->data), t->base.id);
    3416             :         return LOG_OK;
    3417             : }
    3418             : 
    3419             : static int
    3420             : rollback_storage(sql_trans *tr, storage *dbat)
    3421             : {
    3422             :         (void)tr;
    3423             :         (void)dbat;
    3424             :         return LOG_OK;
    3425             : }
    3426             : 
    3427             : static int
    3428             : commit_storage(sql_trans *tr, storage *dbat)
    3429             : {
    3430             :         (void)tr;
    3431             :         (void)dbat;
    3432             :         return LOG_OK;
    3433             : }
    3434             : 
    3435             : static int
    3436       38362 : commit_update_del( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
    3437             : {
    3438             :         int ok = LOG_OK;
    3439       38362 :         sql_table *t = (sql_table*)change->obj;
    3440       38362 :         storage *dbat = ATOMIC_PTR_GET(&t->data);
    3441             : 
    3442       38362 :         if (isTempTable(t)) {
    3443          36 :                 if (!(dbat = temp_tab_timestamp_storage(tr, t)))
    3444             :                         return LOG_ERR;
    3445          36 :                 if (commit_ts) { /* commit */
    3446          36 :                         if (t->commit_action == CA_COMMIT || t->commit_action == CA_PRESERVE)
    3447             :                                 ok = commit_storage(tr, dbat);
    3448             :                         else /* CA_DELETE as CA_DROP's are gone already */
    3449          21 :                                 ok = clear_storage(tr, t, dbat);
    3450             :                 } else { /* rollback */
    3451           0 :                         if (t->commit_action == CA_COMMIT/* || t->commit_action == CA_PRESERVE*/)
    3452             :                                 ok = rollback_storage(tr, dbat);
    3453             :                         else /* CA_DELETE as CA_DROP's are gone already */
    3454           0 :                                 ok = clear_storage(tr, t, dbat);
    3455             :                 }
    3456          36 :                 t->base.new = 0;
    3457          36 :                 return ok;
    3458             :         }
    3459       38326 :         lock_table(tr->store, t->base.id);
    3460       38326 :         if (!commit_ts) { /* rollback */
    3461        3062 :                 if (dbat->cs.ts == tr->tid) {
    3462           4 :                         if (change->ts && t->base.new) { /* handled by the create table */
    3463           3 :                                 unlock_table(tr->store, t->base.id);
    3464           3 :                                 return ok;
    3465             :                         }
    3466           1 :                         storage *d = change->data, *o = ATOMIC_PTR_GET(&t->data);
    3467             : 
    3468           1 :                         if (o != d) {
    3469           0 :                                 while(o && o->next != d)
    3470             :                                         o = o->next;
    3471             :                         }
    3472           1 :                         if (o == ATOMIC_PTR_GET(&t->data)) {
    3473           1 :                                 assert(d->next);
    3474           1 :                                 ATOMIC_PTR_SET(&t->data, d->next);
    3475             :                         } else
    3476           0 :                                 o->next = d->next;
    3477           1 :                         d->next = NULL;
    3478           1 :                         change->cleanup = &tc_gc_rollbacked_storage;
    3479             :                 } else
    3480        3058 :                         rollback_segments(dbat->segs, tr, change, oldest);
    3481       35264 :         } else if (ok == LOG_OK && !tr->parent) {
    3482             :                 storage *d = dbat;
    3483       35238 :                 if (dbat->cs.ts == tr->tid) /* cleared table */
    3484         142 :                         dbat->cs.ts = commit_ts;
    3485             : 
    3486       35238 :                 ok = segments2cs(tr, dbat->segs, &dbat->cs);
    3487       35238 :                 assert(ok == LOG_OK);
    3488             :                 if (ok == LOG_OK)
    3489       35238 :                         merge_segments(dbat, tr, change, commit_ts, oldest);
    3490       35238 :                 if (ok == LOG_OK && dbat == d && oldest == commit_ts)
    3491       35050 :                         ok = merge_storage(dbat);
    3492             :         } else if (ok == LOG_OK && tr->parent) {/* cleanup older save points */
    3493          26 :                 merge_segments(dbat, tr, change, commit_ts, oldest);
    3494          26 :                 ATOMIC_PTR_SET(&t->data, savepoint_commit_storage(dbat, commit_ts));
    3495             :         }
    3496       38323 :         unlock_table(tr->store, t->base.id);
    3497       38323 :         return ok;
    3498             : }
    3499             : 
    3500             : static int
    3501        3357 : gc_delta( sql_store Store, sql_change *change, ulng oldest)
    3502             : {
    3503             :         sqlstore *store = Store;
    3504        3357 :         sql_delta *n = change->data;
    3505             :         (void)store;
    3506             :         (void)oldest;
    3507             : 
    3508        3357 :         destroy_delta(n, true);
    3509        3357 :         return 1;
    3510             : }
    3511             : 
    3512             : /* only rollback (content version) case for now */
    3513             : static int
    3514        4726 : gc_col( sqlstore *store, sql_change *change, ulng oldest, bool cleanup)
    3515             : {
    3516        4726 :         sql_column *c = (sql_column*)change->obj;
    3517             : 
    3518        4726 :         if (!c) /* cleaned earlier */
    3519             :                 return 1;
    3520             : 
    3521             :         /* savepoint commit (did it merge ?) */
    3522        4676 :         if (ATOMIC_PTR_GET(&c->data) != change->data || isTempTable(c->t)) /* data is freed by commit */
    3523             :                 return 1;
    3524        3447 :         if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
    3525             :                 return 0;
    3526             :         sql_delta *d = (sql_delta*)change->data;
    3527        3445 :         if (d->next) {
    3528             :                 int ok = LOG_OK;
    3529             : 
    3530        3348 :                 assert(!cleanup);
    3531        3348 :                 if (d->cs.ts > oldest)
    3532             :                         return ok; /* cannot cleanup yet */
    3533             : 
    3534             :                 sql_delta *n = d->next;
    3535        3330 :                 if (n->cs.ucnt && !n->cs.merged) {
    3536           0 :                         lock_column(store, c->base.id); /* lock for concurrent updates (appends) */
    3537           0 :                         ok = merge_delta(n);
    3538           0 :                         unlock_column(store, c->base.id);
    3539        3330 :                 } else if (d && d->cs.ucnt && !d->cs.merged) {
    3540          10 :                         lock_column(store, c->base.id); /* lock for concurrent updates (appends) */
    3541          10 :                         ok = merge_delta(d);
    3542          10 :                         unlock_column(store, c->base.id);
    3543             :                 }
    3544        3330 :                 d->next = NULL;
    3545        3330 :                 change->cleanup = &gc_delta;
    3546        3330 :                 change->data = n;
    3547        3330 :                 return ok;
    3548             :         }
    3549          97 :         if (cleanup)
    3550           5 :                 column_destroy(store, c);
    3551             :         return 1;
    3552             : }
    3553             : 
    3554             : static int
    3555        4671 : tc_gc_col( sql_store Store, sql_change *change, ulng oldest)
    3556             : {
    3557        4671 :         return gc_col(Store, change, oldest, false);
    3558             : }
    3559             : 
    3560             : /* only rollback (content version) case for now */
    3561             : static int
    3562          55 : tc_gc_drop_col( sql_store Store, sql_change *change, ulng oldest)
    3563             : {
    3564          55 :         return gc_col(Store, change, oldest, true);
    3565             : }
    3566             : 
    3567             : static int
    3568        1135 : gc_idx( sqlstore *store, sql_change *change, ulng oldest, bool cleanup)
    3569             : {
    3570        1135 :         sql_idx *i = (sql_idx*)change->obj;
    3571             : 
    3572        1135 :         if (!i) /* cleaned earlier */
    3573             :                 return 1;
    3574             : 
    3575             :         /* savepoint commit (did it merge ?) */
    3576         642 :         if (ATOMIC_PTR_GET(&i->data) != change->data || isTempTable(i->t)) /* data is freed by commit */
    3577             :                 return 1;
    3578         642 :         if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
    3579             :                 return 0;
    3580             :         sql_delta *d = (sql_delta*)change->data;
    3581         642 :         if (d->next) {
    3582             :                 int ok = LOG_OK;
    3583             : 
    3584          27 :                 assert(!cleanup);
    3585          27 :                 if (d->cs.ts > oldest)
    3586             :                         return ok; /* cannot cleanup yet */
    3587             : 
    3588             :                 sql_delta *n = d->next;
    3589          27 :                 if (n->cs.ucnt && !n->cs.merged) {
    3590           0 :                         lock_column(store, i->base.id); /* lock for concurrent updates (appends) */
    3591           0 :                         ok = merge_delta(n);
    3592           0 :                         unlock_column(store, i->base.id);
    3593          27 :                 } else if (d && d->cs.ucnt && !d->cs.merged) {
    3594           0 :                         lock_column(store, i->base.id); /* lock for concurrent updates (appends) */
    3595           0 :                         ok = merge_delta(d);
    3596           0 :                         unlock_column(store, i->base.id);
    3597             :                 }
    3598          27 :                 d->next = NULL;
    3599          27 :                 change->cleanup = &gc_delta;
    3600          27 :                 change->data = n;
    3601          27 :                 return ok;
    3602             :         }
    3603         615 :         if (cleanup)
    3604           1 :                 idx_destroy(store, i);
    3605             :         return 1;
    3606             : }
    3607             : 
    3608             : static int
    3609         641 : tc_gc_idx( sql_store Store, sql_change *change, ulng oldest)
    3610             : {
    3611         641 :         return gc_idx(Store, change, oldest, false);
    3612             : }
    3613             : 
    3614             : static int
    3615         494 : tc_gc_drop_idx( sql_store Store, sql_change *change, ulng oldest)
    3616             : {
    3617         494 :         return gc_idx(Store, change, oldest, true);
    3618             : }
    3619             : 
    3620             : 
    3621             : static int
    3622       51860 : tc_gc_del( sql_store Store, sql_change *change, ulng oldest)
    3623             : {
    3624             :         sqlstore *store = Store;
    3625       51860 :         sql_table *t = (sql_table*)change->obj;
    3626             : 
    3627             :         (void)store;
    3628             :         /* savepoint commit (did it merge ?) */
    3629       51860 :         if (ATOMIC_PTR_GET(&t->data) != change->data || isTempTable(t)) /* data is freed by commit */
    3630             :                 return 1;
    3631       50178 :         if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
    3632             :                 return 0;
    3633             :         storage *d = (storage*)change->data;
    3634       50150 :         if (d->next) {
    3635           2 :                 if (d->cs.ts > oldest)
    3636             :                         return LOG_OK; /* cannot cleanup yet */
    3637             : 
    3638           1 :                 destroy_storage(d->next);
    3639           1 :                 d->next = NULL;
    3640             :         }
    3641             :         return 1;
    3642             : }
    3643             : 
    3644             : static int
    3645        3569 : add_offsets(BUN slot, size_t nr, size_t total, BUN *offset, BAT **offsets)
    3646             : {
    3647        3569 :         if (nr == 0)
    3648             :                 return LOG_OK;
    3649             :         assert (nr > 0);
    3650        3569 :         if ((!offsets || !*offsets) && nr == total) {
    3651        3551 :                 *offset = slot;
    3652        3551 :                 return LOG_OK;
    3653             :         }
    3654          18 :         if (!*offsets) {
    3655           9 :                 *offsets = COLnew(0, TYPE_oid, total, TRANSIENT);
    3656           9 :                 if (!*offsets)
    3657             :                         return LOG_ERR;
    3658             :         }
    3659          18 :         oid *restrict dst = Tloc(*offsets, BATcount(*offsets));
    3660         209 :         for(size_t i = 0; i < nr; i++)
    3661         191 :                 dst[i] = slot + i;
    3662          18 :         (*offsets)->batCount += nr;
    3663          18 :         (*offsets)->theap->dirty = true;
    3664          18 :         return LOG_OK;
    3665             : }
    3666             : 
    3667             : static int
    3668        3560 : claim_segmentsV2(sql_trans *tr, sql_table *t, storage *s, size_t cnt, BUN *offset, BAT **offsets, bool locked)
    3669             : {
    3670        3560 :         int in_transaction = segments_in_transaction(tr, t), ok = LOG_OK;
    3671        3560 :         assert(s->segs);
    3672        3560 :         ulng oldest = store_oldest(tr->store);
    3673             :         BUN slot = 0;
    3674             :         size_t total = cnt;
    3675             : 
    3676        3560 :         if (!locked)
    3677        3560 :                 lock_table(tr->store, t->base.id);
    3678             :         /* naive vacuum approach, iterator through segments, use deleted segments or create new segment at the end */
    3679        7231 :         for (segment *seg = s->segs->h, *p = NULL; seg && cnt && ok == LOG_OK; p = seg, seg = seg->next) {
    3680        3674 :                 if (seg->deleted && seg->ts < oldest && seg->end > seg->start) { /* re-use old deleted or rolledback append */
    3681          23 :                         if ((seg->end - seg->start) >= cnt) {
    3682             :                                 /* if previous is claimed before we could simply adjust the end/start */
    3683          14 :                                 if (p && p->ts == tr->tid && !p->deleted) {
    3684           3 :                                         slot = p->end;
    3685           3 :                                         p->end += cnt;
    3686           3 :                                         seg->start += cnt;
    3687           3 :                                         if (add_offsets(slot, cnt, total, offset, offsets) != LOG_OK) {
    3688             :                                                 ok = LOG_ERR;
    3689             :                                                 break;
    3690             :                                         }
    3691             :                                         cnt = 0;
    3692           3 :                                         break;
    3693             :                                 }
    3694             :                                 /* we claimed part of the old segment, the split off part needs to stay deleted */
    3695             :                                 size_t rcnt = seg->end - seg->start;
    3696             :                                 if (rcnt > cnt)
    3697             :                                         rcnt = cnt;
    3698          11 :                                 if ((seg=split_segment(s->segs, seg, p, tr, seg->start, rcnt, false)) == NULL) {
    3699             :                                         ok = LOG_ERR;
    3700             :                                         break;
    3701             :                                 }
    3702             :                         }
    3703          20 :                         seg->ts = tr->tid;
    3704          20 :                         seg->deleted = false;
    3705          20 :                         slot = seg->start;
    3706          20 :                         if (add_offsets(slot, (seg->end-seg->start), total, offset, offsets) != LOG_OK) {
    3707             :                                 ok = LOG_ERR;
    3708             :                                 break;
    3709             :                         }
    3710          20 :                         cnt -= (seg->end - seg->start);
    3711             :                 }
    3712             :         }
    3713        3560 :         if (ok == LOG_OK && cnt) {
    3714        3546 :                 if (s->segs->t && s->segs->t->ts == tr->tid && !s->segs->t->deleted) {
    3715        2652 :                         slot = s->segs->t->end;
    3716        2652 :                         s->segs->t->end += cnt;
    3717             :                 } else {
    3718         894 :                         if (!(s->segs->t = new_segment(s->segs->t, tr, cnt))) {
    3719             :                                 ok = LOG_ERR;
    3720             :                         } else {
    3721         894 :                                 if (!s->segs->h)
    3722           0 :                                         s->segs->h = s->segs->t;
    3723         894 :                                 slot = s->segs->t->start;
    3724             :                         }
    3725             :                 }
    3726        3546 :                 ok = add_offsets(slot, cnt, total, offset, offsets);
    3727             :         }
    3728        3560 :         if (!locked)
    3729        3560 :                 unlock_table(tr->store, t->base.id);
    3730             : 
    3731             :         /* hard to only add this once per transaction (probably want to change to once per new segment) */
    3732        3560 :         if ((!inTransaction(tr, t) && !in_transaction && isGlobal(t)) || (!isNew(t) && isLocalTemp(t))) {
    3733         880 :                 trans_add(tr, &t->base, s, &tc_gc_del, &commit_update_del, isTempTable(t)?NULL:&log_update_del);
    3734             :                 in_transaction = true;
    3735             :         }
    3736        3560 :         if (in_transaction && !isTempTable(t))
    3737        3552 :                 tr->logchanges += (int) total;
    3738        3560 :         if (*offsets) {
    3739             :                 BAT *pos = *offsets;
    3740           9 :                 assert(BATcount(pos) == total);
    3741           9 :                 BATsetcount(pos, total); /* set other properties */
    3742           9 :                 pos->tnil = false;
    3743           9 :                 pos->tnonil = true;
    3744           9 :                 pos->tkey = true;
    3745           9 :                 pos->tsorted = true;
    3746           9 :                 pos->trevsorted = false;
    3747             :         }
    3748        3560 :         return ok;
    3749             : }
    3750             : 
    3751             : static int
    3752     1789615 : claim_segments(sql_trans *tr, sql_table *t, storage *s, size_t cnt, BUN *offset, BAT **offsets, bool locked)
    3753             : {
    3754     1789615 :         if (cnt > 1 && offsets)
    3755        3560 :                 return claim_segmentsV2(tr, t, s, cnt, offset, offsets, locked);
    3756     1786055 :         int in_transaction = segments_in_transaction(tr, t), ok = LOG_OK;
    3757     1786055 :         assert(s->segs);
    3758     1786055 :         ulng oldest = store_oldest(tr->store);
    3759             :         BUN slot = 0;
    3760             :         int reused = 0;
    3761             : 
    3762     1786055 :         if (!locked)
    3763     1704417 :                 lock_table(tr->store, t->base.id);
    3764             :         /* naive vacuum approach, iterator through segments, check for large enough deleted segments
    3765             :          * or create new segment at the end */
    3766     9872437 :         for (segment *seg = s->segs->h, *p = NULL; seg && ok == LOG_OK; p = seg, seg = seg->next) {
    3767     8152559 :                 if (seg->deleted && seg->ts < oldest && (seg->end-seg->start) >= cnt) { /* re-use old deleted or rolledback append */
    3768             : 
    3769             :                         if ((seg->end - seg->start) >= cnt) {
    3770             : 
    3771             :                                 /* if previous is claimed before we could simply adjust the end/start */
    3772       66177 :                                 if (p && p->ts == tr->tid && !p->deleted) {
    3773       53545 :                                         slot = p->end;
    3774       53545 :                                         p->end += cnt;
    3775       53545 :                                         seg->start += cnt;
    3776             :                                         reused = 1;
    3777       53545 :                                         break;
    3778             :                                 }
    3779             :                                 /* we claimed part of the old segment, the split off part needs to stay deleted */
    3780       12632 :                                 if ((seg=split_segment(s->segs, seg, p, tr, seg->start, cnt, false)) == NULL)
    3781             :                                         ok = LOG_ERR;
    3782             :                         }
    3783       12632 :                         seg->ts = tr->tid;
    3784       12632 :                         seg->deleted = false;
    3785       12632 :                         slot = seg->start;
    3786             :                         reused = 1;
    3787       12632 :                         break;
    3788             :                 }
    3789             :         }
    3790     1786055 :         if (ok == LOG_OK && !reused) {
    3791     1719878 :                 if (s->segs->t && s->segs->t->ts == tr->tid && !s->segs->t->deleted) {
    3792     1704660 :                         slot = s->segs->t->end;
    3793     1704660 :                         s->segs->t->end += cnt;
    3794             :                 } else {
    3795       15218 :                         if (!(s->segs->t = new_segment(s->segs->t, tr, cnt))) {
    3796             :                                 ok = LOG_ERR;
    3797             :                         } else {
    3798       15218 :                                 if (!s->segs->h)
    3799           0 :                                         s->segs->h = s->segs->t;
    3800       15218 :                                 slot = s->segs->t->start;
    3801             :                         }
    3802             :                 }
    3803             :         }
    3804     1786055 :         if (!locked)
    3805     1704417 :                 unlock_table(tr->store, t->base.id);
    3806             : 
    3807             :         /* hard to only add this once per transaction (probably want to change to once per new segment) */
    3808     1786055 :         if ((!inTransaction(tr, t) && !in_transaction && isGlobal(t)) || (!isNew(t) && isLocalTemp(t))) {
    3809       26413 :                 trans_add(tr, &t->base, s, &tc_gc_del, &commit_update_del, isTempTable(t)?NULL:&log_update_del);
    3810             :                 in_transaction = true;
    3811             :         }
    3812     1786055 :         if (in_transaction && !isTempTable(t))
    3813     1785872 :                 tr->logchanges += (int) cnt;
    3814     1786055 :         if (ok == LOG_OK) {
    3815     1786055 :                 *offset = slot;
    3816     1786055 :                 return LOG_OK;
    3817             :         }
    3818             :         return LOG_ERR;
    3819             : }
    3820             : 
    3821             : /*
    3822             :  * Claim cnt slots to store the tuples. The claim_tab should claim storage on the level
    3823             :  * of the global transaction and mark the newly added storage slots unused on the global
    3824             :  * level but used on the local transaction level. Besides this the local transaction needs
    3825             :  * to update (and mark unused) any slot in between the old end and new slots.
                     :  *
                     :  * Implementation: binds the table's storage with bind_del_data() and forwards the
                     :  * request to claim_segments(), passing false as the last argument. key_claim_tab
                     :  * takes the table lock itself and passes true there, so false presumably means
                     :  * "caller does not hold the table lock" -- confirm against claim_segments' signature.
                     :  *
                     :  * Returns LOG_ERR when bind_del_data() yields NULL, otherwise whatever
                     :  * claim_segments() returns. offset and offsets are passed through unchanged.
    3826             :  * */
    3827             : static int
    3828     1707977 : claim_tab(sql_trans *tr, sql_table *t, size_t cnt, BUN *offset, BAT **offsets)
    3829             : {
    3830             :         storage *s;
    3831             : 
    3832             :         /* we have a single segment structure for each persistent table
    3833             :          * for temporary tables each has its own */
    3834     1707977 :         if ((s = bind_del_data(tr, t, NULL)) == NULL)
    3835             :                 return LOG_ERR;
    3836             : 
    3837     1707977 :         return claim_segments(tr, t, s, cnt, offset, offsets, false); /* find slot(s) */
    3838             : }
    3839             : 
    3840             : /* some tables cannot be updated concurrently (user/roles etc)
                     :  *
                     :  * Variant of claim_tab that first calls segments_conflict(tr, s->segs, 1) and
                     :  * refuses to claim when it reports a conflict. The table lock is taken here and
                     :  * held across both the conflict check and claim_segments(), which is why
                     :  * claim_segments() is called with true as its last argument.
                     :  *
                     :  * Returns LOG_ERR when bind_del_data() yields NULL, LOG_CONFLICT when
                     :  * segments_conflict() returns non-zero, otherwise the result of claim_segments().
                     :  * The table lock is released on every path that acquired it. */
    3841             : static int
    3842       81642 : key_claim_tab(sql_trans *tr, sql_table *t, size_t cnt, BUN *offset, BAT **offsets)
    3843             : {
    3844             :         storage *s;
    3845             :         int res = 0;
    3846             : 
    3847             :         /* we have a single segment structure for each persistent table
    3848             :          * for temporary tables each has its own */
    3849       81642 :         if ((s = bind_del_data(tr, t, NULL)) == NULL)
    3850             :                 /* TODO check for other inserts ! */
    3851             :                 return LOG_ERR;
    3852             : 
    3853       81642 :         lock_table(tr->store, t->base.id);
    3854       81642 :         if ((res = segments_conflict(tr, s->segs, 1))) {
    3855           4 :                 unlock_table(tr->store, t->base.id);
    3856           4 :                 return LOG_CONFLICT;
    3857             :         }
    3858       81638 :         res = claim_segments(tr, t, s, cnt, offset, offsets, true); /* find slot(s) */
    3859       81638 :         unlock_table(tr->store, t->base.id);
    3860       81638 :         return res;
    3861             : }
    3862             : /* Check table t for conflicting segments on behalf of transaction tr.
                     :  *
                     :  * Binds the table's storage and runs segments_conflict() on its segment list
                     :  * while holding the table lock; `uncommitted` is forwarded to
                     :  * segments_conflict() unchanged.
                     :  *
                     :  * Returns LOG_ERR when bind_del_data() yields NULL, LOG_CONFLICT when
                     :  * segments_conflict() returns non-zero, LOG_OK otherwise. */
    3863             : static int
    3864        4831 : tab_validate(sql_trans *tr, sql_table *t, int uncommitted)
    3865             : {
    3866             :         storage *s;
    3867             :         int res = 0;
    3868             : 
    3869        4831 :         if ((s = bind_del_data(tr, t, NULL)) == NULL)
    3870             :                 return LOG_ERR;
    3871             : 
    3872        4831 :         lock_table(tr->store, t->base.id);
    3873        4831 :         res = segments_conflict(tr, s->segs, uncommitted);
    3874        4831 :         unlock_table(tr->store, t->base.id);
    3875        4831 :         return res ? LOG_CONFLICT : LOG_OK;
    3876             : }
    3877             : /* Report whether the segment list starting at s has a deleted segment
                     :  * (SEG_IS_DELETED for transaction tr) overlapping the range [start, end).
                     :  *
                     :  * Segments with s->end <= start are skipped first. The second loop stops as
                     :  * soon as cnt becomes non-zero, so the return value is the full (unclipped)
                     :  * length of the first deleted, non-empty segment found, or 0 when there is
                     :  * none. It is not a total count of deleted rows in the range; the caller in
                     :  * this file (segments2cands) only tests it for zero. */
    3878             : static size_t
    3879      876441 : has_deletes_in_range( segment *s, sql_trans *tr, BUN start, BUN end)
    3880             : {
    3881             :         size_t cnt = 0;
    3882             : 
    3883     1430606 :         for(;s && s->end <= start; s = s->next)
    3884             :                 ;
    3885             : 
    3886     2308035 :         for(;s && s->start < end && !cnt; s = s->next) {
    3887     1431594 :                 if (SEG_IS_DELETED(s, tr)) /* assume aligned s->end and end */
    3888      181059 :                         cnt += s->end - s->start;
    3889             :         }
    3890      876441 :         return cnt;
    3891             : }
    3892             : /* Build the candidate BAT for rows [start, end) of table t as seen by tr.
                     :  *
                     :  * The segment list is walked under the table lock.
                     :  *  - If has_deletes_in_range() reports nothing, a dense BAT created with
                     :  *    BATdense(start, start, end-start) is returned.
                     :  *  - Otherwise a TYPE_msk BAT of nr = end-start bits is filled: one bit per row,
                     :  *    packed least-significant-bit first into 32-bit words. Bits are set for rows
                     :  *    in segments where SEG_IS_VALID(s, tr) holds and left zero for the others.
                     :  *    `cur` holds the partially filled word, `pos` counts the bits emitted so far
                     :  *    and `dst` points at the next word to write. The mask is handed to
                     :  *    BATmaskedcands() and then released with BBPreclaim().
                     :  *
                     :  * Returns NULL when COLnew() fails; otherwise the result of BATdense() or
                     :  * BATmaskedcands() is returned as is (NOTE(review): presumably these can also
                     :  * return NULL -- callers should check). */
    3893             : static BAT *
    3894      876441 : segments2cands(segment *s, sql_trans *tr, sql_table *t, size_t start, size_t end)
    3895             : {
    3896      876441 :         lock_table(tr->store, t->base.id);
    3897             :         /* step one no deletes -> dense range */
    3898             :         uint32_t cur = 0;
    3899      876441 :         size_t dnr = has_deletes_in_range(s, tr, start, end), nr = end - start, pos = 0;
    3900      876441 :         if (!dnr) {
    3901      721238 :                 unlock_table(tr->store, t->base.id);
    3902      721238 :                 return BATdense(start, start, end-start);
    3903             :         }
    3904             : 
    3905      155203 :         BAT *b = COLnew(0, TYPE_msk, nr, TRANSIENT), *bn = NULL;
    3906      155203 :         if (!b) {
    3907           0 :                 unlock_table(tr->store, t->base.id);
    3908           0 :                 return NULL;
    3909             :         }
    3910             : 
    3911      155203 :         uint32_t *restrict dst = Tloc(b, 0);
    3912    62993506 :         for( ; s; s=s->next) {
    3913    62918334 :                 if (s->end < start) /* segment lies entirely before the requested range */
    3914      514503 :                         continue;
    3915    62403831 :                 if (s->start >= end) /* past the requested range: done */
    3916             :                         break;
    3917    62323800 :                 msk m = (SEG_IS_VALID(s, tr));
    3918    62323800 :                 size_t lnr = s->end-s->start; /* rows of this segment inside [start, end), clipped below */
    3919    62323800 :                 if (s->start < start)
    3920       34542 :                         lnr -= (start - s->start);
    3921    62323800 :                 if (s->end > end)
    3922       34059 :                         lnr -= s->end - end;
    3923             : 
    3924    62323800 :                 if (m) {
                     :                         /* valid rows: first top up the partially filled word, then emit
                     :                          * whole words of ones, then start a new partial word */
    3925     1303573 :                         size_t used = pos&31, end = 32; /* NOTE(review): this `end` shadows the parameter `end` */
    3926     1303573 :                         if (used) {
    3927     1146166 :                                 if (lnr < (32-used))
    3928      691565 :                                         end = used + lnr;
    3929    10294172 :                                 for(size_t j=used; j < end; j++, pos++, lnr--)
    3930     9148006 :                                         cur |= 1U<<j;
    3931     1146166 :                                 if (end == 32) {
    3932      454601 :                                         *dst++ = cur;
    3933             :                                         cur = 0;
    3934             :                                 }
    3935             :                         }
    3936     1303573 :                         size_t full = lnr/32;
    3937     1303573 :                         size_t rest = lnr%32;
    3938     9069953 :                         for(size_t i = 0; i<full; i++, pos+=32, lnr-=32)
    3939     7766380 :                                 *dst++ = ~0;
    3940     7954488 :                         for(size_t j=0; j < rest; j++, pos++, lnr--)
    3941     6650915 :                                 cur |= 1U<<j;
    3942     1303573 :                         assert(lnr==0);
    3943             :                 } else {
                     :                         /* invalid rows: same walk as above, but only advance the
                     :                          * position so the corresponding bits stay zero */
    3944    61020227 :                         size_t used = pos&31, end = 32; /* NOTE(review): shadows the parameter `end` as well */
    3945    61020227 :                         if (used) {
    3946    59096349 :                                 if (lnr < (32-used))
    3947    57126254 :                                         end = used + lnr;
    3948             : 
    3949    59096349 :                                 pos+= (end-used);
    3950    59096349 :                                 lnr-= (end-used);
    3951    59096349 :                                 if (end == 32) {
    3952     1970095 :                                         *dst++ = cur;
    3953             :                                         cur = 0;
    3954             :                                 }
    3955             :                         }
    3956    61020227 :                         size_t full = lnr/32;
    3957    61020227 :                         size_t rest = lnr%32;
    3958    62786622 :                         for(size_t i = 0; i<full; i++, pos+=32, lnr-=32)
    3959     1766395 :                                 *dst++ = 0;
    3960    61020227 :                         pos+= rest;
    3961             :                         lnr-= rest;
    3962    61020227 :                         assert(lnr==0);
    3963             :                 }
    3964             :         }
    3965             : 
    3966      155203 :         unlock_table(tr->store, t->base.id);
    3967      155203 :         if (pos%32) /* flush the last, partially filled word */
    3968      154142 :                 *dst=cur;
    3969      155203 :         BATsetcount(b, nr);
    3970      155203 :         bn = BATmaskedcands(start, nr, b, true);
    3971      155203 :         BBPreclaim(b);
    3972             :         (void)pos; /* presumably keeps pos "used" when assert() is compiled out */
    3973      155203 :         assert (pos == nr);
    3974             :         return bn;
    3975             : }
    3976             : /* Return the candidate BAT (as void *) for slice part_nr out of nr_of_parts
                     :  * of table t, as seen by transaction tr.
                     :  *
                     :  * The row count nr comes from segs_end(); an empty table yields
                     :  * BATdense(0, 0, 0). Otherwise every slice covers nr/nr_of_parts rows starting
                     :  * at part_size * part_nr, and the last slice (part_nr == nr_of_parts-1) is
                     :  * extended to nr so the division remainder is not lost. The slice is turned
                     :  * into candidates by segments2cands().
                     :  *
                     :  * Returns NULL when tab_timestamp_storage() yields no storage.
                     :  * NOTE(review): nr_of_parts is used as a divisor; callers presumably always
                     :  * pass a value >= 1 -- confirm. */
    3977             : static void *                                   /* BAT * */
    3978      957009 : bind_cands(sql_trans *tr, sql_table *t, int nr_of_parts, int part_nr)
    3979             : {
    3980             :         /* with nr_of_parts - part_nr we can adjust parts */
    3981      957009 :         storage *s = tab_timestamp_storage(tr, t);
    3982             : 
    3983      957010 :         if (!s)
    3984             :                 return NULL;
    3985      957010 :         size_t nr = segs_end(s->segs, tr, t);
    3986             : 
    3987      957010 :         if (!nr)
    3988       80569 :                 return BATdense(0, 0, 0);
    3989             : 
    3990             :         /* compute proper part */
    3991      876441 :         size_t part_size = nr/nr_of_parts;
    3992      876441 :         size_t start = part_size * part_nr;
    3993      876441 :         size_t end = start + part_size;
    3994      876441 :         if (part_nr == (nr_of_parts-1))
    3995             :                 end = nr;
    3996      876441 :         assert(end <= nr);
    3997      876441 :         return segments2cands(s->segs->h, tr, t, start, end);
    3998             : }
    3999             : /* Drop everything transaction tr stored for table t.
                     :  *
                     :  * Walks the storage chain in t->data under the table lock and, for every
                     :  * column of t, the sql_delta chain in c->data under that column's lock. Each
                     :  * entry whose cs.ts equals tr->tid is unlinked and destroyed (destroy_storage
                     :  * resp. destroy_delta(d, false)); the other entries are left in place.
                     :  *
                     :  * A matching entry with no surviving predecessor (p == NULL) is unlinked with
                     :  * ATOMIC_PTR_CAS on the chain's head pointer; if the CAS fails the walk
                     :  * restarts from the retry label. Later entries are unlinked by a plain
                     :  * p->next assignment. */
    4000             : static void
    4001       35318 : temp_del_tab(sql_trans *tr, sql_table *t)
    4002             : {
    4003       35318 :         ulng tid = tr->tid;
    4004       35318 :         lock_table(tr->store, t->base.id);
    4005       35318 :   table_retry:
    4006       39289 :         for (storage *d = ATOMIC_PTR_GET(&t->data), *p = NULL, *n = NULL; d; d = n) {
    4007        3971 :                 n = d->next; /* remember the successor before d may be destroyed */
    4008        3971 :                 if (d->cs.ts == tid) {
    4009        3731 :                         if (p == NULL) {
    4010        3726 :                                 if (!ATOMIC_PTR_CAS(&t->data, (void **) &d, n))
    4011           0 :                                         goto table_retry;
    4012             :                         } else {
    4013           5 :                                 p->next = n;
    4014             :                         }
    4015        3731 :                         d->next = NULL; /* detach so only this entry is handed to destroy_storage */
    4016        3731 :                         destroy_storage(d);
    4017             :                 } else {
    4018             :                         p = d;
    4019             :                 }
    4020             :         }
    4021       35318 :         unlock_table(tr->store, t->base.id);
    4022      282462 :         for (node *nd = t->columns->l->h; nd; nd = nd->next) {
    4023      247144 :                 sql_column *c = nd->data;
    4024      247144 :                 lock_column(tr->store, c->base.id);
    4025      247144 :           column_retry:
    4026      277230 :                 for (sql_delta *d = ATOMIC_PTR_GET(&c->data), *p = NULL, *n = NULL; d; d = n) {
    4027       30086 :                         n = d->next;
    4028       30086 :                         if (d->cs.ts == tid) {
    4029       28072 :                                 if (p == NULL) {
    4030       28066 :                                         if (!ATOMIC_PTR_CAS(&c->data, (void **) &d, n))
    4031           0 :                                                 goto column_retry;
    4032             :                                 } else {
    4033           6 :                                         p->next = n;
    4034             :                                 }
    4035       28072 :                                 d->next = NULL;
    4036       28072 :                                 destroy_delta(d, false);
    4037             :                         } else {
    4038             :                                 p = d;
    4039             :                         }
    4040             :                 }
    4041      247144 :                 unlock_column(tr->store, c->base.id);
    4042             :         }
    4043       35318 : }
    4044             : /* Replace the data of column col with BAT bn inside transaction tr.
                     :  *
                     :  * Steps:
                     :  *  - refuse with LOG_CONFLICT when segments_in_transaction(tr, col->t) is
                     :  *    non-zero;
                     :  *  - bind this transaction's delta via bind_col_data(); on failure return
                     :  *    LOG_CONFLICT if it flagged update_conflict, else LOG_ERR;
                     :  *  - register the change with trans_add() when the condition below holds
                     :  *    (inTransaction(tr, t) is defined at the top of this file as isLocalTemp(t));
                     :  *  - release the delta's current bid/uibid/uvbid with temp_destroy() and
                     :  *    install temp_create(bn) as the new bid, resetting uibid, uvbid, ucnt and
                     :  *    cleared to 0, ts to tr->tid and refcnt to 1.
                     :  *
                     :  * Returns LOG_OK on success. */
    4045             : static int
    4046           1 : swap_bats(sql_trans *tr, sql_column *col, BAT *bn)
    4047             : {
    4048           1 :         bool update_conflict = false;
    4049           1 :         int in_transaction = segments_in_transaction(tr, col->t);
    4050           1 :         if (in_transaction) return LOG_CONFLICT;
    4051             :         sql_delta *d = NULL;
    4052             :         // old delta
    4053           1 :         sql_delta *odelta = ATOMIC_PTR_GET(&col->data);
    4054             : 
    4055           1 :         if ((d = bind_col_data(tr, col, &update_conflict)) == NULL)
    4056           0 :                 return update_conflict ? LOG_CONFLICT : LOG_ERR;
    4057           1 :         assert(d && d->cs.ts == tr->tid);
    4058           1 :         if ((!inTransaction(tr, col->t) && (odelta != d || isTempTable(col->t)) && isGlobal(col->t)) || (!isNew(col->t) && isLocalTemp(col->t)))
    4059           1 :                 trans_add(tr, &col->base, d, &tc_gc_col, &commit_update_col, &log_update_col);
    4060           1 :         if (d->cs.bid)
    4061           1 :                 temp_destroy(d->cs.bid);
    4062           1 :         if (d->cs.uibid)
    4063           1 :                 temp_destroy(d->cs.uibid);
    4064           1 :         if (d->cs.uvbid)
    4065           1 :                 temp_destroy(d->cs.uvbid);
    4066           1 :         d->cs.bid = temp_create(bn);
    4067           1 :         d->cs.uibid = 0;
    4068           1 :         d->cs.uvbid = 0;
    4069           1 :         d->cs.ucnt = 0;
    4070           1 :         d->cs.cleared = 0;
    4071           1 :         d->cs.ts = tr->tid;
    4072           1 :         d->cs.refcnt = 1;
    4073           1 :         return LOG_OK;
    4074             : }
    4075             : /* Fill the store_functions table sf with pointers to the implementations in
                     :  * this file (bind, claim/validate, append, update, delete, count/statistics,
                     :  * dup, create, destroy, drop, clear_table, temp_del_tab and swap_bats).
                     :  * Only assigns members of *sf; nothing else is touched. */
    4076             : void
    4077         266 : bat_storage_init( store_functions *sf)
    4078             : {
    4079         266 :         sf->bind_col = &bind_col;
    4080         266 :         sf->bind_idx = &bind_idx;
    4081         266 :         sf->bind_cands = &bind_cands;
    4082             : 
    4083         266 :         sf->claim_tab = &claim_tab;
    4084         266 :         sf->key_claim_tab = &key_claim_tab;
    4085         266 :         sf->tab_validate = &tab_validate;
    4086             : 
    4087         266 :         sf->append_col = &append_col;
    4088         266 :         sf->append_idx = &append_idx;
    4089             : 
    4090         266 :         sf->update_col = &update_col;
    4091         266 :         sf->update_idx = &update_idx;
    4092             : 
    4093         266 :         sf->delete_tab = &delete_tab;
    4094             : 
    4095         266 :         sf->count_del = &count_del;
    4096         266 :         sf->count_col = &count_col;
    4097         266 :         sf->count_idx = &count_idx;
    4098         266 :         sf->dcount_col = &dcount_col;
    4099         266 :         sf->sorted_col = &sorted_col;
    4100         266 :         sf->unique_col = &unique_col;
    4101         266 :         sf->double_elim_col = &double_elim_col;
    4102             : 
    4103         266 :         sf->col_dup = &col_dup;
    4104         266 :         sf->idx_dup = &idx_dup;
    4105         266 :         sf->del_dup = &del_dup;
    4106             : 
    4107         266 :         sf->create_col = &create_col;    /* create and add to change list */
    4108         266 :         sf->create_idx = &create_idx;
    4109         266 :         sf->create_del = &create_del;
    4110             : 
    4111         266 :         sf->destroy_col = &destroy_col;  /* free resources */
    4112         266 :         sf->destroy_idx = &destroy_idx;
    4113         266 :         sf->destroy_del = &destroy_del;
    4114             : 
    4115         266 :         sf->drop_col = &drop_col;                /* add drop to change list */
    4116         266 :         sf->drop_idx = &drop_idx;
    4117         266 :         sf->drop_del = &drop_del;
    4118             : 
    4119         266 :         sf->clear_table = &clear_table;
    4120         266 :         sf->temp_del_tab = &temp_del_tab;
    4121         266 :         sf->swap_bats = &swap_bats;
    4122         266 : }
    4123             : /* The two helpers below are compiled out (#if 0) and kept for reference only.
                     :  * Both return BUNlast(b) - b->batInserted for the BAT behind cs.bid, or 0 when
                     :  * the object is NULL, GDKinmemory(0) is true, or the atime/allocated fields
                     :  * are not both set.
                     :  * NOTE(review): log_get_nr_inserted takes `offset` but never writes it, while
                     :  * log_get_nr_deleted stores db->batInserted in *offset -- reconcile the two
                     :  * before re-enabling this code. */
    4124             : #if 0
    4125             : static lng
    4126             : log_get_nr_inserted(sql_column *fc, lng *offset)
    4127             : {
    4128             :         lng cnt = 0;
    4129             : 
    4130             :         if (!fc || GDKinmemory(0))
    4131             :                 return 0;
    4132             : 
    4133             :         if (fc->base.atime && fc->base.allocated) {
    4134             :                 sql_delta *fb = fc->data;
    4135             :                 BAT *ins = temp_descriptor(fb->cs.bid);
    4136             : 
    4137             :                 if (ins && BUNlast(ins) > 0 && BUNlast(ins) > ins->batInserted) {
    4138             :                         cnt = BUNlast(ins) - ins->batInserted;
    4139             :                 }
    4140             :                 bat_destroy(ins);
    4141             :         }
    4142             :         return cnt;
    4143             : }
    4144             : 
    4145             : static lng
    4146             : log_get_nr_deleted(sql_table *ft, lng *offset)
    4147             : {
    4148             :         lng cnt = 0;
    4149             : 
    4150             :         if (!ft || GDKinmemory(0))
    4151             :                 return 0;
    4152             : 
    4153             :         if (ft->base.atime && ft->base.allocated) {
    4154             :                 storage *fdb = ft->data;
    4155             :                 BAT *db = temp_descriptor(fdb->cs.bid);
    4156             : 
    4157             :                 if (db && BUNlast(db) > 0 && BUNlast(db) > db->batInserted) {
    4158             :                         cnt = BUNlast(db) - db->batInserted;
    4159             :                         *offset = db->batInserted;
    4160             :                 }
    4161             :                 bat_destroy(db);
    4162             :         }
    4163             :         return cnt;
    4164             : }
    4165             : #endif

Generated by: LCOV version 1.14