LCOV - code coverage report
Current view: top level - monetdb5/modules/mal - tablet.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 708 982 72.1 %
Date: 2021-10-27 03:06:47 Functions: 27 31 87.1 %

          Line data    Source code
       1             : /*
       2             :  * This Source Code Form is subject to the terms of the Mozilla Public
       3             :  * License, v. 2.0.  If a copy of the MPL was not distributed with this
       4             :  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
       5             :  *
       6             :  * Copyright 1997 - July 2008 CWI, August 2008 - 2021 MonetDB B.V.
       7             :  */
       8             : 
       9             : /*
      10             :  *  Niels Nes, Martin Kersten
      11             :  *
      12             :  * Parallel bulk load for SQL
      13             :  * The COPY INTO command for SQL is heavily CPU bound, which means
      14             :  * that ideally we would like to exploit the multi-cores to do that
      15             :  * work in parallel.
      16             :  * Complicating factors are the initial record offset, the
      17             :  * possible variable length of the input, and the original sort order
       18             :  * that should preferably be maintained.
      19             :  *
      20             :  * The code below consists of a file reader, which breaks up the
      21             :  * file into chunks of distinct rows. Then multiple parallel threads
      22             :  * grab them, and break them on the field boundaries.
      23             :  * After all fields are identified this way, the columns are converted
      24             :  * and stored in the BATs.
      25             :  *
      26             :  * The threads get a reference to a private copy of the READERtask.
      27             :  * It includes a list of columns they should handle. This is a basis
       28             :  * to distribute cheap and expensive columns over threads.
      29             :  *
      30             :  * The file reader overlaps IO with updates of the BAT.
      31             :  * Also the buffer size of the block stream might be a little small for
       32             :  * this task (1MB). It has been increased to 8MB, which indeed improved performance.
      33             :  *
      34             :  * The work divider allocates subtasks to threads based on the
       35             :  * observed time spent so far.
      36             :  */
      37             : 
      38             : #include "monetdb_config.h"
      39             : #include "tablet.h"
      40             : #include "mapi_prompt.h"
      41             : 
      42             : #include <string.h>
      43             : #include <ctype.h>
      44             : 
      45             : #define MAXWORKERS      64
      46             : #define MAXBUFFERS 2
      47             : /* We restrict the row length to be 32MB for the time being */
      48             : #define MAXROWSIZE(X) (X > 32*1024*1024 ? X : 32*1024*1024)
      49             : 
      50             : static MT_Lock errorlock = MT_LOCK_INITIALIZER(errorlock);
      51             : 
      52             : static BAT *
      53        9358 : void_bat_create(int adt, BUN nr)
      54             : {
      55        9358 :         BAT *b = COLnew(0, adt, nr, TRANSIENT);
      56             : 
      57             :         /* check for correct structures */
      58        9358 :         if (b == NULL)
      59             :                 return NULL;
      60        9358 :         if ((b = BATsetaccess(b, BAT_APPEND)) == NULL) {
      61             :                 return NULL;
      62             :         }
      63             : 
      64             :         /* disable all properties here */
      65        9358 :         b->tsorted = false;
      66        9358 :         b->trevsorted = false;
      67        9358 :         b->tnosorted = 0;
      68        9358 :         b->tnorevsorted = 0;
      69        9358 :         b->tseqbase = oid_nil;
      70        9358 :         b->tkey = false;
      71        9358 :         b->tnokey[0] = 0;
      72        9358 :         b->tnokey[1] = 0;
      73        9358 :         return b;
      74             : }
      75             : 
      76             : void
      77       75217 : TABLETdestroy_format(Tablet *as)
      78             : {
      79             :         BUN p;
      80       75217 :         Column *fmt = as->format;
      81             : 
      82      391892 :         for (p = 0; p < as->nr_attrs; p++) {
      83      316675 :                 if (fmt[p].c)
      84      242257 :                         BBPunfix(fmt[p].c->batCacheid);
      85      316675 :                 if (fmt[p].data)
      86        9363 :                         GDKfree(fmt[p].data);
      87             :         }
      88       75217 :         GDKfree(fmt);
      89       75217 : }
      90             : 
      91             : static oid
      92       74412 : check_BATs(Tablet *as)
      93             : {
      94       74412 :         Column *fmt = as->format;
      95             :         BUN i = 0;
      96             :         BUN cnt;
      97             :         oid base;
      98             : 
      99       74412 :         if (fmt[i].c == NULL)
     100             :                 i++;
     101       74412 :         cnt = BATcount(fmt[i].c);
     102       74412 :         base = fmt[i].c->hseqbase;
     103             : 
     104       74412 :         if (as->nr != cnt)
     105        1198 :                 return oid_nil;
     106             : 
     107      372834 :         for (i = 0; i < as->nr_attrs; i++) {
     108             :                 BAT *b;
     109             :                 BUN offset;
     110             : 
     111      299621 :                 b = fmt[i].c;
     112      299621 :                 if (b == NULL)
     113       73214 :                         continue;
     114      226407 :                 offset = as->offset;
     115             : 
     116      226407 :                 if (BATcount(b) != cnt || b->hseqbase != base)
     117           1 :                         return oid_nil;
     118             : 
     119      226406 :                 fmt[i].p = offset;
     120             :         }
     121             :         return base;
     122             : }
     123             : 
     124             : str
     125         805 : TABLETcreate_bats(Tablet *as, BUN est)
     126             : {
     127         805 :         Column *fmt = as->format;
     128             :         BUN i, nr = 0;
     129             : 
     130       10169 :         for (i = 0; i < as->nr_attrs; i++) {
     131        9364 :                 if (fmt[i].skip)
     132           6 :                         continue;
     133        9358 :                 fmt[i].c = void_bat_create(fmt[i].adt, est);
     134        9358 :                 if (!fmt[i].c) {
     135           0 :                         while (i > 0) {
     136           0 :                                 if (!fmt[--i].skip)
     137           0 :                                         BBPreclaim(fmt[i].c);
     138             :                         }
     139           0 :                         throw(SQL, "copy", "Failed to create bat of size " BUNFMT "\n", as->nr);
     140             :                 }
     141        9358 :                 fmt[i].ci = bat_iterator_nolock(fmt[i].c);
     142        9358 :                 nr++;
     143             :         }
     144         805 :         if (!nr)
     145           0 :                 throw(SQL, "copy", "At least one column should be read from the input\n");
     146             :         return MAL_SUCCEED;
     147             : }
     148             : 
     149             : str
     150         780 : TABLETcollect(BAT **bats, Tablet *as)
     151             : {
     152         780 :         Column *fmt = as->format;
     153             :         BUN i, j;
     154             :         BUN cnt = 0;
     155             : 
     156         780 :         if (bats == NULL)
     157           0 :                 throw(SQL, "copy", "Missing container");
     158        1964 :         for (i = 0; i < as->nr_attrs && !cnt; i++)
     159        1184 :                 if (!fmt[i].skip)
     160        1181 :                         cnt = BATcount(fmt[i].c);
     161       10051 :         for (i = 0, j = 0; i < as->nr_attrs; i++) {
     162        9271 :                 if (fmt[i].skip)
     163           6 :                         continue;
     164        9265 :                 bats[j] = fmt[i].c;
     165        9265 :                 BBPfix(bats[j]->batCacheid);
     166        9265 :                 if ((fmt[i].c = BATsetaccess(fmt[i].c, BAT_READ)) == NULL)
     167           0 :                         throw(SQL, "copy", "Failed to set access at tablet part " BUNFMT "\n", cnt);
     168        9265 :                 fmt[i].c->tsorted = fmt[i].c->trevsorted = false;
     169        9265 :                 fmt[i].c->tkey = false;
     170        9265 :                 BATsettrivprop(fmt[i].c);
     171             : 
     172        9265 :                 if (cnt != BATcount(fmt[i].c))
     173           0 :                         throw(SQL, "copy", "Count " BUNFMT " differs from " BUNFMT "\n", BATcount(fmt[i].c), cnt);
     174        9265 :                 j++;
     175             :         }
     176             :         return MAL_SUCCEED;
     177             : }
     178             : 
     179             : str
     180           0 : TABLETcollect_parts(BAT **bats, Tablet *as, BUN offset)
     181             : {
     182           0 :         Column *fmt = as->format;
     183             :         BUN i, j;
     184             :         BUN cnt = 0;
     185             : 
     186           0 :         for (i = 0; i < as->nr_attrs && !cnt; i++)
     187           0 :                 if (!fmt[i].skip)
     188           0 :                         cnt = BATcount(fmt[i].c);
     189           0 :         for (i = 0, j = 0; i < as->nr_attrs; i++) {
     190             :                 BAT *b, *bv = NULL;
     191           0 :                 if (fmt[i].skip)
     192           0 :                         continue;
     193           0 :                 b = fmt[i].c;
     194           0 :                 b->tsorted = b->trevsorted = false;
     195           0 :                 b->tkey = false;
     196           0 :                 BATsettrivprop(b);
     197           0 :                 if ((b = BATsetaccess(b, BAT_READ)) == NULL) {
     198           0 :                         fmt[i].c = NULL;
     199           0 :                         throw(SQL, "copy", "Failed to set access at tablet part " BUNFMT "\n", cnt);
     200             :                 }
     201           0 :                 bv = BATslice(b, (offset > 0) ? offset - 1 : 0, BATcount(b));
     202           0 :                 bats[j] = bv;
     203             : 
     204           0 :                 b->tkey = (offset > 0) ? FALSE : bv->tkey;
     205           0 :                 b->tnonil &= bv->tnonil;
     206           0 :                 if (b->tsorted != bv->tsorted)
     207           0 :                         b->tsorted = false;
     208           0 :                 if (b->trevsorted != bv->trevsorted)
     209           0 :                         b->trevsorted = false;
     210           0 :                 if (BATtdense(b))
     211           0 :                         b->tkey = true;
     212           0 :                 b->batDirtydesc = true;
     213             : 
     214           0 :                 if (offset > 0) {
     215           0 :                         BBPunfix(bv->batCacheid);
     216           0 :                         bats[j] = BATslice(b, offset, BATcount(b));
     217             :                 }
     218           0 :                 if (cnt != BATcount(b))
     219           0 :                         throw(SQL, "copy", "Count " BUNFMT " differs from " BUNFMT "\n", BATcount(b), cnt);
     220           0 :                 j++;
     221             :         }
     222             :         return MAL_SUCCEED;
     223             : }
     224             : 
     225             : // the starting quote character has already been skipped
     226             : 
     227             : static char *
     228     4610472 : tablet_skip_string(char *s, char quote, bool escape)
     229             : {
     230             :         size_t i = 0, j = 0;
     231    84223638 :         while (s[i]) {
     232    84176965 :                 if (escape && s[i] == '\\' && s[i + 1] != '\0')
     233      496838 :                         s[j++] = s[i++];
     234    83680127 :                 else if (s[i] == quote) {
     235     3045655 :                         if (s[i + 1] != quote)
     236             :                                 break;
     237             :                         i++;                            /* skip the first quote */
     238             :                 }
     239    79613166 :                 s[j++] = s[i++];
     240             :         }
     241     4610472 :         assert(s[i] == quote || s[i] == '\0');
     242     4610472 :         if (s[i] == 0)
     243             :                 return NULL;
     244     4610472 :         s[j] = 0;
     245     4610472 :         return s + i;
     246             : }
     247             : 
     248             : static int
     249           0 : TABLET_error(stream *s)
     250             : {
     251           0 :         char *err = mnstr_error(s);
     252             :         /* use free as stream allocates outside GDK */
     253           0 :         if (err)
     254           0 :                 free(err);
     255           0 :         return -1;
     256             : }
     257             : 
     258             : /* The output line is first built before being sent. It solves a problem
     259             :    with UDP, where you may loose most of the information using short writes
     260             : */
     261             : static inline int
     262      785363 : output_line(char **buf, size_t *len, char **localbuf, size_t *locallen, Column *fmt, stream *fd, BUN nr_attrs, oid id)
     263             : {
     264             :         BUN i;
     265             :         ssize_t fill = 0;
     266             : 
     267     6402532 :         for (i = 0; i < nr_attrs; i++) {
     268     5617169 :                 if (fmt[i].c == NULL)
     269      785363 :                         continue;
     270     4831806 :                 if (id < fmt[i].c->hseqbase || id >= fmt[i].c->hseqbase + BATcount(fmt[i].c))
     271             :                         break;
     272     4831806 :                 fmt[i].p = id - fmt[i].c->hseqbase;
     273             :         }
     274      785363 :         if (i == nr_attrs) {
     275     6402532 :                 for (i = 0; i < nr_attrs; i++) {
     276     5617169 :                         Column *f = fmt + i;
     277             :                         const char *p;
     278             :                         ssize_t l;
     279             : 
     280     5617169 :                         if (f->c) {
     281     4831806 :                                 p = BUNtail(f->ci, f->p);
     282             : 
     283     4831806 :                                 if (!p || ATOMcmp(f->adt, ATOMnilptr(f->adt), p) == 0) {
     284        1756 :                                         p = f->nullstr;
     285        1756 :                                         l = (ssize_t) strlen(f->nullstr);
     286             :                                 } else {
     287     4830050 :                                         l = f->tostr(f->extra, localbuf, locallen, f->adt, p);
     288     4830050 :                                         if (l < 0)
     289             :                                                 return -1;
     290     4830050 :                                         p = *localbuf;
     291             :                                 }
     292     4831806 :                                 if (fill + l + f->seplen >= (ssize_t) *len) {
     293             :                                         /* extend the buffer */
     294             :                                         char *nbuf;
     295           0 :                                         nbuf = GDKrealloc(*buf, fill + l + f->seplen + BUFSIZ);
     296           0 :                                         if( nbuf == NULL)
     297             :                                                 return -1; /* *buf freed by caller */
     298           0 :                                         *buf = nbuf;
     299           0 :                                         *len = fill + l + f->seplen + BUFSIZ;
     300             :                                 }
     301     4831806 :                                 strncpy(*buf + fill, p, l);
     302             :                                 fill += l;
     303             :                         }
     304     5617169 :                         strncpy(*buf + fill, f->sep, f->seplen);
     305     5617169 :                         fill += f->seplen;
     306             :                 }
     307             :         }
     308      785363 :         if (fd && mnstr_write(fd, *buf, 1, fill) != fill)
     309           0 :                 return TABLET_error(fd);
     310             :         return 0;
     311             : }
     312             : 
     313             : static inline int
     314     1397354 : output_line_dense(char **buf, size_t *len, char **localbuf, size_t *locallen, Column *fmt, stream *fd, BUN nr_attrs)
     315             : {
     316             :         BUN i;
     317             :         ssize_t fill = 0;
     318             : 
     319     8142531 :         for (i = 0; i < nr_attrs; i++) {
     320     6745177 :                 Column *f = fmt + i;
     321             :                 const char *p;
     322             :                 ssize_t l;
     323             : 
     324     6745177 :                 if (f->c) {
     325     5347823 :                         p = BUNtail(f->ci, f->p);
     326             : 
     327     5347823 :                         if (!p || ATOMcmp(f->adt, ATOMnilptr(f->adt), p) == 0) {
     328      799144 :                                 p = f->nullstr;
     329      799144 :                                 l = (ssize_t) strlen(p);
     330             :                         } else {
     331     4548679 :                                 l = f->tostr(f->extra, localbuf, locallen, f->adt, p);
     332     4548679 :                                 if (l < 0)
     333             :                                         return -1;
     334     4548679 :                                 p = *localbuf;
     335             :                         }
     336     5347823 :                         if (fill + l + f->seplen >= (ssize_t) *len) {
     337             :                                 /* extend the buffer */
     338             :                                 char *nbuf;
     339          73 :                                 nbuf = GDKrealloc(*buf, fill + l + f->seplen + BUFSIZ);
     340          73 :                                 if( nbuf == NULL)
     341             :                                         return -1;      /* *buf freed by caller */
     342          73 :                                 *buf = nbuf;
     343          73 :                                 *len = fill + l + f->seplen + BUFSIZ;
     344             :                         }
     345     5347823 :                         strncpy(*buf + fill, p, l);
     346             :                         fill += l;
     347     5347823 :                         f->p++;
     348             :                 }
     349     6745177 :                 strncpy(*buf + fill, f->sep, f->seplen);
     350     6745177 :                 fill += f->seplen;
     351             :         }
     352     1397354 :         if (fd && mnstr_write(fd, *buf, 1, fill) != fill)
     353           0 :                 return TABLET_error(fd);
     354             :         return 0;
     355             : }
     356             : 
     357             : static inline int
     358           0 : output_line_lookup(char **buf, size_t *len, Column *fmt, stream *fd, BUN nr_attrs, oid id)
     359             : {
     360             :         BUN i;
     361             : 
     362           0 :         for (i = 0; i < nr_attrs; i++) {
     363           0 :                 Column *f = fmt + i;
     364             : 
     365           0 :                 if (f->c) {
     366           0 :                         const void *p = BUNtail(f->ci, id - f->c->hseqbase);
     367             : 
     368           0 :                         if (!p || ATOMcmp(f->adt, ATOMnilptr(f->adt), p) == 0) {
     369           0 :                                 size_t l = strlen(f->nullstr);
     370           0 :                                 if (mnstr_write(fd, f->nullstr, 1, l) != (ssize_t) l)
     371           0 :                                         return TABLET_error(fd);
     372             :                         } else {
     373           0 :                                 ssize_t l = f->tostr(f->extra, buf, len, f->adt, p);
     374             : 
     375           0 :                                 if (l < 0 || mnstr_write(fd, *buf, 1, l) != l)
     376           0 :                                         return TABLET_error(fd);
     377             :                         }
     378             :                 }
     379           0 :                 if (mnstr_write(fd, f->sep, 1, f->seplen) != f->seplen)
     380           0 :                         return TABLET_error(fd);
     381             :         }
     382             :         return 0;
     383             : }
     384             : 
     385             : /* returns TRUE if there is/might be more */
     386             : static bool
     387      131681 : tablet_read_more(bstream *in, stream *out, size_t n)
     388             : {
     389      131681 :         if (out) {
     390             :                 do {
     391             :                         /* query is not finished ask for more */
     392             :                         /* we need more query text */
     393      130272 :                         if (bstream_next(in) < 0)
     394             :                                 return false;
     395      130272 :                         if (in->eof) {
     396      130270 :                                 if (mnstr_write(out, PROMPT2, sizeof(PROMPT2) - 1, 1) == 1)
     397      130270 :                                         mnstr_flush(out, MNSTR_FLUSH_DATA);
     398      130270 :                                 in->eof = false;
     399             :                                 /* we need more query text */
     400      130270 :                                 if (bstream_next(in) <= 0)
     401             :                                         return false;
     402             :                         }
     403      130270 :                 } while (in->len <= in->pos);
     404        1409 :         } else if (bstream_read(in, n) <= 0) {
     405         119 :                 return false;
     406             :         }
     407             :         return true;
     408             : }
     409             : 
     410             : /*
     411             :  * Fast Load
     412             :  * To speedup the CPU intensive loading of files we have to break
     413             :  * the file into pieces and perform parallel analysis. Experimentation
     414             :  * against lineitem SF1 showed that half of the time goes into very
      415             :  * basic atom analysis (41 out of 102 B instructions).
     416             :  * Furthermore, the actual insertion into the BATs takes only
     417             :  * about 10% of the total. With multi-core processors around
     418             :  * it seems we can gain here significantly.
     419             :  *
     420             :  * The approach taken is to fork a parallel scan over the text file.
     421             :  * We assume that the blocked stream is already
     422             :  * positioned correctly at the reading position. The start and limit
     423             :  * indicates the byte range to search for tuples.
     424             :  * If start> 0 then we first skip to the next record separator.
     425             :  * If necessary we read more than 'limit' bytes to ensure parsing a complete
     426             :  * record and stop at the record boundary.
     427             :  * Beware, we should allocate Tablet descriptors for each file segment,
     428             :  * otherwise we end up with a gross concurrency control problem.
     429             :  * The resulting BATs should be glued at the final phase.
     430             :  *
     431             :  * Raw Load
     432             :  * Front-ends can bypass most of the overhead in loading the BATs
     433             :  * by preparing the corresponding files directly and replace those
     434             :  * created by e.g. the SQL frontend.
     435             :  * This strategy is only advisable for cases where we have very
      436             :  * large files (>200GB) and/or files created by well-debugged code.
     437             :  *
     438             :  * To experiment with this approach, the code base responds
     439             :  * on negative number of cores by dumping the data directly in BAT
      440             :  * storage format into a collection of files on disk.
     441             :  * It reports on the actions to be taken to replace BATs.
     442             :  * This technique is initially only supported for fixed-sized columns.
     443             :  * The rawmode() indicator acts as the internal switch.
     444             :  */
     445             : 
     446             : /*
     447             :  * To speed up loading ascii files we have to determine the number of blocks.
     448             :  * This depends on the number of cores available.
     449             :  * For the time being we hardwire this decision based on our own
     450             :  * platforms.
     451             :  * Furthermore, we only consider parallel load for file-based requests.
     452             :  *
     453             :  * To simplify our world, we assume a single producer process.
     454             :  */
     455             : 
     456             : static int
     457        1199 : output_file_default(Tablet *as, BAT *order, stream *fd)
     458             : {
     459        1199 :         size_t len = BUFSIZ, locallen = BUFSIZ;
     460             :         int res = 0;
     461        1199 :         char *buf = GDKmalloc(len);
     462        1199 :         char *localbuf = GDKmalloc(len);
     463             :         BUN p, q;
     464             :         oid id;
     465             :         BUN i = 0;
     466        1199 :         BUN offset = as->offset;
     467             : 
     468        1199 :         if (buf == NULL || localbuf == NULL) {
     469           0 :                 GDKfree(buf);
     470           0 :                 GDKfree(localbuf);
     471           0 :                 return -1;
     472             :         }
     473      786562 :         for (q = offset + as->nr, p = offset, id = order->hseqbase + offset; p < q; p++, id++) {
     474      785363 :                 if ((res = output_line(&buf, &len, &localbuf, &locallen, as->format, fd, as->nr_attrs, id)) < 0) {
     475           0 :                         GDKfree(buf);
     476           0 :                         GDKfree(localbuf);
     477           0 :                         return res;
     478             :                 }
     479             :                 i++;
     480             :         }
     481        1199 :         GDKfree(localbuf);
     482        1199 :         GDKfree(buf);
     483        1199 :         return res;
     484             : }
     485             : 
     486             : static int
     487       73213 : output_file_dense(Tablet *as, stream *fd)
     488             : {
     489       73213 :         size_t len = BUFSIZ, locallen = BUFSIZ;
     490             :         int res = 0;
     491       73213 :         char *buf = GDKmalloc(len);
     492       73213 :         char *localbuf = GDKmalloc(len);
     493             :         BUN i = 0;
     494             : 
     495       73213 :         if (buf == NULL || localbuf == NULL) {
     496           0 :                 GDKfree(buf);
     497           0 :                 GDKfree(localbuf);
     498           0 :                 return -1;
     499             :         }
     500     1470567 :         for (i = 0; i < as->nr; i++) {
     501     1397354 :                 if ((res = output_line_dense(&buf, &len, &localbuf, &locallen, as->format, fd, as->nr_attrs)) < 0) {
     502           0 :                         GDKfree(buf);
     503           0 :                         GDKfree(localbuf);
     504           0 :                         return res;
     505             :                 }
     506             :         }
     507       73213 :         GDKfree(localbuf);
     508       73213 :         GDKfree(buf);
     509       73213 :         return res;
     510             : }
     511             : 
     512             : static int
     513           0 : output_file_ordered(Tablet *as, BAT *order, stream *fd)
     514             : {
     515           0 :         size_t len = BUFSIZ;
     516             :         int res = 0;
     517           0 :         char *buf = GDKmalloc(len);
     518             :         BUN p, q;
     519             :         BUN i = 0;
     520           0 :         BUN offset = as->offset;
     521             : 
     522           0 :         if (buf == NULL)
     523             :                 return -1;
     524           0 :         for (q = offset + as->nr, p = offset; p < q; p++, i++) {
     525           0 :                 oid h = order->hseqbase + p;
     526             : 
     527           0 :                 if ((res = output_line_lookup(&buf, &len, as->format, fd, as->nr_attrs, h)) < 0) {
     528           0 :                         GDKfree(buf);
     529           0 :                         return res;
     530             :                 }
     531             :         }
     532           0 :         GDKfree(buf);
     533           0 :         return res;
     534             : }
     535             : 
     536             : int
     537       74412 : TABLEToutput_file(Tablet *as, BAT *order, stream *s)
     538             : {
     539             :         oid base = oid_nil;
     540       74412 :         BUN maxnr = BATcount(order);
     541             :         int ret = 0;
     542             : 
     543             :         /* only set nr if it is zero or lower (bogus) to the maximum value
     544             :          * possible (BATcount), if already set within BATcount range,
     545             :          * preserve value such that for instance SQL's reply_size still
     546             :          * works
     547             :          */
     548       74412 :         if (as->nr == BUN_NONE || as->nr > maxnr)
     549           0 :                 as->nr = maxnr;
     550             : 
     551       74412 :         base = check_BATs(as);
     552       74412 :         if (!is_oid_nil(base)) {
     553       73213 :                 if (order->hseqbase == base)
     554       73213 :                         ret = output_file_dense(as, s);
     555             :                 else
     556           0 :                         ret = output_file_ordered(as, order, s);
     557             :         } else {
     558        1199 :                 ret = output_file_default(as, order, s);
     559             :         }
     560       74412 :         return ret;
     561             : }
     562             : 
     563             : /*
     564             :  *  Niels Nes, Martin Kersten
     565             :  *
     566             :  * Parallel bulk load for SQL
     567             :  * The COPY INTO command for SQL is heavily CPU bound, which means
     568             :  * that ideally we would like to exploit the multi-cores to do that
     569             :  * work in parallel.
     570             :  * Complicating factors are the initial record offset, the
     571             :  * possible variable length of the input, and the original sort order
     572             :  * that should preferable be maintained.
     573             :  *
     574             :  * The code below consists of a file reader, which breaks up the
     575             :  * file into chunks of distinct rows. Then multiple parallel threads
     576             :  * grab them, and break them on the field boundaries.
     577             :  * After all fields are identified this way, the columns are converted
     578             :  * and stored in the BATs.
     579             :  *
     580             :  * The threads get a reference to a private copy of the READERtask.
     581             :  * It includes a list of columns they should handle. This is a basis
     582             :  * to distributed cheap and expensive columns over threads.
     583             :  *
     584             :  * The file reader overlaps IO with updates of the BAT.
     585             :  * Also the buffer size of the block stream might be a little small for
     586             :  * this task (1MB). It has been increased to 8MB, which indeed improved.
     587             :  *
     588             :  * The work divider allocates subtasks to threads based on the
     589             :  * observed time spending so far.
     590             :  */
     591             : 
     592             : /* #define MLOCK_TST did not make a difference on sf10 */
     593             : 
     594             : #define BREAKROW 1
     595             : #define UPDATEBAT 2
     596             : #define SYNCBAT 3
     597             : #define ENDOFCOPY 4
     598             : 
     599             : typedef struct {
     600             :         Client cntxt;
     601             :         int id;                                         /* for self reference */
     602             :         int state;                                      /* row break=1 , 2 = update bat */
     603             :         int workers;                            /* how many concurrent ones */
     604             :         int error;                                      /* error during row break */
     605             :         int next;
     606             :         int limit;
     607             :         BUN cnt, maxrow;                        /* first row in file chunk. */
     608             :         lng skip;                                       /* number of lines to be skipped */
     609             :         lng *time, wtime;                       /* time per col + time per thread */
     610             :         int rounds;                                     /* how often did we divide the work */
     611             :         bool ateof;                                     /* io control */
     612             :         bool from_stdin;
     613             :         bool escape;                            /* whether to handle \ escapes */
     614             :         bstream *b;
     615             :         stream *out;
     616             :         MT_Id tid;
     617             :         MT_Sema producer;                       /* reader waits for call */
     618             :         MT_Sema consumer;                       /* reader waits for call */
     619             :         MT_Sema sema; /* threads wait for work , negative next implies exit */
     620             :         MT_Sema reply;                          /* let reader continue */
     621             :         Tablet *as;
     622             :         char *errbuf;
     623             :         const char *csep, *rsep;
     624             :         size_t seplen, rseplen;
     625             :         char quote;
     626             : 
     627             :         char *base[MAXBUFFERS], *input[MAXBUFFERS];     /* buffers for row splitter and tokenizer */
     628             :         size_t rowlimit[MAXBUFFERS]; /* determines maximal record length buffer */
     629             :         char **rows[MAXBUFFERS];
     630             :         lng *startlineno[MAXBUFFERS];
     631             :         int top[MAXBUFFERS];            /* number of rows in this buffer */
     632             :         int cur;  /* current buffer used by splitter and update threads */
     633             : 
     634             :         int *cols;                                      /* columns to handle */
     635             :         char ***fields;
     636             :         int besteffort;
     637             :         bte *rowerror;
     638             :         int errorcnt;
     639             : } READERtask;
     640             : 
     641             : static void
     642           9 : tablet_error(READERtask *task, lng row, lng lineno, int col, const char *msg, const char *fcn)
     643             : {
     644           9 :         MT_lock_set(&errorlock);
     645           9 :         if (task->cntxt->error_row != NULL) {
     646          18 :                 if (BUNappend(task->cntxt->error_row, &lineno, false) != GDK_SUCCEED ||
     647          18 :                         BUNappend(task->cntxt->error_fld, &col, false) != GDK_SUCCEED ||
     648          18 :                         BUNappend(task->cntxt->error_msg, msg, false) != GDK_SUCCEED ||
     649           9 :                         BUNappend(task->cntxt->error_input, fcn, false) != GDK_SUCCEED)
     650           0 :                         task->besteffort = 0;
     651           9 :                 if (!is_lng_nil(row) && task->rowerror && row < task->limit)
     652           9 :                         task->rowerror[row]++;
     653             :         }
     654           9 :         if (task->as->error == NULL) {
     655           7 :                 if (msg == NULL)
     656           0 :                         task->besteffort = 0;
     657           7 :                 else if (!is_lng_nil(lineno)) {
     658           7 :                         if (!is_int_nil(col))
     659           4 :                                 task->as->error = createException(MAL, "sql.copy_from", "line " LLFMT ": column %d: %s", lineno, col + 1, msg);
     660             :                         else
     661           3 :                                 task->as->error = createException(MAL, "sql.copy_from", "line " LLFMT ": %s", lineno, msg);
     662             :                 } else
     663           0 :                         task->as->error = createException(MAL, "sql.copy_from", "%s", msg);
     664             :         }
     665           9 :         task->errorcnt++;
     666           9 :         MT_lock_unset(&errorlock);
     667           9 : }
     668             : 
     669             : /*
     670             :  * The row is broken into pieces directly on their field separators. It assumes that we have
     671             :  * the record in the cache already, so we can do most work quickly.
     672             :  * Furthermore, it assume a uniform (SQL) pattern, without whitespace skipping, but with quote and separator.
     673             :  */
     674             : 
     675             : static size_t
     676         111 : mystrlen(const char *s)
     677             : {
     678             :         /* Calculate and return the space that is needed for the function
     679             :          * mycpstr below to do its work. */
     680             :         size_t len = 0;
     681             :         const char *s0 = s;
     682             : 
     683       30516 :         while (*s) {
     684       30405 :                 if ((*s & 0x80) == 0) {
     685             :                         ;
     686           6 :                 } else if ((*s & 0xC0) == 0x80) {
     687             :                         /* continuation byte */
     688           0 :                         len += 3;
     689           6 :                 } else if ((*s & 0xE0) == 0xC0) {
     690             :                         /* two-byte sequence */
     691           6 :                         if ((s[1] & 0xC0) != 0x80)
     692           0 :                                 len += 3;
     693             :                         else
     694           6 :                                 s += 2;
     695           0 :                 } else if ((*s & 0xF0) == 0xE0) {
     696             :                         /* three-byte sequence */
     697           0 :                         if ((s[1] & 0xC0) != 0x80 || (s[2] & 0xC0) != 0x80)
     698           0 :                                 len += 3;
     699             :                         else
     700           0 :                                 s += 3;
     701           0 :                 } else if ((*s & 0xF8) == 0xF0) {
     702             :                         /* four-byte sequence */
     703           0 :                         if ((s[1] & 0xC0) != 0x80 || (s[2] & 0xC0) != 0x80 || (s[3] & 0xC0) != 0x80)
     704           0 :                                 len += 3;
     705             :                         else
     706           0 :                                 s += 4;
     707             :                 } else {
     708             :                         /* not a valid start byte */
     709           0 :                         len += 3;
     710             :                 }
     711       30405 :                 s++;
     712             :         }
     713         111 :         len += s - s0;
     714         111 :         return len;
     715             : }
     716             : 
     717             : static char *
     718         176 : mycpstr(char *t, const char *s)
     719             : {
     720             :         /* Copy the string pointed to by s into the buffer pointed to by
     721             :          * t, and return a pointer to the NULL byte at the end.  During
     722             :          * the copy we translate incorrect UTF-8 sequences to escapes
     723             :          * looking like <XX> where XX is the hexadecimal representation of
     724             :          * the incorrect byte.  The buffer t needs to be large enough to
     725             :          * hold the result, but the correct length can be calculated by
     726             :          * the function mystrlen above.*/
     727       30652 :         while (*s) {
     728       30476 :                 if ((*s & 0x80) == 0) {
     729       30470 :                         *t++ = *s++;
     730           6 :                 } else if ((*s & 0xC0) == 0x80) {
     731           0 :                         t += sprintf(t, "<%02X>", (uint8_t) *s++);
     732           6 :                 } else if ((*s & 0xE0) == 0xC0) {
     733             :                         /* two-byte sequence */
     734           6 :                         if ((s[1] & 0xC0) != 0x80)
     735           0 :                                 t += sprintf(t, "<%02X>", (uint8_t) *s++);
     736             :                         else {
     737           6 :                                 *t++ = *s++;
     738           6 :                                 *t++ = *s++;
     739             :                         }
     740           0 :                 } else if ((*s & 0xF0) == 0xE0) {
     741             :                         /* three-byte sequence */
     742           0 :                         if ((s[1] & 0xC0) != 0x80 || (s[2] & 0xC0) != 0x80)
     743           0 :                                 t += sprintf(t, "<%02X>", (uint8_t) *s++);
     744             :                         else {
     745           0 :                                 *t++ = *s++;
     746           0 :                                 *t++ = *s++;
     747           0 :                                 *t++ = *s++;
     748             :                         }
     749           0 :                 } else if ((*s & 0xF8) == 0xF0) {
     750             :                         /* four-byte sequence */
     751           0 :                         if ((s[1] & 0xC0) != 0x80 || (s[2] & 0xC0) != 0x80 || (s[3] & 0xC0) != 0x80)
     752           0 :                                 t += sprintf(t, "<%02X>", (uint8_t) *s++);
     753             :                         else {
     754           0 :                                 *t++ = *s++;
     755           0 :                                 *t++ = *s++;
     756           0 :                                 *t++ = *s++;
     757           0 :                                 *t++ = *s++;
     758             :                         }
     759             :                 } else {
     760             :                         /* not a valid start byte */
     761           0 :                         t += sprintf(t, "<%02X>", (uint8_t) *s++);
     762             :                 }
     763             :         }
     764         176 :         *t = 0;
     765         176 :         return t;
     766             : }
     767             : 
     768             : static str
     769          31 : SQLload_error(READERtask *task, lng idx, BUN attrs)
     770             : {
     771             :         str line;
     772             :         char *s;
     773             :         size_t sz = 0;
     774             :         BUN i;
     775             : 
     776         127 :         for (i = 0; i < attrs; i++) {
     777          96 :                 if (task->fields[i][idx])
     778          86 :                         sz += mystrlen(task->fields[i][idx]);
     779          96 :                 sz += task->seplen;
     780             :         }
     781             : 
     782          31 :         s = line = GDKmalloc(sz + task->rseplen + 1);
     783          31 :         if (line == 0) {
     784           0 :                 tablet_error(task, idx, lng_nil, int_nil, "SQLload malloc error", "SQLload_error");
     785           0 :                 return 0;
     786             :         }
     787         127 :         for (i = 0; i < attrs; i++) {
     788          96 :                 if (task->fields[i][idx])
     789          86 :                         s = mycpstr(s, task->fields[i][idx]);
     790          96 :                 if (i < attrs - 1)
     791          65 :                         s = mycpstr(s, task->csep);
     792             :         }
     793          31 :         strcpy(s, task->rsep);
     794          31 :         return line;
     795             : }
     796             : 
     797             : /*
     798             :  * The parsing of the individual values is straightforward. If the value represents
     799             :  * the null-replacement string then we grab the underlying nil.
     800             :  * If the string starts with the quote identified from SQL, we locate the tail
     801             :  * and interpret the body.
     802             :  *
     803             :  * If inserting fails, we return -1; if the value cannot be parsed, we
     804             :  * return -1 if besteffort is not set, otherwise we return 0, but in
     805             :  * either case an entry is added to the error table.
     806             :  */
     807             : static inline int
     808   337677093 : SQLinsert_val(READERtask *task, int col, int idx)
     809             : {
     810   337677093 :         Column *fmt = task->as->format + col;
     811             :         const void *adt;
     812             :         char buf[BUFSIZ];
     813   337677093 :         char *s = task->fields[col][idx];
     814             :         char *err = NULL;
     815             :         int ret = 0;
     816             : 
     817             :         /* include testing on the terminating null byte !! */
     818   337677093 :         if (s == 0) {
     819     6568193 :                 adt = fmt->nildata;
     820     6568193 :                 fmt->c->tnonil = false;
     821             :         } else {
     822   331108900 :                 if (task->escape) {
     823   330908880 :                         size_t slen = strlen(s) + 1;
     824   330908880 :                         char *data = slen <= sizeof(buf) ? buf : GDKmalloc(strlen(s) + 1);
     825   684209591 :                         if (data == NULL ||
     826   330944458 :                                 GDKstrFromStr((unsigned char *) data, (unsigned char *) s, strlen(s)) < 0)
     827             :                                 adt = NULL;
     828             :                         else
     829   353300710 :                                 adt = fmt->frstr(fmt, fmt->adt, data);
     830   346637293 :                         if (data != buf)
     831          15 :                                 GDKfree(data);
     832             :                 } else
     833      200020 :                         adt = fmt->frstr(fmt, fmt->adt, s);
     834             :         }
     835             : 
     836             :         /* col is zero-based, but for error messages it needs to be
     837             :          * one-based, and from here on, we only use col anymore to produce
     838             :          * error messages */
     839   353405506 :         col++;
     840             : 
     841   353405506 :         if (adt == NULL) {
     842          25 :                 lng row = task->cnt + idx + 1;
     843          25 :                 snprintf(buf, sizeof(buf), "'%s' expected", fmt->type);
     844          25 :                 err = SQLload_error(task, idx, task->as->nr_attrs);
     845          25 :                 if (task->rowerror) {
     846          25 :                         if (s) {
     847          25 :                                 size_t slen = mystrlen(s);
     848          25 :                                 char *scpy = GDKmalloc(slen + 1);
     849          25 :                                 if ( scpy == NULL){
     850           0 :                                         task->rowerror[idx]++;
     851           0 :                                         task->errorcnt++;
     852           0 :                                         task->besteffort = 0; /* no longer best effort */
     853           0 :                                         if (task->cntxt->error_row == NULL ||
     854           0 :                                                 BUNappend(task->cntxt->error_row, &row, false) != GDK_SUCCEED ||
     855           0 :                                                 BUNappend(task->cntxt->error_fld, &col, false) != GDK_SUCCEED ||
     856           0 :                                                 BUNappend(task->cntxt->error_msg, SQLSTATE(HY013) MAL_MALLOC_FAIL, false) != GDK_SUCCEED ||
     857           0 :                                                 BUNappend(task->cntxt->error_input, err, false) != GDK_SUCCEED) {
     858             :                                                 ;               /* ignore error here: we're already not best effort */
     859             :                                         }
     860           0 :                                         GDKfree(err);
     861           0 :                                         return -1;
     862             :                                 }
     863          25 :                                 mycpstr(scpy, s);
     864             :                                 s = scpy;
     865             :                         }
     866          25 :                         MT_lock_set(&errorlock);
     867          25 :                         snprintf(buf, sizeof(buf),
     868             :                                          "line " LLFMT " field %s '%s' expected%s%s%s",
     869          25 :                                          task->startlineno[task->cur][idx], fmt->name ? fmt->name : "", fmt->type,
     870             :                                          s ? " in '" : "", s ? s : "", s ? "'" : "");
     871          25 :                         GDKfree(s);
     872          25 :                         if (task->as->error == NULL && (task->as->error = GDKstrdup(buf)) == NULL)
     873           0 :                                 task->as->error = createException(MAL, "sql.copy_from", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     874          25 :                         task->rowerror[idx]++;
     875          25 :                         task->errorcnt++;
     876          50 :                         if (task->cntxt->error_row == NULL ||
     877          50 :                                 BUNappend(task->cntxt->error_row, &row, false) != GDK_SUCCEED ||
     878          50 :                                 BUNappend(task->cntxt->error_fld, &col, false) != GDK_SUCCEED ||
     879          50 :                                 BUNappend(task->cntxt->error_msg, buf, false) != GDK_SUCCEED ||
     880          25 :                                 BUNappend(task->cntxt->error_input, err, false) != GDK_SUCCEED) {
     881           0 :                                 GDKfree(err);
     882           0 :                                 task->besteffort = 0; /* no longer best effort */
     883           0 :                                 MT_lock_unset(&errorlock);
     884           0 :                                 return -1;
     885             :                         }
     886          25 :                         MT_lock_unset(&errorlock);
     887             :                 }
     888          25 :                 ret = -!task->besteffort; /* yep, two unary operators ;-) */
     889          25 :                 GDKfree(err);
     890             :                 err = NULL;
     891             :                 /* replace it with a nil */
     892          25 :                 adt = fmt->nildata;
     893          25 :                 fmt->c->tnonil = false;
     894             :         }
     895   353405506 :         if (bunfastapp(fmt->c, adt) == GDK_SUCCEED)
     896             :                 return ret;
     897             : 
     898             :         /* failure */
     899           0 :         if (task->rowerror) {
     900           0 :                 lng row = BATcount(fmt->c);
     901           0 :                 MT_lock_set(&errorlock);
     902           0 :                 if (task->cntxt->error_row == NULL ||
     903           0 :                         BUNappend(task->cntxt->error_row, &row, false) != GDK_SUCCEED ||
     904           0 :                         BUNappend(task->cntxt->error_fld, &col, false) != GDK_SUCCEED ||
     905           0 :                         BUNappend(task->cntxt->error_msg, "insert failed", false) != GDK_SUCCEED ||
     906           0 :                         (err = SQLload_error(task, idx,task->as->nr_attrs)) == NULL ||
     907           0 :                         BUNappend(task->cntxt->error_input, err, false) != GDK_SUCCEED)
     908           0 :                         task->besteffort = 0;
     909           0 :                 GDKfree(err);
     910           0 :                 task->rowerror[idx]++;
     911           0 :                 task->errorcnt++;
     912           0 :                 MT_lock_unset(&errorlock);
     913             :         }
     914           0 :         task->besteffort = 0;                /* no longer best effort */
     915           0 :         return -1;
     916             : }
     917             : 
     918             : static int
     919      699350 : SQLworker_column(READERtask *task, int col)
     920             : {
     921             :         int i;
     922      699350 :         Column *fmt = task->as->format;
     923             : 
     924      699350 :         if (fmt[col].c == NULL)
     925             :                 return 0;
     926             : 
     927             :         /* watch out for concurrent threads */
     928      699344 :         MT_lock_set(&mal_copyLock);
     929      699531 :         if (!fmt[col].skip && BATcapacity(fmt[col].c) < BATcount(fmt[col].c) + task->next) {
     930         254 :                 if (BATextend(fmt[col].c, BATgrows(fmt[col].c) + task->limit) != GDK_SUCCEED) {
     931           0 :                         tablet_error(task, lng_nil, lng_nil, col, "Failed to extend the BAT\n", "SQLworker_column");
     932           0 :                         MT_lock_unset(&mal_copyLock);
     933           0 :                         return -1;
     934             :                 }
     935             :         }
     936      699531 :         MT_lock_unset(&mal_copyLock);
     937             : 
     938   339769765 :         for (i = 0; i < task->top[task->cur]; i++) {
     939   339070593 :                 if (!fmt[col].skip && SQLinsert_val(task, col, i) < 0) {
     940          19 :                         BATsetcount(fmt[col].c, BATcount(fmt[col].c));
     941          19 :                         return -1;
     942             :                 }
     943             :         }
     944      699172 :         BATsetcount(fmt[col].c, BATcount(fmt[col].c));
     945      699419 :         fmt[col].c->theap->dirty |= BATcount(fmt[col].c) > 0;
     946             : 
     947      699419 :         return 0;
     948             : }
     949             : 
     950             : /*
     951             :  * The rows are broken on the column separator. Any error is shown and reflected with
     952             :  * setting the reference of the offending row fields to NULL.
     953             :  * This allows the loading to continue, skipping the minimal number of rows.
     954             :  * The details about the locations can be inspected from the error table.
     955             :  * We also trim the quotes around strings.
     956             :  */
     957             : static int
     958   133898042 : SQLload_parse_row(READERtask *task, int idx)
     959             : {
     960             :         BUN i;
     961             :         char errmsg[BUFSIZ];
     962   133898042 :         char ch = *task->csep;
     963   133898042 :         char *row = task->rows[task->cur][idx];
     964   133898042 :         lng startlineno = task->startlineno[task->cur][idx];
     965   133898042 :         Tablet *as = task->as;
     966   133898042 :         Column *fmt = as->format;
     967             :         bool error = false;
     968             :         str errline = 0;
     969             : 
     970   133898042 :         assert(idx < task->top[task->cur]);
     971   133898042 :         assert(row);
     972   133898042 :         errmsg[0] = 0;
     973             : 
     974   133898042 :         if (task->quote || task->seplen != 1) {
     975    10974074 :                 for (i = 0; i < as->nr_attrs; i++) {
     976             :                         bool quote = false;
     977     9173359 :                         task->fields[i][idx] = row;
     978             :                         /* recognize fields starting with a quote, keep them */
     979     9173359 :                         if (*row && *row == task->quote) {
     980             :                                 quote = true;
     981     5307661 :                                 task->fields[i][idx] = row + 1;
     982     5307661 :                                 row = tablet_skip_string(row + 1, task->quote, task->escape);
     983             : 
     984     4634400 :                                 if (!row) {
     985           0 :                                         errline = SQLload_error(task, idx, i+1);
     986           0 :                                         snprintf(errmsg, BUFSIZ, "Quote (%c) missing", task->quote);
     987           0 :                                         tablet_error(task, idx, startlineno, (int) i + 1, errmsg, errline);
     988           0 :                                         GDKfree(errline);
     989             :                                         error = true;
     990           0 :                                         goto errors1;
     991             :                                 } else
     992     4634400 :                                         *row++ = 0;
     993             :                         }
     994             : 
     995             :                         /* eat away the column separator */
     996    35953597 :                         for (; *row; row++)
     997    34648853 :                                 if (*row == '\\') {
     998           2 :                                         if (row[1])
     999           2 :                                                 row++;
    1000    34648851 :                                 } else if (*row == ch && (task->seplen == 1 || strncmp(row, task->csep, task->seplen) == 0)) {
    1001     7195354 :                                         *row = 0;
    1002     7195354 :                                         row += task->seplen;
    1003     7195354 :                                         goto endoffieldcheck;
    1004             :                                 }
    1005             : 
    1006             :                         /* not enough fields */
    1007     1304744 :                         if (i < as->nr_attrs - 1) {
    1008           0 :                                 errline = SQLload_error(task, idx, i+1);
    1009           0 :                                 tablet_error(task, idx, startlineno, (int) i + 1, "Column value missing", errline);
    1010           0 :                                 GDKfree(errline);
    1011             :                                 error = true;
    1012           0 :                           errors1:
    1013             :                                 /* we save all errors detected  as NULL values */
    1014           0 :                                 for (; i < as->nr_attrs; i++)
    1015           0 :                                         task->fields[i][idx] = NULL;
    1016           0 :                                 i--;
    1017             :                         }
    1018     1304744 :                   endoffieldcheck:
    1019             :                         ;
    1020             :                         /* check for user defined NULL string */
    1021     8500098 :                         if ((!quote || !fmt->null_length) && fmt->nullstr && task->fields[i][idx] && strncasecmp(task->fields[i][idx], fmt->nullstr, fmt->null_length + 1) == 0)
    1022     2142536 :                                 task->fields[i][idx] = 0;
    1023             :                 }
    1024             :         } else {
    1025             :                 assert(!task->quote);
    1026             :                 assert(task->seplen == 1);
    1027   354978076 :                 for (i = 0; i < as->nr_attrs; i++) {
    1028   223554010 :                         task->fields[i][idx] = row;
    1029             : 
    1030             :                         /* eat away the column separator */
    1031  1886233774 :                         for (; *row; row++)
    1032  1754817389 :                                 if (*row == '\\') {
    1033    16758036 :                                         if (row[1])
    1034    16758036 :                                                 row++;
    1035  1738059353 :                                 } else if (*row == ch) {
    1036    92137625 :                                         *row = 0;
    1037    92137625 :                                         row++;
    1038    92137625 :                                         goto endoffield2;
    1039             :                                 }
    1040             : 
    1041             :                         /* not enough fields */
    1042   131416385 :                         if (i < as->nr_attrs - 1) {
    1043           5 :                                 errline = SQLload_error(task, idx,i+1);
    1044           5 :                                 tablet_error(task, idx, startlineno, (int) i + 1, "Column value missing", errline);
    1045           5 :                                 GDKfree(errline);
    1046             :                                 error = true;
    1047             :                                 /* we save all errors detected */
    1048          15 :                                 for (; i < as->nr_attrs; i++)
    1049          10 :                                         task->fields[i][idx] = NULL;
    1050           5 :                                 i--;
    1051             :                         }
    1052   131416380 :                   endoffield2:
    1053             :                         ;
    1054             :                         /* check for user defined NULL string */
    1055   223554010 :                         if (fmt->nullstr && task->fields[i][idx] && strncasecmp(task->fields[i][idx], fmt->nullstr, fmt->null_length + 1) == 0) {
    1056     3802174 :                                 task->fields[i][idx] = 0;
    1057             :                         }
    1058             :                 }
    1059             :         }
    1060             :         /* check for too many values as well*/
    1061   133224781 :         if (row && *row && i == as->nr_attrs) {
    1062           1 :                 errline = SQLload_error(task, idx, task->as->nr_attrs);
    1063           1 :                 snprintf(errmsg, BUFSIZ, "Leftover data '%s'",row);
    1064           1 :                 tablet_error(task, idx, startlineno, (int) i + 1, errmsg, errline);
    1065           1 :                 GDKfree(errline);
    1066             :                 error = true;
    1067             :         }
    1068   133224780 :         return error ? -1 : 0;
    1069             : }
    1070             : 
    1071             : static void
    1072         925 : SQLworker(void *arg)
    1073             : {
    1074             :         READERtask *task = (READERtask *) arg;
    1075             :         unsigned int i;
    1076             :         int j, piece;
    1077             :         lng t0;
    1078             : 
    1079         925 :         GDKsetbuf(GDKmalloc(GDKMAXERRLEN));     /* where to leave errors */
    1080         925 :         GDKclrerr();
    1081         925 :         task->errbuf = GDKerrbuf;
    1082             : 
    1083      668327 :         while (task->top[task->cur] >= 0) {
    1084      668327 :                 MT_sema_down(&task->sema);
    1085             : 
    1086             :                 /* stage one, break the rows spread the work over the workers */
    1087      668344 :                 switch (task->state) {
    1088      333428 :                 case BREAKROW:
    1089      333428 :                         t0 = GDKusec();
    1090      333455 :                         piece = (task->top[task->cur] + task->workers) / task->workers;
    1091             : 
    1092   133414656 :                         for (j = piece * task->id; j < task->top[task->cur] && j < piece * (task->id +1); j++)
    1093   133081300 :                                 if (task->rows[task->cur][j]) {
    1094   133081300 :                                         if (SQLload_parse_row(task, j) < 0) {
    1095           6 :                                                 task->errorcnt++;
    1096             :                                                 // early break unless best effort
    1097           6 :                                                 if (!task->besteffort) {
    1098           5 :                                                         for (j++; j < task->top[task->cur] && j < piece * (task->id +1); j++)
    1099           8 :                                                                 for (i = 0; i < task->as->nr_attrs; i++)
    1100           6 :                                                                         task->fields[i][j] = NULL;
    1101             :                                                         break;
    1102             :                                                 }
    1103             :                                         }
    1104             :                                 }
    1105      333359 :                         task->wtime = GDKusec() - t0;
    1106      333404 :                         break;
    1107      333088 :                 case UPDATEBAT:
    1108      333088 :                         if (!task->besteffort && task->errorcnt)
    1109             :                                 break;
    1110             :                         /* stage two, updating the BATs */
    1111     1660477 :                         for (i = 0; i < task->as->nr_attrs; i++)
    1112     1327137 :                                 if (task->cols[i]) {
    1113      699168 :                                         t0 = GDKusec();
    1114      699463 :                                         if (SQLworker_column(task, task->cols[i] - 1) < 0)
    1115             :                                                 break;
    1116      699408 :                                         t0 = GDKusec() - t0;
    1117      699423 :                                         task->time[i] += t0;
    1118      699423 :                                         task->wtime += t0;
    1119             :                                 }
    1120             :                         break;
    1121         903 :                 case SYNCBAT:
    1122         903 :                         if (!task->besteffort && task->errorcnt)
    1123             :                                 break;
    1124       12194 :                         for (i = 0; i < task->as->nr_attrs; i++)
    1125       11291 :                                 if (task->cols[i]) {
    1126        8800 :                                         BAT *b = task->as->format[task->cols[i] - 1].c;
    1127        8800 :                                         if (b == NULL)
    1128           6 :                                                 continue;
    1129        8794 :                                         t0 = GDKusec();
    1130        8794 :                                         if (b->batTransient)
    1131        8794 :                                                 continue;
    1132           0 :                                         BATmsync(b);
    1133           0 :                                         t0 = GDKusec() - t0;
    1134           0 :                                         task->time[i] += t0;
    1135           0 :                                         task->wtime += t0;
    1136             :                                 }
    1137             :                         break;
    1138         925 :                 case ENDOFCOPY:
    1139         925 :                         MT_sema_up(&task->reply);
    1140         925 :                         goto do_return;
    1141             :                 }
    1142      667669 :                 MT_sema_up(&task->reply);
    1143             :         }
    1144           0 :         MT_sema_up(&task->reply);
    1145             : 
    1146         925 :   do_return:
    1147         925 :         GDKfree(GDKerrbuf);
    1148         925 :         GDKsetbuf(0);
    1149         925 : }
    1150             : 
    1151             : static void
    1152      132059 : SQLworkdivider(READERtask *task, READERtask *ptask, int nr_attrs, int threads)
    1153             : {
    1154             :         int i, j, mi;
    1155             :         lng loc[MAXWORKERS];
    1156             : 
    1157             :         /* after a few rounds we stick to the work assignment */
    1158      132059 :         if (task->rounds > 8)
    1159      131965 :                 return;
    1160             :         /* simple round robin the first time */
    1161       31507 :         if (threads == 1 || task->rounds++ == 0) {
    1162      417643 :                 for (i = j = 0; i < nr_attrs; i++, j++)
    1163      386230 :                         ptask[j % threads].cols[i] = task->cols[i];
    1164             :                 return;
    1165             :         }
    1166          94 :         memset((char *) loc, 0, sizeof(lng) * MAXWORKERS);
    1167             :         /* use of load directives */
    1168        1808 :         for (i = 0; i < nr_attrs; i++)
    1169        6856 :                 for (j = 0; j < threads; j++)
    1170        5142 :                         ptask[j].cols[i] = 0;
    1171             : 
    1172             :         /* now allocate the work to the threads */
    1173        1808 :         for (i = 0; i < nr_attrs; i++, j++) {
    1174             :                 mi = 0;
    1175        5142 :                 for (j = 1; j < threads; j++)
    1176        3428 :                         if (loc[j] < loc[mi])
    1177             :                                 mi = j;
    1178             : 
    1179        1714 :                 ptask[mi].cols[i] = task->cols[i];
    1180        1714 :                 loc[mi] += task->time[i];
    1181             :         }
    1182             :         /* reset the timer */
    1183        1808 :         for (i = 0; i < nr_attrs; i++, j++)
    1184        1714 :                 task->time[i] = 0;
    1185             : }
    1186             : 
    1187             : /*
    1188             :  * Reading is handled by a separate task as a preparation for more parallelism.
    1189             :  * A buffer is filled with proper rows.
    1190             :  * If we are reading from a file then a double buffering scheme ia activated.
    1191             :  * Reading from the console (stdin) remains single buffered only.
    1192             :  * If we end up with unfinished records, then the rowlimit will terminate the process.
    1193             :  */
    1194             : 
    1195             : typedef unsigned char (*dfa_t)[256];
    1196             : 
    1197             : static dfa_t
    1198         805 : mkdfa(const unsigned char *sep, size_t seplen)
    1199             : {
    1200             :         dfa_t dfa;
    1201             :         size_t i, j, k;
    1202             : 
    1203         805 :         dfa = GDKzalloc(seplen * sizeof(*dfa));
    1204         805 :         if (dfa == NULL)
    1205             :                 return NULL;
    1206             :         /* Each character in the separator string advances the state by
    1207             :          * one.  If state reaches seplen, the separator was recognized.
    1208             :          *
    1209             :          * The first loop and the nested loop make sure that if in any
    1210             :          * state we encounter an invalid character, but part of what we've
    1211             :          * matched so far is a prefix of the separator, we go to the
    1212             :          * appropriate state. */
    1213        1632 :         for (i = 0; i < seplen; i++)
    1214         827 :                 dfa[i][sep[0]] = 1;
    1215        1632 :         for (j = 0; j < seplen; j++) {
    1216         827 :                 dfa[j][sep[j]] = (unsigned char) (j + 1);
    1217         849 :                 for (k = 0; k < j; k++) {
    1218          44 :                         for (i = 0; i < j - k; i++)
    1219          22 :                                 if (sep[k + i] != sep[i])
    1220             :                                         break;
    1221          22 :                         if (i == j - k && dfa[j][sep[i]] <= i)
    1222           0 :                                 dfa[j][sep[i]] = (unsigned char) (i + 1);
    1223             :                 }
    1224             :         }
    1225             :         return dfa;
    1226             : }
    1227             : 
    1228             : #ifdef __GNUC__
    1229             : /* __builtin_expect returns its first argument; it is expected to be
    1230             :  * equal to the second argument */
    1231             : #define unlikely(expr)  __builtin_expect((expr) != 0, 0)
    1232             : #define likely(expr)    __builtin_expect((expr) != 0, 1)
    1233             : #else
    1234             : #define unlikely(expr)  (expr)
    1235             : #define likely(expr)    (expr)
    1236             : #endif
    1237             : 
    1238             : static void
    1239         805 : SQLproducer(void *p)
    1240             : {
    1241             :         READERtask *task = (READERtask *) p;
    1242             :         bool consoleinput = false;
    1243             :         int cur = 0;            // buffer being filled
    1244         805 :         bool blocked[MAXBUFFERS] = { false };
    1245         805 :         bool ateof[MAXBUFFERS] = { false };
    1246         805 :         BUN cnt = 0, bufcnt[MAXBUFFERS] = { 0 };
    1247             :         char *end = NULL, *e = NULL, *s = NULL, *base;
    1248         805 :         const char *rsep = task->rsep;
    1249         805 :         size_t rseplen = strlen(rsep), partial = 0;
    1250         805 :         char quote = task->quote;
    1251             :         dfa_t rdfa;
    1252             :         lng rowno = 0;
    1253             :         lng lineno = 1;
    1254             :         lng startlineno = 1;
    1255             :         int more = 0;
    1256             : 
    1257         805 :         MT_sema_down(&task->producer);
    1258         805 :         if (task->id < 0) {
    1259             :                 return;
    1260             :         }
    1261             : 
    1262         805 :         rdfa = mkdfa((const unsigned char *) rsep, rseplen);
    1263         805 :         if (rdfa == NULL) {
    1264           0 :                 tablet_error(task, lng_nil, lng_nil, int_nil, "cannot allocate memory", "");
    1265           0 :                 ateof[cur] = true;
    1266           0 :                 goto reportlackofinput;
    1267             :         }
    1268             : 
    1269             : /*      TRC_DEBUG(MAL_SERVER, "SQLproducer started size '%zu' and len '%zu'\n", task->b->size, task->b->len);*/
    1270             : 
    1271         805 :         base = end = s = task->input[cur];
    1272         805 :         *s = 0;
    1273         805 :         task->cur = cur;
    1274         805 :         if (task->as->filename == NULL) {
    1275             :                 consoleinput = true;
    1276         537 :                 goto parseSTDIN;
    1277             :         }
    1278             :         for (;;) {
    1279      131413 :                 startlineno = lineno;
    1280      131681 :                 ateof[cur] = !tablet_read_more(task->b, task->out, task->b->size);
    1281             : 
    1282             :                 // we may be reading from standard input and may be out of input
    1283             :                 // warn the consumers
    1284      131681 :                 if (ateof[cur] && partial) {
    1285             :                         if (unlikely(partial)) {
    1286           1 :                                 tablet_error(task, rowno, lineno, int_nil, "incomplete record at end of file", s);
    1287           1 :                                 task->b->pos += partial;
    1288             :                         }
    1289           1 :                         goto reportlackofinput;
    1290             :                 }
    1291             : 
    1292      131680 :                 if (task->errbuf && task->errbuf[0]) {
    1293           0 :                         if (unlikely(GDKerrbuf && GDKerrbuf[0])) {
    1294           0 :                                 tablet_error(task, rowno, lineno, int_nil, GDKerrbuf, "SQLload_file");
    1295             : /*                              TRC_DEBUG(MAL_SERVER, "Bailout on SQLload\n");*/
    1296             :                                 ateof[cur] = true;
    1297             :                                 break;
    1298             :                         }
    1299             :                 }
    1300             : 
    1301      131680 :           parseSTDIN:
    1302             : 
    1303             :                 /* copy the stream buffer into the input buffer, which is guaranteed larger, but still limited */
    1304             :                 partial = 0;
    1305      132217 :                 task->top[cur] = 0;
    1306      132217 :                 s = task->input[cur];
    1307             :                 base = end;
    1308             :                 /* avoid too long records */
    1309      132217 :                 if (unlikely(end - s + task->b->len - task->b->pos >= task->rowlimit[cur])) {
    1310             :                         /* the input buffer should be extended, but 'base' is not shared
    1311             :                            between the threads, which we can not now update.
    1312             :                            Mimick an ateof instead; */
    1313           0 :                         tablet_error(task, rowno, lineno, int_nil, "record too long", "");
    1314           0 :                         ateof[cur] = true;
    1315             : /*                      TRC_DEBUG(MAL_SERVER, "Bailout on SQLload confronted with too large record\n");*/
    1316           0 :                         goto reportlackofinput;
    1317             :                 }
    1318      132217 :                 memcpy(end, task->b->buf + task->b->pos, task->b->len - task->b->pos);
    1319      132217 :                 end = end + task->b->len - task->b->pos;
    1320      132217 :                 *end = '\0';    /* this is safe, as the stream ensures an extra byte */
    1321             :                 /* Note that we rescan from the start of a record (the last
    1322             :                  * partial buffer from the previous iteration), even if in the
    1323             :                  * previous iteration we have already established that there
    1324             :                  * is no record separator in the first, perhaps significant,
    1325             :                  * part of the buffer. This is because if the record separator
    1326             :                  * is longer than one byte, it is too complex (i.e. would
    1327             :                  * require more state) to be sure what the state of the quote
    1328             :                  * status is when we back off a few bytes from where the last
    1329             :                  * scan ended (we need to back off some since we could be in
    1330             :                  * the middle of the record separator).  If this is too
    1331             :                  * costly, we have to rethink the matter. */
    1332      132217 :                 if (task->from_stdin && *s == '\n' && task->maxrow == BUN_MAX) {
    1333           0 :                         ateof[cur] = true;
    1334           0 :                         goto reportlackofinput;
    1335             :                 }
    1336   140628565 :                 for (e = s; *e && e < end && cnt < task->maxrow;) {
    1337             :                         /* tokenize the record completely
    1338             :                          *
    1339             :                          * The format of the input should comply to the following
    1340             :                          * grammar rule [ [[quote][[esc]char]*[quote]csep]*rsep]*
    1341             :                          * where quote is a single user-defined character.
    1342             :                          * Within the quoted fields a character may be escaped
    1343             :                          * with a backslash.  The correct number of fields should
    1344             :                          * be supplied.  In the first phase we simply break the
    1345             :                          * rows at the record boundary. */
    1346             :                         int nutf = 0;
    1347             :                         int m = 0;
    1348             :                         bool bs = false;
    1349             :                         char q = 0;
    1350             :                         size_t i = 0;
    1351  2901089444 :                         while (*e) {
    1352  2901088512 :                                 if (task->skip > 0) {
    1353             :                                         /* no interpretation of data we're skipping, just
    1354             :                                          * look for newline */
    1355        1505 :                                         if (*e == '\n') {
    1356          10 :                                                 lineno++;
    1357          10 :                                                 break;
    1358             :                                         }
    1359             :                                 } else {
    1360             :                                         /* check for correctly encoded UTF-8 */
    1361  2901087007 :                                         if (nutf > 0) {
    1362        1601 :                                                 if (unlikely((*e & 0xC0) != 0x80))
    1363           1 :                                                         goto badutf8;
    1364        3200 :                                                 if (unlikely(m != 0 && (*e & m) == 0))
    1365           0 :                                                         goto badutf8;
    1366             :                                                 m = 0;
    1367        1600 :                                                 nutf--;
    1368  2901085406 :                                         } else if ((*e & 0x80) != 0) {
    1369        1583 :                                                 if ((*e & 0xE0) == 0xC0) {
    1370             :                                                         nutf = 1;
    1371        1566 :                                                         if (unlikely((e[0] & 0x1E) == 0))
    1372           0 :                                                                 goto badutf8;
    1373          17 :                                                 } else if ((*e & 0xF0) == 0xE0) {
    1374             :                                                         nutf = 2;
    1375          13 :                                                         if ((e[0] & 0x0F) == 0)
    1376             :                                                                 m = 0x20;
    1377           4 :                                                 } else if (likely((*e & 0xF8) == 0xF0)) {
    1378             :                                                         nutf = 3;
    1379           3 :                                                         if ((e[0] & 0x07) == 0)
    1380             :                                                                 m = 0x30;
    1381             :                                                 } else {
    1382           1 :                                                         goto badutf8;
    1383             :                                                 }
    1384  2901083823 :                                         } else if (*e == '\n')
    1385   140496195 :                                                 lineno++;
    1386             :                                         /* check for quoting and the row separator */
    1387  2901087005 :                                         if (bs) {
    1388             :                                                 bs = false;
    1389  2900549509 :                                         } else if (task->escape && *e == '\\') {
    1390             :                                                 bs = true;
    1391             :                                                 i = 0;
    1392  2900012013 :                                         } else if (*e == q) {
    1393             :                                                 q = 0;
    1394  2895061647 :                                         } else if (*e == quote) {
    1395             :                                                 q = quote;
    1396             :                                                 i = 0;
    1397  2890111217 :                                         } else if (q == 0) {
    1398  2750158096 :                                                 i = rdfa[i][(unsigned char) *e];
    1399  2750158096 :                                                 if (i == rseplen)
    1400             :                                                         break;
    1401             :                                         }
    1402             :                                 }
    1403  2760592162 :                                 e++;
    1404             :                         }
    1405   140497280 :                         if (*e == 0) {
    1406         932 :                                 partial = e - s;
    1407             :                                 /* found an incomplete record, saved for next round */
    1408         932 :                                 if (unlikely(s+partial < end)) {
    1409             :                                         /* found a EOS in the input */
    1410           0 :                                         tablet_error(task, rowno, startlineno, int_nil, "record too long (EOS found)", "");
    1411           0 :                                         ateof[cur] = true;
    1412           0 :                                         goto reportlackofinput;
    1413             :                                 }
    1414             :                                 break;
    1415             :                         } else {
    1416   140496348 :                                 rowno++;
    1417   140496348 :                                 if (task->skip > 0) {
    1418          10 :                                         task->skip--;
    1419             :                                 } else {
    1420             :                                         if (cnt < task->maxrow) {
    1421   140496338 :                                                 task->startlineno[cur][task->top[cur]] = startlineno;
    1422   140496338 :                                                 task->rows[cur][task->top[cur]++] = s;
    1423             :                                                 startlineno = lineno;
    1424   140496338 :                                                 cnt++;
    1425             :                                         }
    1426   140496338 :                                         *(e + 1 - rseplen) = 0;
    1427             :                                 }
    1428   140496348 :                                 s = ++e;
    1429   140496348 :                                 task->b->pos += (size_t) (e - base);
    1430             :                                 base = e;
    1431   140496348 :                                 if (task->top[cur] == task->limit)
    1432             :                                         break;
    1433             :                         }
    1434             :                 }
    1435             : 
    1436      131283 :           reportlackofinput:
    1437             : /*        TRC_DEBUG(MAL_SERVER, "SQL producer got buffer '%d' filled with '%d' records\n", cur, task->top[cur]);*/
    1438             : 
    1439      132218 :                 if (consoleinput) {
    1440      130809 :                         task->cur = cur;
    1441      130809 :                         task->ateof = ateof[cur];
    1442      130809 :                         task->cnt = bufcnt[cur];
    1443             :                         /* tell consumer to go ahead */
    1444      130809 :                         MT_sema_up(&task->consumer);
    1445             :                         /* then wait until it is done */
    1446      130809 :                         MT_sema_down(&task->producer);
    1447      130809 :                         if (cnt == task->maxrow) {
    1448         533 :                                 GDKfree(rdfa);
    1449         533 :                                 return;
    1450             :                         }
    1451             :                 } else {
    1452        1409 :                         assert(!blocked[cur]);
    1453        1409 :                         if (blocked[(cur + 1) % MAXBUFFERS]) {
    1454             :                                 /* first wait until other buffer is done */
    1455             : /*                              TRC_DEBUG(MAL_SERVER, "Wait for consumers to finish buffer: %d\n", (cur + 1) % MAXBUFFERS);*/
    1456             : 
    1457        1141 :                                 MT_sema_down(&task->producer);
    1458        1141 :                                 blocked[(cur + 1) % MAXBUFFERS] = false;
    1459        1141 :                                 if (task->state == ENDOFCOPY) {
    1460           0 :                                         GDKfree(rdfa);
    1461           0 :                                         return;
    1462             :                                 }
    1463             :                         }
    1464             :                         /* other buffer is done, proceed with current buffer */
    1465        1409 :                         assert(!blocked[(cur + 1) % MAXBUFFERS]);
    1466        1409 :                         blocked[cur] = true;
    1467        1409 :                         task->cur = cur;
    1468        1409 :                         task->ateof = ateof[cur];
    1469        1409 :                         task->cnt = bufcnt[cur];
    1470        1409 :                         more = !ateof[cur] || (e && e < end && task->top[cur] == task->limit);
    1471             : /*                      TRC_DEBUG(MAL_SERVER, "SQL producer got buffer '%d' filled with '%d' records\n", cur, task->top[cur]);*/
    1472             : 
    1473        1409 :                         MT_sema_up(&task->consumer);
    1474             : 
    1475             :                         cur = (cur + 1) % MAXBUFFERS;
    1476             : /*                      TRC_DEBUG(MAL_SERVER, "May continue with buffer: %d\n", cur);*/
    1477             : 
    1478        1409 :                         if (cnt == task->maxrow) {
    1479         149 :                                 MT_sema_down(&task->producer);
    1480             : /*                              TRC_DEBUG(MAL_SERVER, "Producer delivered all\n");*/
    1481         149 :                                 GDKfree(rdfa);
    1482         149 :                                 return;
    1483             :                         }
    1484             :                 }
    1485             : /*              TRC_DEBUG(MAL_SERVER, "Continue producer buffer: %d\n", cur);*/
    1486             : 
    1487             :                 /* we ran out of input? */
    1488      131536 :                 if (task->ateof && !more) {
    1489             : /*                      TRC_DEBUG(MAL_SERVER, "Producer encountered eof\n");*/
    1490         123 :                         GDKfree(rdfa);
    1491         123 :                         return;
    1492             :                 }
    1493             :                 /* consumers ask us to stop? */
    1494      131413 :                 if (task->state == ENDOFCOPY) {
    1495           0 :                         GDKfree(rdfa);
    1496           0 :                         return;
    1497             :                 }
    1498      131413 :                 bufcnt[cur] = cnt;
    1499             :                 /* move the non-parsed correct row data to the head of the next buffer */
    1500      131413 :                 end = s = task->input[cur];
    1501             :         }
    1502           0 :         if (unlikely(cnt < task->maxrow && task->maxrow != BUN_NONE)) {
    1503             :                 char msg[256];
    1504           0 :                 snprintf(msg, sizeof(msg), "incomplete record at end of file:%s\n", s);
    1505           0 :                 task->as->error = GDKstrdup(msg);
    1506           0 :                 tablet_error(task, rowno, startlineno, int_nil, "incomplete record at end of file", s);
    1507           0 :                 task->b->pos += partial;
    1508             :         }
    1509           0 :         GDKfree(rdfa);
    1510           0 :         return;
    1511             : 
    1512           2 :   badutf8:
    1513           2 :         tablet_error(task, rowno, startlineno, int_nil, "input not properly encoded UTF-8", "");
    1514           2 :         ateof[cur] = true;
    1515           2 :         goto reportlackofinput;
    1516             : }
    1517             : 
    1518             : static void
    1519         833 : create_rejects_table(Client cntxt)
    1520             : {
    1521         833 :         MT_lock_set(&mal_contextLock);
    1522         833 :         if (cntxt->error_row == NULL) {
    1523         412 :                 cntxt->error_row = COLnew(0, TYPE_lng, 0, TRANSIENT);
    1524         412 :                 cntxt->error_fld = COLnew(0, TYPE_int, 0, TRANSIENT);
    1525         412 :                 cntxt->error_msg = COLnew(0, TYPE_str, 0, TRANSIENT);
    1526         412 :                 cntxt->error_input = COLnew(0, TYPE_str, 0, TRANSIENT);
    1527         412 :                 if (cntxt->error_row == NULL || cntxt->error_fld == NULL || cntxt->error_msg == NULL || cntxt->error_input == NULL) {
    1528           0 :                         if (cntxt->error_row)
    1529           0 :                                 BBPunfix(cntxt->error_row->batCacheid);
    1530           0 :                         if (cntxt->error_fld)
    1531           0 :                                 BBPunfix(cntxt->error_fld->batCacheid);
    1532           0 :                         if (cntxt->error_msg)
    1533           0 :                                 BBPunfix(cntxt->error_msg->batCacheid);
    1534           0 :                         if (cntxt->error_input)
    1535           0 :                                 BBPunfix(cntxt->error_input->batCacheid);
    1536           0 :                         cntxt->error_row = cntxt->error_fld = cntxt->error_msg = cntxt->error_input = NULL;
    1537             :                 }
    1538             :         }
    1539         833 :         MT_lock_unset(&mal_contextLock);
    1540         833 : }
    1541             : 
    1542             : BUN
    1543         805 : SQLload_file(Client cntxt, Tablet *as, bstream *b, stream *out, const char *csep, const char *rsep, char quote, lng skip, lng maxrow, int best, bool from_stdin, const char *tabnam, bool escape)
    1544             : {
    1545             :         BUN cnt = 0, cntstart = 0, leftover = 0;
    1546             :         int res = 0;            /* < 0: error, > 0: success, == 0: continue processing */
    1547             :         int j;
    1548             :         BUN firstcol;
    1549             :         BUN i, attr;
    1550             :         READERtask task;
    1551             :         READERtask ptask[MAXWORKERS];
    1552         805 :         int threads = (maxrow< 0 || maxrow > (1 << 16)) && GDKnr_threads > 1 ? (GDKnr_threads < MAXWORKERS ? GDKnr_threads - 1 : MAXWORKERS - 1) : 1;
    1553             :         lng lio = 0, tio, t1 = 0, total = 0, iototal = 0;
    1554             :         char name[MT_NAME_LEN];
    1555             : 
    1556             : /*      TRC_DEBUG(MAL_SERVER, "Prepare copy work for '%d' threads col '%s' rec '%s' quot '%c'\n", threads, csep, rsep, quote);*/
    1557             : 
    1558         805 :         memset(ptask, 0, sizeof(ptask));
    1559         805 :         task = (READERtask) {
    1560             :                 .cntxt = cntxt,
    1561             :                 .from_stdin = from_stdin,
    1562             :                 .as = as,
    1563             :                 .escape = escape,               /* TODO: implement feature!!! */
    1564             :         };
    1565             : 
    1566             :         /* create the reject tables */
    1567         805 :         create_rejects_table(task.cntxt);
    1568         805 :         if (task.cntxt->error_row == NULL || task.cntxt->error_fld == NULL || task.cntxt->error_msg == NULL || task.cntxt->error_input == NULL) {
    1569           0 :                 tablet_error(&task, lng_nil, lng_nil, int_nil, "SQLload initialization failed", "");
    1570           0 :                 goto bailout;
    1571             :         }
    1572             : 
    1573         805 :         assert(rsep);
    1574         805 :         assert(csep);
    1575         805 :         assert(maxrow < 0 || maxrow <= (lng) BUN_MAX);
    1576         805 :         task.fields = (char ***) GDKmalloc(as->nr_attrs * sizeof(char **));
    1577         805 :         task.cols = (int *) GDKzalloc(as->nr_attrs * sizeof(int));
    1578         805 :         task.time = (lng *) GDKzalloc(as->nr_attrs * sizeof(lng));
    1579         805 :         if (task.fields == NULL || task.cols == NULL || task.time == NULL) {
    1580           0 :                 tablet_error(&task, lng_nil, lng_nil, int_nil, "memory allocation failed", "SQLload_file");
    1581           0 :                 goto bailout;
    1582             :         }
    1583         805 :         task.cur = 0;
    1584        2415 :         for (i = 0; i < MAXBUFFERS; i++) {
    1585        1610 :                 task.base[i] = GDKmalloc(MAXROWSIZE(2 * b->size) + 2);
    1586        1610 :                 task.rowlimit[i] = MAXROWSIZE(2 * b->size);
    1587        1610 :                 if (task.base[i] == 0) {
    1588           0 :                         tablet_error(&task, lng_nil, lng_nil, int_nil, SQLSTATE(HY013) MAL_MALLOC_FAIL, "SQLload_file");
    1589           0 :                         goto bailout;
    1590             :                 }
    1591        1610 :                 task.base[i][0] = task.base[i][b->size + 1] = 0;
    1592        1610 :                 task.input[i] = task.base[i] + 1;       /* wrap the buffer with null bytes */
    1593             :         }
    1594         805 :         task.besteffort = best;
    1595             : 
    1596         805 :         if (maxrow < 0)
    1597          42 :                 task.maxrow = BUN_MAX;
    1598             :         else
    1599         763 :                 task.maxrow = (BUN) maxrow;
    1600             : 
    1601         805 :         if (task.fields == 0 || task.cols == 0 || task.time == 0) {
    1602           0 :                 tablet_error(&task, lng_nil, lng_nil, int_nil, SQLSTATE(HY013) MAL_MALLOC_FAIL, "SQLload_file");
    1603           0 :                 goto bailout;
    1604             :         }
    1605             : 
    1606         805 :         task.skip = skip;
    1607         805 :         task.quote = quote;
    1608         805 :         task.csep = csep;
    1609         805 :         task.seplen = strlen(csep);
    1610         805 :         task.rsep = rsep;
    1611         805 :         task.rseplen = strlen(rsep);
    1612         805 :         task.errbuf = cntxt->errbuf;
    1613             : 
    1614         805 :         MT_sema_init(&task.producer, 0, "task.producer");
    1615         805 :         MT_sema_init(&task.consumer, 0, "task.consumer");
    1616         805 :         task.ateof = false;
    1617         805 :         task.b = b;
    1618         805 :         task.out = out;
    1619             : 
    1620             : #ifdef MLOCK_TST
    1621             :         mlock(task.fields, as->nr_attrs * sizeof(char *));
    1622             :         mlock(task.cols, as->nr_attrs * sizeof(int));
    1623             :         mlock(task.time, as->nr_attrs * sizeof(lng));
    1624             :         for (i = 0; i < MAXBUFFERS; i++)
    1625             :                 mlock(task.base[i], b->size + 2);
    1626             : #endif
    1627         805 :         as->error = NULL;
    1628             : 
    1629             :         /* there is no point in creating more threads than we have columns */
    1630         805 :         if (as->nr_attrs < (BUN) threads)
    1631          20 :                 threads = (int) as->nr_attrs;
    1632             : 
    1633             :         /* allocate enough space for pointers into the buffer pool.  */
    1634             :         /* the record separator is considered a column */
    1635         805 :         task.limit = (int) (b->size / as->nr_attrs + as->nr_attrs);
    1636       10169 :         for (i = 0; i < as->nr_attrs; i++) {
    1637        9364 :                 task.fields[i] = GDKmalloc(sizeof(char *) * task.limit);
    1638        9364 :                 if (task.fields[i] == 0) {
    1639           0 :                         if (task.as->error == NULL)
    1640           0 :                                 as->error = createException(MAL, "sql.copy_from", SQLSTATE(HY013) MAL_MALLOC_FAIL);
    1641           0 :                         goto bailout;
    1642             :                 }
    1643             : #ifdef MLOCK_TST
    1644             :                 mlock(task.fields[i], sizeof(char *) * task.limit);
    1645             : #endif
    1646        9364 :                 task.cols[i] = (int) (i + 1);   /* to distinguish non initialized later with zero */
    1647             :         }
    1648        2415 :         for (i = 0; i < MAXBUFFERS; i++) {
    1649        1610 :                 task.rows[i] = GDKzalloc(sizeof(char *) * task.limit);
    1650        1610 :                 task.startlineno[i] = GDKzalloc(sizeof(lng) * task.limit);
    1651        1610 :                 if (task.rows[i] == NULL || task.startlineno[i] == NULL) {
    1652           0 :                         GDKfree(task.rows[i]);
    1653           0 :                         GDKfree(task.startlineno[i]);
    1654           0 :                         tablet_error(&task, lng_nil, lng_nil, int_nil, SQLSTATE(HY013) MAL_MALLOC_FAIL, "SQLload_file:failed to alloc buffers");
    1655           0 :                         goto bailout;
    1656             :                 }
    1657             :         }
    1658         805 :         task.rowerror = (bte *) GDKzalloc(sizeof(bte) * task.limit);
    1659         805 :         if( task.rowerror == NULL){
    1660           0 :                 tablet_error(&task, lng_nil, lng_nil, int_nil, SQLSTATE(HY013) MAL_MALLOC_FAIL, "SQLload_file:failed to alloc rowerror buffer");
    1661           0 :                 goto bailout;
    1662             :         }
    1663             : 
    1664         805 :         task.id = 0;
    1665         805 :         snprintf(name, sizeof(name), "prod-%s", tabnam);
    1666         805 :         if ((task.tid = THRcreate(SQLproducer, (void *) &task, MT_THR_JOINABLE, name)) == 0) {
    1667           0 :                 tablet_error(&task, lng_nil, lng_nil, int_nil, SQLSTATE(42000) "failed to start producer thread", "SQLload_file");
    1668           0 :                 goto bailout;
    1669             :         }
    1670             : /*      TRC_DEBUG(MAL_SERVER, "Parallel bulk load " LLFMT " - " BUNFMT "\n", skip, task.maxrow);*/
    1671             : 
    1672         805 :         task.workers = threads;
    1673        1730 :         for (j = 0; j < threads; j++) {
    1674         925 :                 ptask[j] = task;
    1675         925 :                 ptask[j].id = j;
    1676         925 :                 ptask[j].cols = (int *) GDKzalloc(as->nr_attrs * sizeof(int));
    1677         925 :                 if (ptask[j].cols == 0) {
    1678           0 :                         tablet_error(&task, lng_nil, lng_nil, int_nil, SQLSTATE(HY013) MAL_MALLOC_FAIL, "SQLload_file");
    1679           0 :                         task.id = -1;
    1680           0 :                         MT_sema_up(&task.producer);
    1681           0 :                         goto bailout;
    1682             :                 }
    1683             : #ifdef MLOCK_TST
    1684             :                 mlock(ptask[j].cols, sizeof(char *) * task.limit);
    1685             : #endif
    1686         925 :                 snprintf(name, sizeof(name), "ptask%d.sema", j);
    1687         925 :                 MT_sema_init(&ptask[j].sema, 0, name);
    1688         925 :                 snprintf(name, sizeof(name), "ptask%d.repl", j);
    1689         925 :                 MT_sema_init(&ptask[j].reply, 0, name);
    1690         925 :                 snprintf(name, sizeof(name), "wrkr%d-%s", j, tabnam);
    1691         925 :                 if ((ptask[j].tid = THRcreate(SQLworker, (void *) &ptask[j], MT_THR_JOINABLE, name)) == 0) {
    1692           0 :                         tablet_error(&task, lng_nil, lng_nil, int_nil, SQLSTATE(42000) "failed to start worker thread", "SQLload_file");
    1693             :                         threads = j;
    1694           0 :                         for (j = 0; j < threads; j++)
    1695           0 :                                 ptask[j].workers = threads;
    1696             :                 }
    1697             :         }
    1698         805 :         if (threads == 0) {
    1699             :                 /* no threads started */
    1700           0 :                 task.id = -1;
    1701           0 :                 MT_sema_up(&task.producer);
    1702           0 :                 goto bailout;
    1703             :         }
    1704         805 :         MT_sema_up(&task.producer);
    1705             : 
    1706         805 :         tio = GDKusec();
    1707         805 :         tio = GDKusec() - tio;
    1708         805 :         t1 = GDKusec();
    1709             : #ifdef MLOCK_TST
    1710             :         mlock(task.b->buf, task.b->size);
    1711             : #endif
    1712         808 :         for (firstcol = 0; firstcol < task.as->nr_attrs; firstcol++)
    1713         808 :                 if (task.as->format[firstcol].c != NULL)
    1714             :                         break;
    1715      132900 :         while (res == 0 && cnt < task.maxrow) {
    1716             : 
    1717             :                 // track how many elements are in the aggregated BATs
    1718      132218 :                 cntstart = BATcount(task.as->format[firstcol].c);
    1719             :                 /* block until the producer has data available */
    1720      132218 :                 MT_sema_down(&task.consumer);
    1721      132218 :                 cnt += task.top[task.cur];
    1722      132218 :                 if (task.ateof && !task.top[task.cur])
    1723             :                         break;
    1724      132095 :                 t1 = GDKusec() - t1;
    1725             :                 total += t1;
    1726             :                 iototal += tio;
    1727             : /*              TRC_DEBUG(MAL_SERVER, "Break: %d rows\n", task.top[task.cur]);*/
    1728             : 
    1729      132095 :                 t1 = GDKusec();
    1730      132095 :                 if (task.top[task.cur]) {
    1731             :                         /* activate the workers to break rows */
    1732      465530 :                         for (j = 0; j < threads; j++) {
    1733             :                                 /* stage one, break the rows in parallel */
    1734      333471 :                                 ptask[j].error = 0;
    1735      333471 :                                 ptask[j].state = BREAKROW;
    1736      333471 :                                 ptask[j].next = task.top[task.cur];
    1737      333471 :                                 ptask[j].fields = task.fields;
    1738      333471 :                                 ptask[j].limit = task.limit;
    1739      333471 :                                 ptask[j].cnt = task.cnt;
    1740      333471 :                                 ptask[j].cur = task.cur;
    1741      333471 :                                 ptask[j].top[task.cur] = task.top[task.cur];
    1742      333471 :                                 MT_sema_up(&ptask[j].sema);
    1743             :                         }
    1744             :                 }
    1745      132095 :                 if (task.top[task.cur]) {
    1746             :                         /* await completion of row break phase */
    1747      465530 :                         for (j = 0; j < threads; j++) {
    1748      333471 :                                 MT_sema_down(&ptask[j].reply);
    1749      333471 :                                 if (ptask[j].error) {
    1750             :                                         res = -1;
    1751             : /*                                      TRC_ERROR(MAL_SERVER, "Error in task: %d %d\n", j, ptask[j].error);*/
    1752             :                                 }
    1753             :                         }
    1754             :                 }
    1755             : 
    1756             : /*              TRC_DEBUG(MAL_SERVER,
    1757             :                         "Fill the BATs '%d' " BUNFMT " cap " BUNFMT "\n",
    1758             :                         task.top[task.cur], task.cnt, BATcapacity(as->format[task.cur].c));*/
    1759             : 
    1760      132095 :                 lio += GDKusec() - t1;  /* row break done */
    1761      132095 :                 if (task.top[task.cur]) {
    1762      132059 :                         if (res == 0) {
    1763      132059 :                                 SQLworkdivider(&task, ptask, (int) as->nr_attrs, threads);
    1764             : 
    1765             :                                 /* activate the workers to update the BATs */
    1766      465530 :                                 for (j = 0; j < threads; j++) {
    1767             :                                         /* stage two, update the BATs */
    1768      333471 :                                         ptask[j].state = UPDATEBAT;
    1769      333471 :                                         MT_sema_up(&ptask[j].sema);
    1770             :                                 }
    1771             :                         }
    1772             :                 }
    1773      132095 :                 tio = GDKusec();
    1774             :                 tio = t1 - tio;
    1775             : 
    1776             :                 /* await completion of the BAT updates */
    1777      132095 :                 if (res == 0 && task.top[task.cur]) {
    1778      465530 :                         for (j = 0; j < threads; j++) {
    1779      333471 :                                 MT_sema_down(&ptask[j].reply);
    1780      333471 :                                 if (ptask[j].errorcnt > 0 && !ptask[j].besteffort) {
    1781             :                                         res = -1;
    1782             :                                         best = 0;
    1783             :                                 }
    1784             :                         }
    1785             :                 }
    1786             : 
    1787             :                 /* trim the BATs discarding error tuples */
    1788             : #define trimerrors(TYPE)                                                                                                \
    1789             :                 do {                                                                                                                    \
    1790             :                         TYPE *src, *dst;                                                                                        \
    1791             :                         leftover= BATcount(task.as->format[attr].c);                         \
    1792             :                         limit = leftover - cntstart;                                                            \
    1793             :                         dst =src= (TYPE *) BUNtloc(task.as->format[attr].ci,cntstart); \
    1794             :                         for(j = 0; j < (int) limit; j++, src++){                                     \
    1795             :                                 if ( task.rowerror[j]){                                                                 \
    1796             :                                         leftover--;                                                                                     \
    1797             :                                         continue;                                                                                       \
    1798             :                                 }                                                                                                               \
    1799             :                                 *dst++ = *src;                                                                                  \
    1800             :                         }                                                                                                                       \
    1801             :                         BATsetcount(task.as->format[attr].c, leftover );                     \
    1802             :                 } while (0)
    1803             : 
    1804             : /*              TRC_DEBUG(MAL_SERVER, "Trim bbest '%d' table size " BUNFMT " - rows found so far " BUNFMT "\n",
    1805             :                                          best, BATcount(as->format[firstcol].c), task.cnt); */
    1806             : 
    1807      132095 :                 if (best && BATcount(as->format[firstcol].c)) {
    1808             :                         BUN limit;
    1809             :                         int width;
    1810             : 
    1811          45 :                         for (attr = 0; attr < as->nr_attrs; attr++) {
    1812          31 :                                 if (as->format[attr].skip)
    1813           5 :                                         continue;
    1814          26 :                                 width = as->format[attr].c->twidth;
    1815          26 :                                 as->format[attr].ci = bat_iterator_nolock(as->format[attr].c);
    1816          26 :                                 switch (width){
    1817           5 :                                 case 1:
    1818          16 :                                         trimerrors(bte);
    1819           5 :                                         break;
    1820           0 :                                 case 2:
    1821           0 :                                         trimerrors(sht);
    1822           0 :                                         break;
    1823          21 :                                 case 4:
    1824          55 :                                         trimerrors(int);
    1825          21 :                                         break;
    1826           0 :                                 case 8:
    1827           0 :                                         trimerrors(lng);
    1828           0 :                                         break;
    1829             : #ifdef HAVE_HGE
    1830           0 :                                 case 16:
    1831           0 :                                         trimerrors(hge);
    1832           0 :                                         break;
    1833             : #endif
    1834           0 :                                 default:
    1835             :                                         {
    1836             :                                                 char *src, *dst;
    1837           0 :                                                 leftover= BATcount(task.as->format[attr].c);
    1838           0 :                                                 limit = leftover - cntstart;
    1839           0 :                                                 dst = src= BUNtloc(task.as->format[attr].ci,cntstart);
    1840           0 :                                                 for(j = 0; j < (int) limit; j++, src += width){
    1841           0 :                                                         if ( task.rowerror[j]){
    1842           0 :                                                                 leftover--;
    1843           0 :                                                                 continue;
    1844             :                                                         }
    1845           0 :                                                         if (dst != src)
    1846           0 :                                                                 memcpy(dst, src, width);
    1847           0 :                                                         dst += width;
    1848             :                                                 }
    1849           0 :                                                 BATsetcount(task.as->format[attr].c, leftover );
    1850             :                                         }
    1851           0 :                                         break;
    1852             :                                 }
    1853             :                         }
    1854             :                         // re-initialize the error vector;
    1855          14 :                         memset(task.rowerror, 0, task.limit);
    1856          14 :                         task.errorcnt = 0;
    1857             :                 }
    1858             : 
    1859      132095 :                 if (res < 0) {
    1860             :                         /* producer should stop */
    1861          22 :                         task.maxrow = cnt;
    1862          22 :                         task.state = ENDOFCOPY;
    1863             :                 }
    1864      132095 :                 if (task.ateof && task.top[task.cur] < task.limit && cnt != task.maxrow)
    1865             :                         break;
    1866      132095 :                 task.top[task.cur] = 0;
    1867      132095 :                 MT_sema_up(&task.producer);
    1868             :         }
    1869             : 
    1870             : /*      TRC_DEBUG(MAL_SERVER, "End of block stream eof=%d - res=%d\n", task.ateof, res);*/
    1871             : 
    1872         805 :         cnt = BATcount(task.as->format[firstcol].c);
    1873             : 
    1874         805 :         task.ateof = true;
    1875         805 :         task.state = ENDOFCOPY;
    1876             : /*      TRC_DEBUG(MAL_SERVER, "Activate sync on disk\n");*/
    1877             : 
    1878             :         // activate the workers to sync the BATs to disk
    1879         805 :         if (res == 0) {
    1880        1686 :                 for (j = 0; j < threads; j++) {
    1881             :                         // stage three, update the BATs
    1882         903 :                         ptask[j].state = SYNCBAT;
    1883         903 :                         MT_sema_up(&ptask[j].sema);
    1884             :                 }
    1885             :         }
    1886             : 
    1887         805 :         if (!task.ateof || cnt < task.maxrow) {
    1888             : /*              TRC_DEBUG(MAL_SERVER, "Shut down reader\n");*/
    1889         133 :                 MT_sema_up(&task.producer);
    1890             :         }
    1891         805 :         MT_join_thread(task.tid);
    1892         805 :         if (res == 0) {
    1893             :                 // await completion of the BAT syncs
    1894        1686 :                 for (j = 0; j < threads; j++)
    1895         903 :                         MT_sema_down(&ptask[j].reply);
    1896             :         }
    1897             : 
    1898             : /*      TRC_DEBUG(MAL_SERVER, "Activate endofcopy\n");*/
    1899             : 
    1900        1730 :         for (j = 0; j < threads; j++) {
    1901         925 :                 ptask[j].state = ENDOFCOPY;
    1902         925 :                 MT_sema_up(&ptask[j].sema);
    1903             :         }
    1904             :         /* wait for their death */
    1905        1730 :         for (j = 0; j < threads; j++)
    1906         925 :                 MT_sema_down(&ptask[j].reply);
    1907             : 
    1908             : /*      TRC_DEBUG(MAL_SERVER, "Kill the workers\n");*/
    1909             : 
    1910        1730 :         for (j = 0; j < threads; j++) {
    1911         925 :                 MT_join_thread(ptask[j].tid);
    1912         925 :                 GDKfree(ptask[j].cols);
    1913         925 :                 MT_sema_destroy(&ptask[j].sema);
    1914         925 :                 MT_sema_destroy(&ptask[j].reply);
    1915             :         }
    1916             : 
    1917             : /*      TRC_DEBUG(MAL_SERVER, "Found " BUNFMT " tuples\n", cnt);*/
    1918             : /*      TRC_DEBUG(MAL_SERVER, "Leftover input: %.63s\n", task.b->buf + task.b->pos);*/
    1919             : 
    1920       10169 :         for (i = 0; i < as->nr_attrs; i++) {
    1921        9364 :                 BAT *b = task.as->format[i].c;
    1922        9364 :                 if (b)
    1923        9358 :                         BATsettrivprop(b);
    1924        9364 :                 GDKfree(task.fields[i]);
    1925             :         }
    1926         805 :         GDKfree(task.fields);
    1927         805 :         GDKfree(task.cols);
    1928         805 :         GDKfree(task.time);
    1929        2415 :         for (i = 0; i < MAXBUFFERS; i++) {
    1930        1610 :                 GDKfree(task.base[i]);
    1931        1610 :                 GDKfree(task.rows[i]);
    1932        1610 :                 GDKfree(task.startlineno[i]);
    1933             :         }
    1934         805 :         if (task.rowerror)
    1935         805 :                 GDKfree(task.rowerror);
    1936         805 :         MT_sema_destroy(&task.producer);
    1937         805 :         MT_sema_destroy(&task.consumer);
    1938             : #ifdef MLOCK_TST
    1939             :         munlockall();
    1940             : #endif
    1941             : 
    1942         805 :         return res < 0 ? BUN_NONE : cnt;
    1943             : 
    1944           0 :   bailout:
    1945           0 :         if (task.fields) {
    1946           0 :                 for (i = 0; i < as->nr_attrs; i++) {
    1947           0 :                         if (task.fields[i])
    1948           0 :                                 GDKfree(task.fields[i]);
    1949             :                 }
    1950           0 :                 GDKfree(task.fields);
    1951             :         }
    1952           0 :         GDKfree(task.time);
    1953           0 :         GDKfree(task.cols);
    1954           0 :         GDKfree(task.base[task.cur]);
    1955           0 :         GDKfree(task.rowerror);
    1956           0 :         for (i = 0; i < MAXWORKERS; i++)
    1957           0 :                 GDKfree(ptask[i].cols);
    1958             : #ifdef MLOCK_TST
    1959             :         munlockall();
    1960             : #endif
    1961             :         return BUN_NONE;
    1962             : }
    1963             : 
    1964             : /* return the latest reject table, to be on the safe side we should
    1965             :  * actually create copies within a critical section. Ignored for now. */
    1966             : str
    1967          28 : COPYrejects(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
    1968             : {
    1969          28 :         bat *row = getArgReference_bat(stk, pci, 0);
    1970          28 :         bat *fld = getArgReference_bat(stk, pci, 1);
    1971          28 :         bat *msg = getArgReference_bat(stk, pci, 2);
    1972          28 :         bat *inp = getArgReference_bat(stk, pci, 3);
    1973             : 
    1974          28 :         create_rejects_table(cntxt);
    1975          28 :         if (cntxt->error_row == NULL)
    1976           0 :                 throw(MAL, "sql.rejects", "No reject table available");
    1977          28 :         BBPretain(*row = cntxt->error_row->batCacheid);
    1978          28 :         BBPretain(*fld = cntxt->error_fld->batCacheid);
    1979          28 :         BBPretain(*msg = cntxt->error_msg->batCacheid);
    1980          28 :         BBPretain(*inp = cntxt->error_input->batCacheid);
    1981             :         (void) mb;
    1982          28 :         return MAL_SUCCEED;
    1983             : }
    1984             : 
    1985             : str
    1986          13 : COPYrejects_clear(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
    1987             : {
    1988          13 :         if (cntxt->error_row) {
    1989          13 :                 MT_lock_set(&errorlock);
    1990          13 :                 BATclear(cntxt->error_row, true);
    1991          13 :                 if(cntxt->error_fld) BATclear(cntxt->error_fld, true);
    1992          13 :                 if(cntxt->error_msg) BATclear(cntxt->error_msg, true);
    1993          13 :                 if(cntxt->error_input) BATclear(cntxt->error_input, true);
    1994          13 :                 MT_lock_unset(&errorlock);
    1995             :         }
    1996             :         (void) mb;
    1997             :         (void) stk;
    1998             :         (void) pci;
    1999          13 :         return MAL_SUCCEED;
    2000             : }

Generated by: LCOV version 1.14