LCOV - code coverage report
Current view: top level - sql/storage - objectset.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 462 488 94.7 %
Date: 2021-10-13 02:24:04 Functions: 41 43 95.3 %

          Line data    Source code
       1             : /*
       2             :  * This Source Code Form is subject to the terms of the Mozilla Public
       3             :  * License, v. 2.0.  If a copy of the MPL was not distributed with this
       4             :  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
       5             :  *
       6             :  * Copyright 1997 - July 2008 CWI, August 2008 - 2021 MonetDB B.V.
       7             :  */
       8             : 
       9             : #include "monetdb_config.h"
      10             : #include "sql_catalog.h"
      11             : #include "sql_storage.h"
      12             : 
      13             : #include "gdk_atoms.h"
      14             : 
      15             : struct versionhead ;
      16             : 
      17             : #define active                                                  (0)
      18             : #define under_destruction                               (1<<1)
      19             : #define block_destruction                               (1<<2)
      20             : #define deleted                                                 (1<<3)
      21             : #define rollbacked                                              (1<<4)
      22             : 
      23             : /* This objectversion owns its associated versionhead.
      24             :  * When this objectversion gets destroyed,
      25             :  * the cleanup procedure should also destroy the associated (name|id) based versionhead.*/
      26             : #define name_based_versionhead_owner    (1<<5)
      27             : #define id_based_versionhead_owner              (1<<6)
      28             : 
/* One version of an sql object. A version is linked into two chains of
 * older/newer versions, one keyed by name and one keyed by id; each chain is
 * reachable through a versionhead. */
typedef struct objectversion {
	ulng ts;	/* timestamp; compared with TRANSACTION_ID_BASE and with the 'oldest' bound during rollback and cleanup */
	ATOMIC_TYPE state;	/* combination of the state/owner flags defined above; read and written atomically */
	sql_base *b; // base of underlying sql object
	struct objectset* os;	/* objectset whose lock and lists are used when this version is unlinked */
	struct objectversion	*name_based_older;	/* next older version in the name based chain, or NULL */
	struct objectversion	*name_based_newer;	/* next newer version in the name based chain, or NULL */
	struct versionhead		*name_based_head;	/* versionhead of the name based chain; may be NULL (checked in _os_rollback) */

	struct objectversion	*id_based_older;	/* next older version in the id based chain, or NULL */
	struct objectversion	*id_based_newer;	/* next newer version in the id based chain, or NULL */
	struct versionhead		*id_based_head;	/* versionhead of the id based chain */
} objectversion;
      42             : 
/* Node of the doubly linked lists kept in an objectset (one list per name,
 * one per id). It points at an objectversion, from which older versions are
 * reached through the *_older links. */
typedef struct versionhead  {
    struct versionhead * prev;	/* previous node in the list, NULL for the first node */
    struct versionhead * next;	/* next node in the list, NULL for the last node */
    objectversion* ov;	/* objectversion this head currently points at */
} versionhead ;
      48             : 
/* A set of versioned sql objects, indexed both by name and by id.
 * Each index is a doubly linked list of versionheads, optionally accelerated
 * by a hash map that is built once the list grows beyond HASH_MIN_SIZE. */
typedef struct objectset {
	int refcnt;
	sql_allocator *sa;	/* allocator for versionheads and hash maps; when NULL, versionheads are released with _DELETE */
	destroy_fptr destroy;	/* optional callback invoked on the sql_base of an objectversion that is being destroyed */
	MT_RWLock rw_lock;	/*readers-writer lock to protect the links (chains) in the objectversion chain.*/
	versionhead  *name_based_h;	/* first node of the name based list */
	versionhead  *name_based_t;	/* last node of the name based list */
	versionhead  *id_based_h;	/* first node of the id based list */
	versionhead  *id_based_t;	/* last node of the id based list */
	int name_based_cnt;	/* number of nodes in the name based list */
	int id_based_cnt;	/* number of nodes in the id based list */
	struct sql_hash *name_map;	/* hash over the name based list, or NULL when lookups scan the list */
	struct sql_hash *id_map;	/* hash over the id based list, or NULL when lookups scan the list */
	bool
		temporary:1,	/* NOTE(review): not referenced in the visible part of this file */
		unique:1, /* names are unique */
		concurrent:1;	/* concurrent inserts are allowed */
	sql_store store;
} objectset;
      68             : 
      69             : static int
      70      191829 : os_id_key(versionhead  *n)
      71             : {
      72      252464 :         return (int) BATatoms[TYPE_int].atomHash(&n->ov->b->id);
      73             : }
      74             : 
      75             : static inline void
      76             : lock_reader(objectset* os)
      77             : {
      78      301629 :         MT_rwlock_rdlock(&os->rw_lock);
      79             : }
      80             : 
      81             : static inline void
      82             : unlock_reader(objectset* os)
      83             : {
      84    16632017 :         MT_rwlock_rdunlock(&os->rw_lock);
      85   333426550 : }
      86             : 
      87             : static inline void
      88             : lock_writer(objectset* os)
      89             : {
      90       16179 :         MT_rwlock_wrlock(&os->rw_lock);
      91             : }
      92             : 
      93             : static inline void
      94             : unlock_writer(objectset* os)
      95             : {
      96           0 :         MT_rwlock_wrunlock(&os->rw_lock);
      97         502 : }
      98             : 
      99             : static bte os_atmc_get_state(objectversion *ov) {
     100    18593155 :         bte state = (bte) ATOMIC_GET(&ov->state);
     101             :         return state;
     102             : }
     103             : 
     104             : static void os_atmc_set_state(objectversion *ov, bte state) {
     105       60673 :         ATOMIC_SET(&ov->state, state);
     106           0 : }
     107             : 
     108             : static versionhead  *
     109      301297 : find_id(objectset *os, sqlid id)
     110             : {
     111      301297 :         if (os) {
     112             :                 lock_reader(os);
     113      301297 :                 if (os->id_map) {
     114      277450 :                         int key = (int) BATatoms[TYPE_int].atomHash(&id);
     115      277450 :                         sql_hash_e *he = os->id_map->buckets[key&(os->id_map->size-1)];
     116             : 
     117     1012499 :                         for (; he; he = he->chain) {
     118      837964 :                                 versionhead  *n = he->value;
     119             : 
     120      837964 :                                 if (n && n->ov->b->id == id) {
     121             :                                         unlock_reader(os);
     122      102915 :                                         return n;
     123             :                                 }
     124             :                         }
     125             :                         unlock_reader(os);
     126      174535 :                         return NULL;
     127             :                 }
     128             : 
     129       77736 :                 for (versionhead  *n = os->id_based_h; n; n = n->next) {
     130       55784 :                         objectversion *ov = n->ov;
     131             : 
     132             :                         /* check if ids match */
     133       55784 :                         if (id == ov->b->id) {
     134             :                                 unlock_reader(os);
     135        1895 :                                 return n;
     136             :                         }
     137             :                 }
     138             :         }
     139             : 
     140             :         unlock_reader(os);
     141       21952 :         return NULL;
     142             : }
     143             : 
     144             : // TODO copy of static function from sql_list.c. Needs to be made external
     145             : static void
     146       27699 : hash_delete(sql_hash *h, void *data)
     147             : {
     148       27699 :         int key = h->key(data);
     149       27699 :         sql_hash_e *e, *p = h->buckets[key&(h->size-1)];
     150             : 
     151             :         e = p;
     152       41450 :         for (;  p && p->value != data ; p = p->chain)
     153             :                 e = p;
     154       27699 :         if (p && p->value == data) {
     155       27699 :                 if (p == e)
     156       23352 :                         h->buckets[key&(h->size-1)] = p->chain;
     157             :                 else
     158        4347 :                         e->chain = p->chain;
     159       27699 :                 if (!h->sa)
     160       27699 :                         _DELETE(p);
     161             :         }
     162       27699 :         h->entries--;
     163       27699 : }
     164             : 
     165             : static void
     166      391211 : node_destroy(objectset *os, sqlstore *store, versionhead  *n)
     167             : {
     168      391211 :         if (!os->sa)
     169      391211 :                 _DELETE(n);
     170             :         (void)store;
     171      391211 : }
     172             : 
     173             : static versionhead  *
     174       14134 : os_remove_name_based_chain(objectset *os, objectversion* ov)
     175             : {
     176             :         lock_writer(os);
     177       14134 :         versionhead  *n = ov->name_based_head;
     178       14134 :         versionhead  *p = os->name_based_h;
     179       14134 :         if (p != n)
     180     1068909 :                 while (p && p->next != n)
     181             :                         p = p->next;
     182       14134 :         assert(p==n||(p && p->next == n));
     183       14134 :         if (p == n) {
     184        1134 :                 os->name_based_h = n->next;
     185        1134 :                 if (os->name_based_h) // i.e. non-empty os
     186         729 :                         os->name_based_h->prev = NULL;
     187             :                 p = NULL;
     188       13000 :         } else if ( p != NULL)  {
     189       13000 :                 p->next = n->next;
     190       13000 :                 if (p->next) // node in the middle
     191        5189 :                         p->next->prev = p;
     192             :         }
     193       14134 :         if (n == os->name_based_t)
     194        8216 :                 os->name_based_t = p;
     195             : 
     196       14134 :         if (os->name_map && n)
     197       13430 :                 hash_delete(os->name_map, n);
     198             : 
     199       14134 :         os->name_based_cnt--;
     200             :         unlock_writer(os);
     201             : 
     202             :         bte state = os_atmc_get_state(ov);
     203       14134 :         state |= name_based_versionhead_owner;
     204             :         os_atmc_set_state(ov, state);
     205       14134 :         return p;
     206             : }
     207             : 
     208             : static versionhead  *
     209       14623 : os_remove_id_based_chain(objectset *os, objectversion* ov)
     210             : {
     211             :         lock_writer(os);
     212       14623 :         versionhead  *n = ov->id_based_head;
     213       14623 :         versionhead  *p = os->id_based_h;
     214             : 
     215       14623 :         if (p != n)
     216     1112463 :                 while (p && p->next != n)
     217             :                         p = p->next;
     218       14623 :         assert(p==n||(p && p->next == n));
     219       14623 :         if (p == n) {
     220        1136 :                 os->id_based_h = n->next;
     221        1136 :                 if (os->id_based_h) // i.e. non-empty os
     222         731 :                         os->id_based_h->prev = NULL;
     223             :                 p = NULL;
     224       13487 :         } else if ( p != NULL)  {
     225       13487 :                 p->next = n->next;
     226       13487 :                 if (p->next) // node in the middle
     227        5677 :                         p->next->prev = p;
     228             :         }
     229       14623 :         if (n == os->id_based_t)
     230        8215 :                 os->id_based_t = p;
     231             : 
     232       14623 :         if (os->id_map && n)
     233       14269 :                 hash_delete(os->id_map, n);
     234             : 
     235       14623 :         os->name_based_cnt--;
     236             :         unlock_writer(os);
     237             : 
     238             :         bte state = os_atmc_get_state(ov);
     239       14623 :         state |= id_based_versionhead_owner;
     240             :         os_atmc_set_state(ov, state);
     241       14623 :         return p;
     242             : }
     243             : 
     244             : static versionhead  *
     245      392253 : node_create(sql_allocator *sa, objectversion *ov)
     246             : {
     247      392253 :         versionhead  *n = SA_NEW(sa, versionhead );
     248             : 
     249      392253 :         if (n == NULL)
     250             :                 return NULL;
     251      392253 :         *n = (versionhead ) {
     252             :                 .ov = ov,
     253             :         };
     254      392253 :         return n;
     255             : }
     256             : 
     257             : static inline int
     258      242857 : os_name_key(versionhead  *n)
     259             : {
     260      242857 :         return hash_key(n->ov->b->name);
     261             : }
     262             : 
     263             : static objectset *
     264      195882 : os_append_node_name(objectset *os, versionhead  *n)
     265             : {
     266             :         lock_writer(os);
     267      195882 :         if ((!os->name_map || os->name_map->size*16 < os->name_based_cnt) && os->name_based_cnt > HASH_MIN_SIZE) {
     268        3330 :                 hash_destroy(os->name_map);
     269        3330 :                 os->name_map = hash_new(os->sa, os->name_based_cnt, (fkeyvalue)& os_name_key);
     270        3330 :                 if (os->name_map == NULL) {
     271             :                         unlock_writer(os);
     272           0 :                         return NULL;
     273             :                 }
     274             : 
     275       56032 :                 for (versionhead  *n = os->name_based_h; n; n = n->next ) {
     276       52702 :                         int key = os_name_key(n);
     277             : 
     278       52702 :                         if (hash_add(os->name_map, key, n) == NULL) {
     279             :                                 unlock_writer(os);
     280           0 :                                 return NULL;
     281             :                         }
     282             :                 }
     283             :         }
     284             : 
     285      195882 :         if (os->name_map) {
     286      176725 :                 int key = os->name_map->key(n);
     287             : 
     288      176725 :                 if (hash_add(os->name_map, key, n) == NULL) {
     289             :                         unlock_writer(os);
     290           0 :                         return NULL;
     291             :                 }
     292             :         }
     293             : 
     294      195882 :         if (os->name_based_t) {
     295      191369 :                 os->name_based_t->next = n;
     296             :         } else {
     297        4513 :                 os->name_based_h = n;
     298             :         }
     299      195882 :         n->prev = os->name_based_t; // aka the double linked list.
     300      195882 :         os->name_based_t = n;
     301      195882 :         os->name_based_cnt++;
     302             :         unlock_writer(os);
     303      195882 :         return os;
     304             : }
     305             : 
     306             : static objectset *
     307      195882 : os_append_name(objectset *os, objectversion *ov)
     308             : {
     309      195882 :         versionhead  *n = node_create(os->sa, ov);
     310             : 
     311      195882 :         if (n == NULL)
     312             :                 return NULL;
     313             : 
     314      195882 :         ov->name_based_head = n;
     315      195882 :         if (!(os = os_append_node_name(os, n))){
     316           0 :                 _DELETE(n);
     317           0 :                 return NULL;
     318             :         }
     319             : 
     320             :         return os;
     321             : }
     322             : 
     323             : static void
     324        3417 : os_append_id_map(objectset *os)
     325             : {
     326        3417 :         if (os->id_map)
     327         360 :                 hash_destroy(os->id_map);
     328        3417 :         os->id_map = hash_new(os->sa, os->id_based_cnt, (fkeyvalue)&os_id_key);
     329        3417 :         if (os->id_map == NULL)
     330             :                 return ;
     331       64052 :         for (versionhead  *n = os->id_based_h; n; n = n->next ) {
     332             :                 int key = os_id_key(n);
     333             : 
     334       60635 :                 if (hash_add(os->id_map, key, n) == NULL) {
     335           0 :                         hash_destroy(os->id_map);
     336           0 :                         os->id_map = NULL;
     337           0 :                         return ;
     338             :                 }
     339             :         }
     340             : }
     341             : 
     342             : static objectset *
     343      196371 : os_append_node_id(objectset *os, versionhead  *n)
     344             : {
     345             :         lock_writer(os);
     346      196371 :         if ((!os->id_map || os->id_map->size*16 < os->id_based_cnt) && os->id_based_cnt > HASH_MIN_SIZE)
     347        3417 :                 os_append_id_map(os); /* on failure just fall back to slow method */
     348             : 
     349      196371 :         if (os->id_map) {
     350      177560 :                 int key = os->id_map->key(n);
     351      177560 :                 if (hash_add(os->id_map, key, n) == NULL) {
     352           0 :                         hash_destroy(os->id_map);
     353           0 :                         os->id_map = NULL;
     354             :                         /* fall back to slow search */
     355             :                 }
     356             :         }
     357             : 
     358      196371 :         if (os->id_based_t) {
     359      191858 :                 os->id_based_t->next = n;
     360             :         } else {
     361        4513 :                 os->id_based_h = n;
     362             :         }
     363      196371 :         n->prev = os->id_based_t; // aka the double linked list.
     364      196371 :         os->id_based_t = n;
     365      196371 :         os->id_based_cnt++;
     366             :         unlock_writer(os);
     367      196371 :         return os;
     368             : }
     369             : 
     370             : static objectset *
     371      196371 : os_append_id(objectset *os, objectversion *ov)
     372             : {
     373      196371 :         versionhead  *n = node_create(os->sa, ov);
     374             : 
     375      196371 :         if (n == NULL)
     376             :                 return NULL;
     377      196371 :         ov->id_based_head = n;
     378      196371 :         if (!(os = os_append_node_id(os, n))){
     379           0 :                 _DELETE(n);
     380           0 :                 return NULL;
     381             :         }
     382             : 
     383             :         return os;
     384             : }
     385             : 
     386             : static versionhead * find_name(objectset *os, const char *name);
     387             : 
     388             : static void
     389      214296 : objectversion_destroy(sqlstore *store, objectset* os, objectversion *ov)
     390             : {
     391             : 
     392             :         bte state = os_atmc_get_state(ov);
     393             : 
     394      214296 :         if (state & name_based_versionhead_owner) {
     395       14134 :                 node_destroy(ov->os, store, ov->name_based_head);
     396             :         }
     397             : 
     398      214296 :         if (state & id_based_versionhead_owner) {
     399       14623 :                 node_destroy(ov->os, store, ov->id_based_head);
     400             :         }
     401             : 
     402      214296 :         if (os->destroy)
     403      214296 :                 os->destroy(store, ov->b);
     404             : 
     405      214296 :         _DELETE(ov);
     406      214296 : }
     407             : 
     408             : static void
     409        5551 : _os_rollback(objectversion *ov, sqlstore *store)
     410             : {
     411        5551 :         assert(ov->ts >= TRANSACTION_ID_BASE);
     412             : 
     413             :         bte state = os_atmc_get_state(ov);
     414        5551 :         if (state & rollbacked) {
     415             :                 return;
     416             :         }
     417             : 
     418        5137 :         state |= rollbacked;
     419             :         os_atmc_set_state(ov, state);
     420             : 
     421             :         bte state_older;
     422             : 
     423             :         /*
     424             :          * We have to use the readers-writer lock here,
     425             :          * since the pointer containing the adress of the older objectversion might be concurrently overwritten if the older itself hass just been put in the under_destruction state .
     426             :          */
     427        5137 :         lock_reader(ov->os);
     428        5137 :         objectversion* name_based_older = ov->name_based_older;
     429        5137 :         unlock_reader(ov->os);
     430             : 
     431        5137 :         if (name_based_older && !((state_older= os_atmc_get_state(name_based_older)) & rollbacked)) {
     432         694 :                 if (ov->ts != name_based_older->ts) {
     433             :                         // older is last committed state or belongs to parent transaction.
     434             :                         // In any case, we restore versionhead pointer to that.
     435             : 
     436             :                         ATOMIC_BASE_TYPE expected_deleted = deleted;
     437         285 :                         if (state_older == active || (state_older == deleted && ATOMIC_CAS(&name_based_older->state, &expected_deleted, block_destruction))) {
     438         285 :                                 ov->name_based_head->ov = name_based_older;
     439         285 :                                 name_based_older->name_based_newer=NULL;
     440         285 :                                 if (state_older != active && expected_deleted == deleted)
     441             :                                         os_atmc_set_state(name_based_older, deleted); //Restore the deleted older back to its deleted state.
     442             :                         }
     443             :                 }
     444             :                 else {
     445         409 :                         _os_rollback(name_based_older, store);
     446             :                 }
     447             :         }
     448        4443 :         else if (!name_based_older) {
     449             :                 // this is a terminal node. i.e. this objectversion does not have name based committed history
     450        4443 :                 if (ov->name_based_head) // The oposite can happen during an early conflict in os_add or os_del.
     451        4437 :                         os_remove_name_based_chain(ov->os, ov);
     452             :         }
     453             : 
     454             :         /*
     455             :          * We have to use the readers-writer lock here,
     456             :          * since the pointer containing the adress of the older objectversion might be concurrently overwritten if the older itself hass just been put in the under_destruction state .
     457             :          */
     458        5137 :         lock_reader(ov->os);
     459        5137 :         objectversion* id_based_older = ov->id_based_older;
     460        5137 :         unlock_reader(ov->os);
     461        5137 :         if (id_based_older && !((state_older= os_atmc_get_state(id_based_older)) & rollbacked)) {
     462         290 :                 if (ov->ts != id_based_older->ts) {
     463             :                         // older is last committed state or belongs to parent transaction.
     464             :                         // In any case, we restore versionhead pointer to that.
     465             : 
     466             :                         ATOMIC_BASE_TYPE expected_deleted = deleted;
     467         285 :                         if (state_older == active || (state_older == deleted && ATOMIC_CAS(&id_based_older->state, &expected_deleted, block_destruction))) {
     468         285 :                                 ov->id_based_head->ov = id_based_older;
     469         285 :                                 id_based_older->id_based_newer=NULL;
     470         285 :                                 if (state_older != active && expected_deleted == deleted)
     471             :                                         os_atmc_set_state(id_based_older, deleted); //Restore the deleted older back to its deleted state.
     472             :                         }
     473             :                 }
     474           5 :                 else if (id_based_older != name_based_older)
     475           5 :                         _os_rollback(id_based_older, store);
     476             :         }
     477        4847 :         else if (!id_based_older) {
     478             :                 // this is a terminal node. i.e. this objectversion does not have id based committed history
     479        4458 :                 os_remove_id_based_chain(ov->os, ov);
     480             :         }
     481             : 
     482        5137 :         if (ov->name_based_newer && !(os_atmc_get_state(ov->name_based_newer) & rollbacked)) {
     483           0 :                 _os_rollback(ov->name_based_newer, store);
     484             :         }
     485             : 
     486        5137 :         if (ov->id_based_newer && ov->id_based_newer != ov->name_based_newer && !(os_atmc_get_state(ov->id_based_newer) & rollbacked)) {
     487           0 :                 _os_rollback(ov->id_based_newer, store);
     488             :         }
     489             : }
     490             : 
     491             : static int
     492             : os_rollback(objectversion *ov, sqlstore *store)
     493             : {
     494        5137 :         _os_rollback(ov, store);
     495             : 
     496        5137 :         return LOG_OK;
     497             : }
     498             : 
     499             : static inline void
     500       10182 : try_to_mark_deleted_for_destruction(sqlstore* store, objectversion *ov)
     501             : {
     502             :         ATOMIC_BASE_TYPE expected_deleted = deleted;
     503       10182 :         if (ATOMIC_CAS(&ov->state, &expected_deleted, under_destruction)) {
     504             : 
     505       10182 :                 if (!ov->name_based_newer || (os_atmc_get_state(ov->name_based_newer) & rollbacked)) {
     506        9697 :                         os_remove_name_based_chain(ov->os, ov);
     507             :                 }
     508             :                 else {
     509         485 :                         lock_writer(ov->os);
     510         485 :                         ov->name_based_newer->name_based_older = NULL;
     511         485 :                         unlock_writer(ov->os);
     512             :                 }
     513             : 
     514       10182 :                 if (!ov->id_based_newer || (os_atmc_get_state(ov->id_based_newer) & rollbacked)) {
     515       10165 :                         os_remove_id_based_chain(ov->os, ov);
     516             :                 }
     517             :                 else {
     518          17 :                         lock_writer(ov->os);
     519          17 :                         ov->id_based_newer->id_based_older = NULL;
     520          17 :                         unlock_writer(ov->os);
     521             :                 }
     522             : 
     523       10182 :                 ov->ts = store_get_timestamp(store)+1;
     524             :         }
     525       10182 : }
     526             : 
     527             : static void
     528       27254 : objectversion_destroy_recursive(sqlstore* store, objectversion *ov)
     529             : {
     530       27254 :         if (ov->id_based_older && ov->id_based_older == ov->name_based_older) {
     531       13523 :                 objectversion_destroy_recursive(store, ov->id_based_older);
     532             :         }
     533       27254 :         objectversion_destroy(store, ov->os, ov);
     534       27254 : }
     535             : 
     536             : static int
     537      230419 : os_cleanup(sqlstore* store, objectversion *ov, ulng oldest)
     538             : {
     539      230419 :         if (os_atmc_get_state(ov) & under_destruction) {
     540       10201 :                 if (ov->ts < oldest) {
     541             :                         // This one is ready to be freed
     542       10182 :                         objectversion_destroy_recursive(store, ov);
     543       10182 :                         return LOG_ERR;
     544             :                 }
     545             : 
     546             :                 // not yet old enough to be safely removed. Try later.
     547             :                 return LOG_OK;
     548             :         }
     549             : 
     550      220218 :         if (os_atmc_get_state(ov) & rollbacked) {
     551       10495 :                 if (ov->ts < oldest) {
     552             :                         // This one is ready to be freed
     553        5137 :                         if (ov->name_based_older && ov->name_based_older->name_based_newer == ov)
     554         409 :                                 ov->name_based_older->name_based_newer=NULL;
     555        5137 :                         if (ov->id_based_older && ov->id_based_older->id_based_newer == ov)
     556         394 :                                 ov->id_based_older->id_based_newer=NULL;
     557        5137 :                         objectversion_destroy(store, ov->os, ov);
     558        5137 :                         return LOG_ERR;
     559             :                 }
     560             : 
     561        5358 :                 if (ov->ts > TRANSACTION_ID_BASE) {
     562             :                         /* We mark it with the latest possible starttime and reinsert it into the cleanup list.
     563             :                          * This will cause a safe eventual destruction of this rollbacked ov.
     564             :                          */
     565        5137 :                         ov->ts = store_get_timestamp(store)+1;
     566             :                 }
     567             : 
     568             :                 // not yet old enough to be safely removed. Try later.
     569        5358 :                 return LOG_OK;
     570             :         }
     571             : 
     572      209723 :         if (os_atmc_get_state(ov) == deleted) {
     573       10225 :                 if (ov->ts <= oldest) {
     574             :                         // the oldest relevant state is deleted so lets try to mark it as destroyed
     575       10182 :                         try_to_mark_deleted_for_destruction(store, ov);
     576             :                 }
     577             : 
     578             :                 // Keep it inplace on the cleanup list, either because it is now marked for destruction or
     579             :                 // we want to retry marking it for destruction later.
     580       10225 :                 return LOG_OK;
     581             :         }
     582             : 
     583      207066 :         while (ov->id_based_older && ov->id_based_older == ov->name_based_older && ov->ts >= oldest) {
     584             :                 ov = ov->id_based_older;
     585             :         }
     586             : 
     587      199498 :         if (ov->id_based_older && ov->id_based_older == ov->name_based_older) {
     588             :                 // Destroy everything older then the oldest possibly relevant objectversion.
     589        3549 :                 objectversion_destroy_recursive(store, ov->id_based_older);
     590        3549 :                 ov->id_based_older = NULL;
     591             :         }
     592             : 
     593             :         return LOG_ERR;
     594             : }
     595             : 
     596             : static int
     597      230426 : tc_gc_objectversion(sql_store store, sql_change *change, ulng oldest)
     598             : {
     599      230426 :         assert(!change->handled);
     600      230426 :         objectversion *ov = (objectversion*)change->data;
     601             : 
     602      230426 :         if (oldest && oldest >= TRANSACTION_ID_BASE)
     603             :                 return 0;
     604      230419 :         int res = os_cleanup( (sqlstore*) store, ov, oldest);
     605      230419 :         change->handled = (res)?true:false;
     606      230419 :         return res;
     607             : }
     608             : 
     609             : static int
     610      214824 : tc_commit_objectversion(sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
     611             : {
     612      214824 :         objectversion *ov = (objectversion*)change->data;
     613      214824 :         if (commit_ts) {
     614      209687 :                 assert(ov->ts == tr->tid);
     615      209687 :                 ov->ts = commit_ts;
     616      209687 :                 change->committed = commit_ts < TRANSACTION_ID_BASE ? true: false;
     617             :                 (void)oldest;
     618      209687 :                 if (!tr->parent)
     619      209680 :                         change->obj->new = 0;
     620             :         }
     621             :         else {
     622        5137 :                 os_rollback(ov, tr->store);
     623             :         }
     624             : 
     625      214824 :         return LOG_OK;
     626             : }
     627             : 
     628             : objectset *
     629       18996 : os_new(sql_allocator *sa, destroy_fptr destroy, bool temporary, bool unique, bool concurrent, sql_store store)
     630             : {
     631       18996 :         objectset *os = SA_NEW(sa, objectset);
     632       18996 :         *os = (objectset) {
     633             :                 .refcnt = 1,
     634             :                 .sa = sa,
     635             :                 .destroy = destroy,
     636             :                 .temporary = temporary,
     637             :                 .unique = unique,
     638             :                 .concurrent = concurrent,
     639             :                 .store = store
     640             :         };
     641             :         os->destroy = destroy;
     642       18996 :         MT_rwlock_init(&os->rw_lock, "sa_readers_lock");
     643             : 
     644       18996 :         return os;
     645             : }
     646             : 
     647             : objectset *
     648          14 : os_dup(objectset *os)
     649             : {
     650          14 :         os->refcnt++;
     651          14 :         return os;
     652             : }
     653             : 
     654             : void
     655       18944 : os_destroy(objectset *os, sql_store store)
     656             : {
     657       18944 :         if (--os->refcnt > 0)
     658             :                 return;
     659       18930 :         MT_rwlock_destroy(&os->rw_lock);
     660       18930 :         versionhead* n=os->id_based_h;
     661      200157 :         while(n) {
     662      181227 :                 objectversion *ov = n->ov;
     663      363132 :                 while(ov) {
     664      181905 :                         objectversion *older = ov->id_based_older;
     665      181905 :                         objectversion_destroy(store, os, ov);
     666             :                         ov = older;
     667             :                 }
     668      181227 :                 versionhead* hn =n->next;
     669      181227 :                 node_destroy(os, store, n);
     670             :                 n = hn;
     671             :         }
     672             : 
     673       18930 :         n=os->name_based_h;
     674      200157 :         while(n) {
     675      181227 :                 versionhead* hn =n->next;
     676      181227 :                 node_destroy(os, store, n);
     677             :                 n = hn;
     678             :         }
     679             : 
     680       18930 :         if (os->id_map)
     681        3046 :                 hash_destroy(os->id_map);
     682             : 
     683       18930 :         if (os->name_map)
     684        3029 :                 hash_destroy(os->name_map);
     685             : 
     686       18930 :         if (!os->sa)
     687       18930 :                 _DELETE(os);
     688             : }
     689             : 
     690             : static versionhead  *
     691    16326447 : find_name(objectset *os, const char *name)
     692             : {
     693             :         lock_reader(os);
     694    16330388 :         if (os->name_map) {
     695    14423419 :                 int key = hash_key(name);
     696    14423419 :                 sql_hash_e *he = os->name_map->buckets[key&(os->name_map->size-1)];
     697             : 
     698    95394487 :                 for (; he; he = he->chain) {
     699    95173667 :                         versionhead  *n = he->value;
     700             : 
     701    95173667 :                         if (n && n->ov->b->name && strcmp(n->ov->b->name, name) == 0) {
     702             :                                 unlock_reader(os);
     703    14199970 :                                 return n;
     704             :                         }
     705             :                 }
     706             :                 unlock_reader(os);
     707      220820 :                 return NULL;
     708             :         }
     709             : 
     710     2213804 :         for (versionhead  *n = os->name_based_h; n; n = n->next) {
     711     2161165 :                 objectversion *ov = n->ov;
     712             : 
     713             :                 /* check if names match */
     714     2161165 :                 if (name[0] == ov->b->name[0] && strcmp(name, ov->b->name) == 0) {
     715             :                         unlock_reader(os);
     716     1854334 :                         return n;
     717             :                 }
     718             :         }
     719             : 
     720             :         unlock_reader(os);
     721       52639 :         return NULL;
     722             : }
     723             : 
     724             : static objectversion*
     725    17125384 : get_valid_object_name(sql_trans *tr, objectversion *ov)
     726             : {
     727    17125227 :         while(ov) {
     728    17125201 :                 if (ov->ts == tr->tid || (tr->parent && tr_version_of_parent(tr, ov->ts)) || ov->ts < tr->ts)
     729    17124899 :                         return ov;
     730             :                 else {
     731         302 :                         lock_reader(ov->os);
     732         117 :                         objectversion* name_based_older = ov->name_based_older;
     733         117 :                         unlock_reader(ov->os);
     734             :                         ov = name_based_older;
     735             :                 }
     736             :         }
     737             :         return ov;
     738             : }
     739             : 
     740             : static objectversion*
     741      426491 : get_valid_object_id(sql_trans *tr, objectversion *ov)
     742             : {
     743      426580 :         while(ov) {
     744      426527 :                 if (ov->ts == tr->tid || (tr->parent && tr_version_of_parent(tr, ov->ts))  || ov->ts < tr->ts)
     745      426438 :                         return ov;
     746             :                 else {
     747          89 :                         lock_reader(ov->os);
     748          89 :                         objectversion* id_based_older = ov->id_based_older;
     749          89 :                         unlock_reader(ov->os);
     750             :                         ov = id_based_older;
     751             :                 }
     752             :         }
     753             :         return ov;
     754             : }
     755             : 
     756             : static int
     757      204219 : os_add_name_based(objectset *os, struct sql_trans *tr, const char *name, objectversion *ov) {
     758             :         versionhead  *name_based_node = NULL;
     759      204219 :         if (ov->id_based_older && strcmp(ov->id_based_older->b->name, name) == 0)
     760        7826 :                 name_based_node = ov->id_based_older->name_based_head;
     761      196393 :         else if (os->unique) // Previous name based objectversion is of a different id, so now we do have to perform an extensive look up
     762       59721 :                 name_based_node = find_name(os, name);
     763             :         // else names are not unique and each id based version head maps to its own name based version head.
     764             : 
     765       67547 :         if (name_based_node) {
     766        8337 :                 objectversion *co = name_based_node->ov;
     767        8337 :                 objectversion *oo = get_valid_object_name(tr, co);
     768        8337 :                 if (co != oo) { /* conflict ? */
     769             :                         return -3;
     770             :                 }
     771             : 
     772        8331 :                 assert(ov != oo); // Time loops are not allowed
     773             : 
     774             :                 bte state = os_atmc_get_state(oo);
     775        8331 :                 if (state != active) {
     776             :                         // This can only happen if the parent oo was a comitted deleted at some point.
     777         506 :                         assert(state == deleted || state == under_destruction || state == block_destruction);
     778             :                         /* Since our parent oo is comitted deleted objectversion, we might have a conflict with
     779             :                         * another transaction that tries to clean up oo or also wants to add a new objectversion.
     780             :                         */
     781             :                         ATOMIC_BASE_TYPE expected_deleted = deleted;
     782         506 :                         if (!ATOMIC_CAS(&oo->state, &expected_deleted, block_destruction)) {
     783             :                                 return -3; /*conflict with cleaner or write-write conflict*/
     784             :                         }
     785             :                 }
     786             : 
     787             :                 /* new object with same name within transaction, should have a delete in between */
     788        8331 :                 assert(!(state == active && oo->ts == ov->ts && !(os_atmc_get_state(ov) & deleted)));
     789             : 
     790             :                 lock_writer(os);
     791        8331 :                 ov->name_based_head = oo->name_based_head;
     792        8331 :                 ov->name_based_older = oo;
     793             : 
     794        8331 :                 name_based_node->ov = ov;
     795             :                 if (oo) {
     796        8331 :                         oo->name_based_newer = ov;
     797             :                         // if the parent was originally deleted, we restore it to that state.
     798             :                         os_atmc_set_state(oo, state);
     799             :                 }
     800             :                 unlock_writer(os);
     801        8331 :                 return 0;
     802             :         } else { /* new */
     803      195882 :                 if (os_append_name(os, ov) == NULL)
     804             :                         return -1; // MALLOC_FAIL
     805      195882 :                 return 0;
     806             :         }
     807             : }
     808             : 
     809             : static int
     810      204227 : os_add_id_based(objectset *os, struct sql_trans *tr, sqlid id, objectversion *ov) {
     811             :         versionhead  *id_based_node;
     812             : 
     813      204227 :         id_based_node = find_id(os, id);
     814             : 
     815      204227 :         if (id_based_node) {
     816        7856 :                 objectversion *co = id_based_node->ov;
     817        7856 :                 objectversion *oo = get_valid_object_id(tr, co);
     818        7856 :                 if (co != oo) { /* conflict ? */
     819             :                         return -3;
     820             :                 }
     821             : 
     822        7848 :                 assert(ov != oo); // Time loops are not allowed
     823             : 
     824             :                 bte state = os_atmc_get_state(oo);
     825        7848 :                 if (state != active) {
     826             :                         // This can only happen if the parent oo was a comitted deleted at some point.
     827          23 :                         assert(state == deleted || state == under_destruction || state == block_destruction);
     828             :                         /* Since our parent oo is comitted deleted objectversion, we might have a conflict with
     829             :                         * another transaction that tries to clean up oo or also wants to add a new objectversion.
     830             :                         */
     831             :                         ATOMIC_BASE_TYPE expected_deleted = deleted;
     832          23 :                         if (!ATOMIC_CAS(&oo->state, &expected_deleted, block_destruction)) {
     833             :                                 return -3; /*conflict with cleaner or write-write conflict*/
     834             :                         }
     835             :                 }
     836             : 
     837             :                 lock_writer(os);
     838        7848 :                 ov->id_based_head = oo->id_based_head;
     839        7848 :                 ov->id_based_older = oo;
     840             : 
     841        7848 :                 id_based_node->ov = ov;
     842             :                 if (oo) {
     843        7848 :                         oo->id_based_newer = ov;
     844             :                         // if the parent was originally deleted, we restore it to that state.
     845             :                         os_atmc_set_state(oo, state);
     846             :                 }
     847             :                 unlock_writer(os);
     848        7848 :                 return 0;
     849             :         } else { /* new */
     850      196371 :                 if (os_append_id(os, ov) == NULL)
     851             :                         return -1; // MALLOC_FAIL
     852             : 
     853      196371 :                 return 0;
     854             :         }
     855             : }
     856             : 
     857             : static int /*ok, error (name existed) and conflict (added before) */
     858      204228 : os_add_(objectset *os, struct sql_trans *tr, const char *name, sql_base *b)
     859             : {
     860             :         int res = 0;
     861      204228 :         objectversion *ov = SA_NEW(os->sa, objectversion);
     862             : 
     863      204228 :         *ov = (objectversion) {
     864      204228 :                 .ts = tr->tid,
     865             :                 .b = b,
     866             :                 .os = os,
     867             :         };
     868             : 
     869      204228 :         if (!os->concurrent && os_has_changes(os, tr)) { /* for object sets without concurrent support, conflict if concurrent changes are there */
     870           1 :                 if (os->destroy)
     871           1 :                         os->destroy(os->store, ov->b);
     872           1 :                 _DELETE(ov);
     873           1 :                 return -3; /* conflict */
     874             :         }
     875             : 
     876      204227 :         if ((res = os_add_id_based(os, tr, b->id, ov))) {
     877           8 :                 if (os->destroy)
     878           8 :                         os->destroy(os->store, ov->b);
     879           8 :                 _DELETE(ov);
     880           8 :                 return res;
     881             :         }
     882             : 
     883      204219 :         if ((res = os_add_name_based(os, tr, name, ov))) {
     884           6 :                 trans_add(tr, b, ov, &tc_gc_objectversion, &tc_commit_objectversion, NULL);
     885           6 :                 return res;
     886             :         }
     887             : 
     888      204213 :         if (!os->temporary)
     889      204213 :                 trans_add(tr, b, ov, &tc_gc_objectversion, &tc_commit_objectversion, NULL);
     890             :         return res;
     891             : }
     892             : 
     893             : int
     894      204228 : os_add(objectset *os, struct sql_trans *tr, const char *name, sql_base *b)
     895             : {
     896      204228 :         store_lock(tr->store);
     897      204228 :         int res = os_add_(os, tr, name, b);
     898      204228 :         store_unlock(tr->store);
     899      204228 :         return res;
     900             : }
     901             : 
     902             : static int
     903       10598 : os_del_name_based(objectset *os, struct sql_trans *tr, const char *name, objectversion *ov) {
     904             :         versionhead  *name_based_node = NULL;
     905       10598 :         if (ov->id_based_older && strcmp(ov->id_based_older->b->name, name) == 0)
     906       10598 :                 name_based_node = ov->id_based_older->name_based_head;
     907           0 :         else if (os->unique) // Previous name based objectversion is of a different id, so now we do have to perform an extensive look up
     908           0 :                 name_based_node = find_name(os, name);
     909             : 
     910       10598 :         if (name_based_node) {
     911       10598 :                 objectversion *co = name_based_node->ov;
     912       10598 :                 objectversion *oo = get_valid_object_name(tr, co);
     913       10598 :                 ov->name_based_head = oo->name_based_head;
     914       10598 :                 if (co != oo) { /* conflict ? */
     915             :                         return -3;
     916             :                 }
     917       10598 :                 ov->name_based_older = oo;
     918             : 
     919             :                 lock_writer(os);
     920             :                 if (oo) {
     921       10598 :                         oo->name_based_newer = ov;
     922       10598 :                         assert(os_atmc_get_state(oo) == active);
     923             :                 }
     924       10598 :                 name_based_node->ov = ov;
     925             :                 unlock_writer(os);
     926       10598 :                 return 0;
     927             :         } else {
     928             :                 /* missing */
     929           0 :                 return -1;
     930             :         }
     931             : }
     932             : 
     933             : static int
     934       10600 : os_del_id_based(objectset *os, struct sql_trans *tr, sqlid id, objectversion *ov) {
     935             :         versionhead  *id_based_node;
     936       10600 :         if (ov->name_based_older && ov->name_based_older->b->id == id)
     937           0 :                 id_based_node = ov->name_based_older->id_based_head;
     938             :         else // Previous id based objectversion is of a different name, so now we do have to perform an extensive look up
     939       10600 :                 id_based_node = find_id(os, id);
     940             : 
     941       10600 :         if (id_based_node) {
     942       10600 :                 objectversion *co = id_based_node->ov;
     943       10600 :                 objectversion *oo = get_valid_object_id(tr, co);
     944       10600 :                 ov->id_based_head = oo->id_based_head;
     945       10600 :                 if (co != oo) { /* conflict ? */
     946             :                         return -3;
     947             :                 }
     948       10598 :                 ov->id_based_older = oo;
     949             : 
     950             :                 lock_writer(os);
     951             :                 if (oo) {
     952       10598 :                         oo->id_based_newer = ov;
     953       10598 :                         assert(os_atmc_get_state(oo) == active);
     954             :                 }
     955       10598 :                 id_based_node->ov = ov;
     956             :                 unlock_writer(os);
     957       10598 :                 return 0;
     958             :         } else {
     959             :                 /* missing */
     960             :                 return -1;
     961             :         }
     962             : }
     963             : 
     964             : static int
     965       10600 : os_del_(objectset *os, struct sql_trans *tr, const char *name, sql_base *b)
     966             : {
     967             :         int res = 0;
     968       10600 :         objectversion *ov = SA_NEW(os->sa, objectversion);
     969             : 
     970       10600 :         *ov = (objectversion) {
     971       10600 :                 .ts = tr->tid,
     972             :                 .b = b,
     973             :                 .os = os,
     974             :         };
     975             :         os_atmc_set_state(ov, deleted);
     976             : 
     977       10600 :         if ((res = os_del_id_based(os, tr, b->id, ov))) {
     978           2 :                 if (os->destroy)
     979           2 :                         os->destroy(os->store, ov->b);
     980           2 :                 _DELETE(ov);
     981           2 :                 return res;
     982             :         }
     983             : 
     984       10598 :         if ((res = os_del_name_based(os, tr, name, ov))) {
     985           0 :                 trans_add(tr, b, ov, &tc_gc_objectversion, &tc_commit_objectversion, NULL);
     986           0 :                 return res;
     987             :         }
     988             : 
     989       10598 :         if (!os->temporary)
     990       10598 :                 trans_add(tr, b, ov, &tc_gc_objectversion, &tc_commit_objectversion, NULL);
     991             :         return res;
     992             : }
     993             : 
     994             : int
     995       10600 : os_del(objectset *os, struct sql_trans *tr, const char *name, sql_base *b)
     996             : {
     997       10600 :         store_lock(tr->store);
     998       10600 :         int res = os_del_(os, tr, name, b);
     999       10600 :         store_unlock(tr->store);
    1000       10600 :         return res;
    1001             : }
    1002             : 
    1003             : int
    1004         332 : os_size(objectset *os, struct sql_trans *tr)
    1005             : {
    1006             :         int cnt = 0;
    1007         332 :         if (os) {
    1008             :                 lock_reader(os);
    1009         342 :                 for(versionhead  *n = os->name_based_h; n; n=n->next) {
    1010          10 :                         objectversion *ov = n->ov;
    1011          10 :                         if ((ov=get_valid_object_name(tr, ov)) && os_atmc_get_state(ov) == active)
    1012           7 :                                 cnt++;
    1013             :                 }
    1014             :                 unlock_reader(os);
    1015             :         }
    1016         332 :         return cnt;
    1017             : }
    1018             : 
    1019             : int
    1020           0 : os_empty(objectset *os, struct sql_trans *tr)
    1021             : {
    1022           0 :         return os_size(os, tr)==0;
    1023             : }
    1024             : 
    1025             : int
    1026           0 : os_remove(objectset *os, sql_trans *tr, const char *name)
    1027             : {
    1028             :         (void) os;
    1029             :         (void) tr;
    1030             :         (void) name;
    1031             :         // TODO remove entire versionhead  corresponding to this name.
    1032             : 
    1033             :         // TODO assert os->unique?s
    1034           0 :         return LOG_OK;
    1035             : }
    1036             : 
    1037             : sql_base *
    1038    16271921 : os_find_name(objectset *os, struct sql_trans *tr, const char *name)
    1039             : {
    1040    16271921 :         if (!os)
    1041             :                 return NULL;
    1042             : 
    1043    16271919 :         assert(os->unique);
    1044    16271919 :         versionhead  *n = find_name(os, name);
    1045             : 
    1046    16264381 :         if (n) {
    1047    16050132 :                  objectversion *ov = get_valid_object_name(tr, n->ov);
    1048    16047594 :                  if (ov && os_atmc_get_state(ov) == active)
    1049    16046625 :                          return ov->b;
    1050             :         }
    1051             :         return NULL;
    1052             : }
    1053             : 
    1054             : sql_base *
    1055       82995 : os_find_id(objectset *os, struct sql_trans *tr, sqlid id)
    1056             : {
    1057       82995 :         if (!os)
    1058             :                 return NULL;
    1059       82995 :         versionhead  *n = find_id(os, id);
    1060             : 
    1061       82995 :         if (n) {
    1062       82879 :                  objectversion *ov = get_valid_object_id(tr, n->ov);
    1063       82879 :                  if (ov && os_atmc_get_state(ov) == active)
    1064       82875 :                          return ov->b;
    1065             :         }
    1066             :         return NULL;
    1067             : }
    1068             : 
    1069             : void
    1070     3444037 : os_iterator(struct os_iter *oi, struct objectset *os, struct sql_trans *tr, const char *name /*optional*/)
    1071             : {
    1072     3444037 :         *oi = (struct os_iter) {
    1073             :                 .os = os,
    1074             :                 .tr = tr,
    1075             :                 .name = name,
    1076             :         };
    1077             : 
    1078             :         lock_reader(os);
    1079     3444037 :         oi->n =      os->name_based_h;
    1080             :         unlock_reader(os);
    1081     3444037 : }
    1082             : 
    1083             : sql_base *
    1084     4751636 : oi_next(struct os_iter *oi)
    1085             : {
    1086             :         sql_base *b = NULL;
    1087             : 
    1088     4751636 :         if (oi->name) {
    1089     2423572 :                 versionhead  *n = oi->n;
    1090             : 
    1091   336908135 :                 while (n && !b) {
    1092             : 
    1093   334484563 :                         if (n->ov->b->name && strcmp(n->ov->b->name, oi->name) == 0) {
    1094             :                                 objectversion *ov = n->ov;
    1095             : 
    1096     1058345 :                                 n = oi->n = n->next;
    1097     1058345 :                                 ov = get_valid_object_name(oi->tr, ov);
    1098     1058345 :                                 if (ov && os_atmc_get_state(ov) == active)
    1099     1055716 :                                         b = ov->b;
    1100             :                         } else {
    1101   333426218 :                                 lock_reader(oi->os);
    1102   333426145 :                                 n = oi->n = n->next;
    1103   333426145 :                                 unlock_reader(oi->os);
    1104             :                         }
    1105             :                 }
    1106             :         } else {
    1107     2328064 :                 versionhead  *n = oi->n;
    1108             : 
    1109     2653220 :                 while (n && !b) {
    1110      325156 :                         objectversion *ov = n->ov;
    1111      325156 :                         lock_reader(oi->os);
    1112      325156 :                         n = oi->n = n->next;
    1113      325156 :                         unlock_reader(oi->os);
    1114             : 
    1115      325156 :                         ov = get_valid_object_id(oi->tr, ov);
    1116      325156 :                         if (ov && os_atmc_get_state(ov) == active)
    1117      301624 :                                 b = ov->b;
    1118             :                 }
    1119             :         }
    1120     4751636 :         return b;
    1121             : }
    1122             : 
    1123             : bool
    1124        3475 : os_obj_intransaction(objectset *os, struct sql_trans *tr, sql_base *b)
    1125             : {
    1126        3475 :         versionhead  *n = find_id(os, b->id);
    1127             : 
    1128        3475 :         if (n) {
    1129        3475 :                 objectversion *ov = n->ov;
    1130             : 
    1131        3475 :                 if (ov && os_atmc_get_state(ov) == active && ov->ts == tr->tid)
    1132        1402 :                         return true;
    1133             :         }
    1134             :         return false;
    1135             : }
    1136             : 
    1137             : /* return true if this object set has changes pending for an other transaction */
    1138             : bool
    1139      128779 : os_has_changes(objectset *os, struct sql_trans *tr)
    1140             : {
    1141      128779 :         versionhead  *n = os->id_based_t;
    1142             : 
    1143      128779 :         if (n) {
    1144      127141 :                 objectversion *ov = n->ov;
    1145             : 
    1146      127141 :                 if (ov && os_atmc_get_state(ov) == active && ov->ts != tr->tid && ov->ts > TRANSACTION_ID_BASE)
    1147           1 :                         return true;
    1148             :         }
    1149             :         return false;
    1150             : }

Generated by: LCOV version 1.14