LCOV - code coverage report
Current view: top level - sql/backends/monet5 - sql_bincopyfrom.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 263 313 84.0 %
Date: 2021-10-13 02:24:04 Functions: 22 24 91.7 %

          Line data    Source code
       1             : /*
       2             :  * This Source Code Form is subject to the terms of the Mozilla Public
       3             :  * License, v. 2.0.  If a copy of the MPL was not distributed with this
       4             :  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
       5             :  *
       6             :  * Copyright 1997 - July 2008 CWI, August 2008 - 2021 MonetDB B.V.
       7             :  */
       8             : 
       9             : /*
      10             :  * Implementation of COPY BINARY INTO
      11             :  */
      12             : 
      13             : #include "monetdb_config.h"
      14             : #include "mapi_prompt.h"
      15             : #include "sql.h"
      16             : #include "mal_backend.h"
      17             : #include "mal_interpreter.h"
      18             : #include "copybinary.h"
      19             : #include "copybinary_support.h"
      20             : 
/*
 * bailout(fmt, ...): store a new MAL exception raised on behalf of
 * "sql.importColumn" with SQLSTATE 42000 in the local variable 'msg', then
 * jump to the local label 'end'.
 *
 * Any function using this macro must declare a 'str msg' and provide an
 * 'end:' label. The arguments are placed directly after SQLSTATE(42000), so
 * the first argument is presumably required to be a string literal that
 * concatenates with it (TODO confirm against the SQLSTATE definition).
 */
#define bailout(...) do { \
		msg = createException(MAL, "sql.importColumn", SQLSTATE(42000) __VA_ARGS__); \
		goto end; \
	} while (0)
      25             : 
      26             : 
      27             : static str
      28         134 : BATattach_as_bytes(BAT *bat, stream *s, bool byteswap, BUN rows_estimate, str (*fixup)(void*,void*,bool), int *eof_seen)
      29             : {
      30             :         str msg = MAL_SUCCEED;
      31         134 :         int tt = BATttype(bat);
      32         134 :         const size_t asz = (size_t) ATOMsize(tt);
      33             :         const size_t chunk_size = 1<<20;
      34             : 
      35             :         bool eof = false;
      36         420 :         while (!eof) {
      37         286 :                 assert(chunk_size % asz == 0);
      38             :                 size_t n;
      39         286 :                 if (rows_estimate > 0) {
      40             :                         // Set n to estimate+1 so it will read once, get n - 1 and know it's at EOF.
      41             :                         // Otherwise, it will read n, get n, then enlarge the heap, read again,
      42             :                         // and only then know it's at eof.
      43          83 :                         n = rows_estimate + 1;
      44             :                         rows_estimate = 0;
      45             :                 } else {
      46         203 :                         n = chunk_size / asz;
      47             :                 }
      48             : 
      49             :                 // First make some room
      50         286 :                 BUN validCount = bat->batCount;
      51         286 :                 BUN newCount = validCount + n;
      52         286 :                 if (BATextend(bat, newCount) != GDK_SUCCEED)
      53           0 :                         bailout("BATattach_as_bytes: %s", GDK_EXCEPTION);
      54             : 
      55             :                 // Read into the newly allocated space
      56         286 :                 char *start = Tloc(bat, validCount);
      57             :                 char *cur = start;
      58         286 :                 char *end = Tloc(bat, newCount);
      59         765 :                 while (cur < end) {
      60         479 :                         ssize_t nread = mnstr_read(s, cur, 1, end - cur);
      61         479 :                         if (nread < 0)
      62           0 :                                 bailout("BATattach_as_bytes: %s", mnstr_peek_error(s));
      63         479 :                         if (nread == 0) {
      64             :                                 eof = true;
      65         134 :                                 size_t tail = (cur - start) % asz;
      66         134 :                                 if (tail != 0) {
      67           0 :                                         bailout("BATattach_as_bytes: final item incomplete: %d bytes instead of %d", (int) tail, (int) asz);
      68             :                                 }
      69             :                                 end = cur;
      70             :                         }
      71         479 :                         cur += (size_t) nread;
      72             :                 }
      73         286 :                 msg = fixup(start, end, byteswap);
      74         286 :                 if (msg != NULL)
      75           0 :                         goto end;
      76         286 :                 BUN actualCount = validCount + (end - start) / asz;
      77         286 :                 BATsetcount(bat, actualCount);
      78             :         }
      79             : 
      80         134 :         BATsetcount(bat, bat->batCount);
      81         133 :         bat->tseqbase = oid_nil;
      82         133 :         bat->tnonil = bat->batCount == 0;
      83         133 :         bat->tnil = false;
      84         133 :         if (bat->batCount <= 1) {
      85           0 :                 bat->tsorted = true;
      86           0 :                 bat->trevsorted = true;
      87           0 :                 bat->tkey = true;
      88             :         } else {
      89         133 :                 bat->tsorted = false;
      90         133 :                 bat->trevsorted = false;
      91         133 :                 bat->tkey = false;
      92             :         }
      93             : 
      94         133 : end:
      95         133 :         *eof_seen = (int)eof;
      96         133 :         return msg;
      97             : }
      98             : 
      99             : static str
     100          22 : convert_bte(void *start, void *end, bool byteswap)
     101             : {
     102             :         (void)start;
     103             :         (void)end;
     104             :         (void)byteswap;
     105             : 
     106          22 :         return MAL_SUCCEED;
     107             : }
     108             : 
     109             : static str
     110           0 : convert_bit(void *start, void *end, bool byteswap)
     111             : {
     112             :         (void)byteswap;
     113             :         unsigned char *e = end;
     114           0 :         for (unsigned char *p = start; p < e; p++) {
     115           0 :                 int b = *p;
     116           0 :                 if (b > 1)
     117           0 :                         throw(SQL, "convert_bit", SQLSTATE(22003) "invalid boolean byte value: %d", b);
     118             :         }
     119             :         return MAL_SUCCEED;
     120             : }
     121             : 
     122             : static str
     123          16 : convert_sht(void *start, void *end, bool byteswap)
     124             : {
     125          16 :         if (byteswap)
     126     2000002 :                 for (sht *p = start; p < (sht*)end; p++)
     127             :                         copy_binary_convert16(p);
     128             : 
     129          16 :         return MAL_SUCCEED;
     130             : }
     131             : 
     132             : static str
     133         136 : convert_int(void *start, void *end, bool byteswap)
     134             : {
     135         136 :         if (byteswap)
     136     2000002 :                 for (int *p = start; p < (int*)end; p++)
     137     2000000 :                         copy_binary_convert32(p);
     138             : 
     139         136 :         return MAL_SUCCEED;
     140             : }
     141             : 
     142             : static str
     143          30 : convert_lng(void *start, void *end, bool byteswap)
     144             : {
     145          30 :         if (byteswap)
     146     2000002 :                 for (lng *p = start; p < (lng*)end; p++)
     147             :                         copy_binary_convert64(p);
     148             : 
     149          30 :         return MAL_SUCCEED;
     150             : }
     151             : 
     152             : #ifdef HAVE_HGE
     153             : static str
     154          34 : convert_hge(void *start, void *end, bool byteswap)
     155             : {
     156          34 :         if (byteswap)
     157           0 :                 for (hge *p = start; p < (hge*)end; p++)
     158             :                         copy_binary_convert128(p);
     159             : 
     160          34 :         return MAL_SUCCEED;
     161             : }
     162             : #endif
     163             : 
     164             : static str
     165          32 : convert_uuid(void *start, void *end, bool byteswap)
     166             : {
     167             :         (void)byteswap;
     168          32 :         size_t nbytes = (char*)end - (char*)start;
     169          32 :         (void)nbytes; assert(nbytes % 16 == 0);
     170             : 
     171          32 :         return MAL_SUCCEED;
     172             : }
     173             : 
     174             : static str
     175           8 : convert_flt(void *start, void *end, bool byteswap)
     176             : {
     177             :         // Slightly dodgy pointer conversions here
     178             :         assert(sizeof(uint32_t) == sizeof(flt));
     179             :         assert(sizeof(struct { char dummy; uint32_t ui; }) >= sizeof(struct { char dummy; flt f; }));
     180             : 
     181           8 :         if (byteswap)
     182     2000002 :                 for (uint32_t *p = start; (void*)p < end; p++)
     183     2000000 :                         copy_binary_convert32(p);
     184             : 
     185           8 :         return MAL_SUCCEED;
     186             : }
     187             : 
     188             : static str
     189           8 : convert_dbl(void *start, void *end, bool byteswap)
     190             : {
     191             :         // Slightly dodgy pointer conversions here
     192             :         assert(sizeof(uint64_t) == sizeof(dbl));
     193             :         assert(sizeof(struct { char dummy; uint64_t ui; }) >= sizeof(struct { char dummy; dbl f; }));
     194             : 
     195             : 
     196           8 :         if (byteswap)
     197     2000002 :                 for (uint64_t *p = start; (void*)p < end; p++)
     198             :                         copy_binary_convert64(p);
     199             : 
     200           8 :         return MAL_SUCCEED;
     201             : }
     202             : 
     203             : static str
     204           6 : BATattach_fixed_width(BAT *bat, stream *s, bool byteswap, str (*convert)(void*,void*,void*,void*,bool), size_t record_size, int *eof_reached)
     205             : {
     206             :         str msg = MAL_SUCCEED;
     207             :         bstream *bs = NULL;
     208             : 
     209             :         size_t chunk_size = 1<<20;
     210           6 :         assert(record_size > 0);
     211           6 :         chunk_size -= chunk_size % record_size;
     212             : 
     213           6 :         bs = bstream_create(s, chunk_size);
     214           6 :         if (bs == NULL) {
     215           0 :                 msg = createException(SQL, "sql", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     216           0 :                 goto end;
     217             :         }
     218             : 
     219          59 :         while (1) {
     220          65 :                 ssize_t nread = bstream_next(bs);
     221          65 :                 if (nread < 0)
     222           0 :                         bailout("%s", mnstr_peek_error(s));
     223          65 :                 if (nread == 0)
     224             :                         break;
     225             : 
     226          59 :                 size_t n = (bs->len - bs->pos) / record_size;
     227          59 :                 size_t extent = n * record_size;
     228          59 :                 BUN count = BATcount(bat);
     229          59 :                 BUN newCount = count + n;
     230          59 :                 if (BATextend(bat, newCount) != GDK_SUCCEED)
     231           0 :                         bailout("%s", GDK_EXCEPTION);
     232             : 
     233          59 :                 msg = convert(
     234          59 :                         Tloc(bat, count), Tloc(bat, newCount),
     235          59 :                         &bs->buf[bs->pos], &bs->buf[bs->pos + extent],
     236             :                         byteswap);
     237          59 :                 if (msg != MAL_SUCCEED)
     238           0 :                         goto end;
     239          59 :                 BATsetcount(bat, newCount);
     240          59 :                 bs->pos += extent;
     241             :         }
     242             : 
     243           6 :         bat->tseqbase = oid_nil;
     244           6 :         bat->tnonil = bat->batCount == 0;
     245           6 :         bat->tnil = false;
     246           6 :         if (bat->batCount <= 1) {
     247           0 :                 bat->tsorted = true;
     248           0 :                 bat->trevsorted = true;
     249           0 :                 bat->tkey = true;
     250             :         } else {
     251           6 :                 bat->tsorted = false;
     252           6 :                 bat->trevsorted = false;
     253           6 :                 bat->tkey = false;
     254             :         }
     255             : 
     256           6 : end:
     257           6 :         *eof_reached = 0;
     258           6 :         if (bs != NULL) {
     259           6 :                 *eof_reached = (int)bs->eof;
     260           6 :                 bs->s = NULL;
     261           6 :                 bstream_destroy(bs);
     262             :         }
     263           6 :         return msg;
     264             : }
     265             : 
     266             : 
     267             : static str
     268           8 : convert_date(void *dst_start, void *dst_end, void *src_start, void *src_end, bool byteswap)
     269             : {
     270             :         date *dst = (date*)dst_start;
     271             :         date *dst_e = (date*)dst_end;
     272             :         copy_binary_date *src = (copy_binary_date*)src_start;
     273             :         copy_binary_date *src_e = (copy_binary_date*)src_end;
     274           8 :         (void)dst_e; assert(dst_e - dst == src_e - src);
     275             : 
     276     2000008 :         for (; src < src_e; src++) {
     277     2000000 :                 if (byteswap)
     278             :                         copy_binary_convert_date(src);
     279     2000000 :                 date value = date_create(src->year, src->month, src->day);
     280     2000000 :                 *dst++ = value;
     281             :         }
     282             : 
     283           8 :         return MAL_SUCCEED;
     284             : }
     285             : 
     286             : static str
     287          16 : convert_time(void *dst_start, void *dst_end, void *src_start, void *src_end, bool byteswap)
     288             : {
     289             :         (void)byteswap;
     290             :         daytime *dst = (daytime*)dst_start;
     291             :         daytime *dst_e = (daytime*)dst_end;
     292             :         copy_binary_time *src = (copy_binary_time*)src_start;
     293             :         copy_binary_time *src_e = (copy_binary_time*)src_end;
     294          16 :         (void)dst_e; assert(dst_e - dst == src_e - src);
     295             : 
     296     2000016 :         for (; src < src_e; src++) {
     297     2000000 :                 if (byteswap)
     298             :                         copy_binary_convert_time(src);
     299     2000000 :                 daytime value = daytime_create(src->hours, src->minutes, src->seconds, src->ms);
     300     2000000 :                 *dst++ = value;
     301             :         }
     302             : 
     303          16 :         return MAL_SUCCEED;
     304             : }
     305             : 
     306             : static str
     307          35 : convert_timestamp(void *dst_start, void *dst_end, void *src_start, void *src_end, bool byteswap)
     308             : {
     309             :         (void)byteswap;
     310             :         timestamp *dst = (timestamp*)dst_start;
     311             :         timestamp *dst_e = (timestamp*)dst_end;
     312             :         copy_binary_timestamp *src = (copy_binary_timestamp*)src_start;
     313             :         copy_binary_timestamp *src_e = (copy_binary_timestamp*)src_end;
     314          35 :         (void)dst_e; assert(dst_e - dst == src_e - src);
     315             : 
     316     2000035 :         for (; src < src_e; src++) {
     317     2000000 :                 if (byteswap)
     318             :                         copy_binary_convert_timestamp(src);
     319     2000000 :                 date dt = date_create(src->date.year, src->date.month, src->date.day);
     320     2000000 :                 daytime tm = daytime_create(src->time.hours, src->time.minutes, src->time.seconds, src->time.ms);
     321     2000000 :                 timestamp value = timestamp_create(dt, tm);
     322     2000000 :                 *dst++ = value;
     323             :         }
     324             : 
     325          35 :         return MAL_SUCCEED;
     326             : }
     327             : 
     328             : 
     329             : static str
     330    14246914 : convert_and_validate(char *text)
     331             : {
     332             :         unsigned char *r = (unsigned char*)text;
     333             :         unsigned char *w = r;
     334             : 
     335    14246914 :         if (*r == 0x80 && *(r+1) == 0) {
     336             :                 // Technically a utf-8 violation, but we treat it as the NULL marker
     337             :                 // GDK does so as well so we can just pass it on.
     338             :                 // load_zero_terminated_text() below contains an assert to ensure
     339             :                 // this remains the case.
     340             :                 return MAL_SUCCEED;
     341             :         }
     342             : 
     343   292921734 :         while (*r != 0) {
     344   279694822 :                 unsigned char c = *w++ = *r++;
     345             : 
     346   279694822 :                 if (c == '\r' && *r == '\n') {
     347             :                         w--;
     348     2000000 :                         continue;
     349             :                 }
     350   277694822 :                 if ((c & 0x80) == 0x00) // 1xxx_xxxx: standalone byte
     351   277447908 :                         continue;
     352      246914 :                 if ((c & 0xF8) == 0xF0) // 1111_0xxx
     353           2 :                         goto expect3;
     354      246912 :                 if ((c & 0xF0) == 0xE0) // 1110_xxxx
     355           0 :                         goto expect2;
     356      246912 :                 if ((c & 0xE0) == 0xC0) // 110x_xxxx
     357      246912 :                         goto expect1;
     358           0 :                 goto bad_utf8;
     359             : 
     360             : expect3:
     361           2 :                 if (((*w++ = *r++) & 0x80) != 0x80)
     362           2 :                         goto bad_utf8;
     363           0 : expect2:
     364           0 :                 if (((*w++ = *r++) & 0x80) != 0x80)
     365           0 :                         goto bad_utf8;
     366           0 : expect1:
     367      246912 :                 if (((*w++ = *r++) & 0x80) != 0x80)
     368           0 :                         goto bad_utf8;
     369             : 
     370             :         }
     371    13226912 :         *w = '\0';
     372    13226912 :         return MAL_SUCCEED;
     373             : 
     374           2 : bad_utf8:
     375           2 :         return createException(SQL, "BATattach_stream", SQLSTATE(42000) "malformed utf-8 byte sequence");
     376             : }
     377             : 
     378             : static str
     379    14246914 : append_text(BAT *bat, char *start)
     380             : {
     381    14246914 :         str msg = convert_and_validate(start);
     382    14246914 :         if (msg != MAL_SUCCEED)
     383             :                 return msg;
     384             : 
     385    14246912 :         if (BUNappend(bat, start, false) != GDK_SUCCEED)
     386           0 :                 return createException(SQL, "sql.importColumn", GDK_EXCEPTION);
     387             : 
     388             :         return MAL_SUCCEED;
     389             : }
     390             : 
     391             : // Load items from the stream and put them in the BAT.
     392             : // Because it's text read from a binary stream, we replace \r\n with \n.
     393             : // We don't have to validate the utf-8 structure because BUNappend does that for us.
     394             : static str
     395          16 : load_zero_terminated_text(BAT *bat, stream *s, int *eof_reached)
     396             : {
     397             :         str msg = MAL_SUCCEED;
     398             :         bstream *bs = NULL;
     399             : 
     400             :         // convert_and_validate() above counts on the following property to hold:
     401             :         assert(strNil((const char[2]){ 0x80, 0 }));
     402             : 
     403          16 :         bs = bstream_create(s, 1 << 20);
     404          16 :         if (bs == NULL) {
     405           0 :                 msg = createException(SQL, "sql", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     406           0 :                 goto end;
     407             :         }
     408             : 
     409             :         // In the outer loop we refill the buffer until the stream ends.
     410             :         // In the inner loop we look for complete \0-terminated strings.
     411         415 :         while (1) {
     412         431 :                 ssize_t nread = bstream_next(bs);
     413         431 :                 if (nread < 0)
     414           0 :                         bailout("%s", mnstr_peek_error(s));
     415         431 :                 if (nread == 0)
     416             :                         break;
     417             : 
     418         417 :                 char *buf_start = &bs->buf[bs->pos];
     419         417 :                 char *buf_end = &bs->buf[bs->len];
     420             :                 char *start, *end;
     421    14247329 :                 for (start = buf_start; (end = memchr(start, '\0', buf_end - start)) != NULL; start = end + 1) {
     422    14246914 :                         msg = append_text(bat, start);
     423    14246914 :                         if (msg != NULL)
     424           2 :                                 goto end;
     425             :                 }
     426         415 :                 bs->pos = start - buf_start;
     427             :         }
     428             : 
     429             :         // It's an error to have date left after falling out of the outer loop
     430          14 :         if (bs->pos < bs->len)
     431           0 :                 bailout("unterminated string at end");
     432             : 
     433          14 : end:
     434          16 :         *eof_reached = 0;
     435          16 :         if (bs != NULL) {
     436          16 :                 *eof_reached = (int)bs->eof;
     437          16 :                 bs->s = NULL;
     438          16 :                 bstream_destroy(bs);
     439             :         }
     440          16 :         return msg;
     441             : }
     442             : 
     443             : 
// Dispatcher table for imports. We dispatch on a string value instead of for
// example the underlying gdktype so we have freedom to some day implement for
// example both zero-terminated strings and newline-terminated strings.
//
// An entry must fill exactly one of the following three fields: 'loader',
// 'convert_fixed_width', or 'convert_in_place'. load_column() checks them in
// the order loader, convert_in_place, convert_fixed_width and uses the first
// one that is set.
//
// A 'loader' has complete freedom. It is handed a BAT and a stream and it can
// then do whatever it wants. We use it to read strings and json and other
// variable-width data.
//
// If an entry has 'convert_in_place' this means the external and internal
// forms have the same size and are probably identical. In this case, the data
// is loaded directly into the bat heap and then the 'convert_in_place' function
// is called once for each block read, to perform any necessary tweaking of the
// data. We use this for example for the integer types: when no byteswap is
// requested no tweaking is necessary, otherwise we byteswap the data.
//
// Finally, if an entry has 'convert_fixed_width' it means the internal and
// external forms are both fixed width but different in size. The data is loaded into
// intermediate buffers first and the conversion function copies the data from
// an array of incoming data in the buffer to an array of internal
// representations in the BAT. 'record_size' is the size in bytes of one
// external record and is only used for such entries.
//
// A note about the function signatures: we use start/end pointers instead of
// start/size pairs because this way there can be no confusion about whether
// the given size is a byte count or an item count.
static struct type_rec {
	char *method;		// name that find_type_rec() matches against
	char *gdk_type;		// NOTE(review): not used in the code visible here; presumably the expected GDK column type
	str (*loader)(BAT *bat, stream *s, int *eof_reached);
	str (*convert_fixed_width)(void *dst_start, void *dst_end, void *src_start, void *src_end, bool byteswap);
	size_t record_size;
	str (*convert_in_place)(void *start, void *end, bool byteswap);
} type_recs[] = {
	{ "bit", "bit", .convert_in_place=convert_bit, },
	{ "bte", "bte", .convert_in_place=convert_bte, },
	{ "sht", "sht", .convert_in_place=convert_sht, },
	{ "int", "int", .convert_in_place=convert_int, },
	{ "lng", "lng", .convert_in_place=convert_lng, },
	{ "flt", "flt", .convert_in_place=convert_flt, },
	{ "dbl", "dbl", .convert_in_place=convert_dbl, },
	//
#ifdef HAVE_HGE
	{ "hge", "hge", .convert_in_place=convert_hge, },
#endif
	// variable-width text types, read by a custom loader; uuid is raw 16-byte values
	{ "str", "str", .loader=load_zero_terminated_text },
	{ "url", "url", .loader=load_zero_terminated_text },
	{ "json", "json", .loader=load_zero_terminated_text },
	{ "uuid", "uuid", .convert_in_place=convert_uuid, },
	// temporal types: external struct records converted to internal values
	{ "date", "date", .convert_fixed_width=convert_date, .record_size=sizeof(copy_binary_date), },
	{ "daytime", "daytime", .convert_fixed_width=convert_time, .record_size=sizeof(copy_binary_time), },
	{ "timestamp", "timestamp", .convert_fixed_width=convert_timestamp, .record_size=sizeof(copy_binary_timestamp), },
};
     500             : 
     501             : 
     502             : static struct type_rec*
     503         156 : find_type_rec(str name)
     504             : {
     505             :         struct type_rec *end = (struct type_rec*)((char *)type_recs + sizeof(type_recs));
     506         798 :         for (struct type_rec *t = &type_recs[0]; t < end; t++)
     507         798 :                 if (strcmp(t->method, name) == 0)
     508         156 :                         return t;
     509             :         return NULL;
     510             : }
     511             : 
     512             : 
     513             : static str
     514         156 : load_column(struct type_rec *rec, const char *name, BAT *bat, stream *s, bool byteswap, BUN rows_estimate, int *eof_reached)
     515             : {
     516             :         str msg = MAL_SUCCEED;
     517             :         BUN orig_count, new_count;
     518             :         BUN rows_added;
     519             : 
     520         156 :         orig_count = BATcount(bat);
     521             : 
     522         156 :         if (rec->loader != NULL) {
     523          16 :                 msg = rec->loader(bat, s, eof_reached);
     524         140 :         } else if (rec->convert_in_place != NULL) {
     525         134 :                 msg = BATattach_as_bytes(bat, s, byteswap, rows_estimate, rec->convert_in_place, eof_reached);
     526           6 :         } else if (rec->convert_fixed_width != NULL) {
     527           6 :                 msg = BATattach_fixed_width(bat, s, byteswap, rec->convert_fixed_width, rec->record_size, eof_reached);
     528             :         } else {
     529           0 :                 *eof_reached = 0;
     530           0 :                 bailout("invalid loader configuration for '%s'", rec->method);
     531             :         }
     532             : 
     533         155 :         new_count = BATcount(bat);
     534         155 :         rows_added = new_count - orig_count;
     535             : 
     536         155 :         if (msg == MAL_SUCCEED && rows_estimate != 0 && rows_estimate != rows_added)
     537           3 :                 bailout(
     538             :                         "inconsistent row count in %s: expected "BUNFMT", got "BUNFMT,
     539             :                         name,
     540             :                         rows_estimate, rows_added);
     541             : 
     542         152 :         end:
     543         155 :                 return msg;
     544             : }
     545             : 
     546             : 
     547             : static str
     548          50 : start_mapi_file_upload(backend *be, str path, stream **s)
     549             : {
     550             :         str msg = MAL_SUCCEED;
     551          50 :         *s = NULL;
     552             : 
     553          50 :         stream *ws = be->mvc->scanner.ws;
     554          50 :         bstream *bs = be->mvc->scanner.rs;
     555          50 :         stream *rs = bs->s;
     556          50 :         assert(isa_block_stream(ws));
     557          50 :         assert(isa_block_stream(rs));
     558             : 
     559          50 :         mnstr_write(ws, PROMPT3, sizeof(PROMPT3)-1, 1);
     560          50 :         mnstr_printf(ws, "rb %s\n", path);
     561          50 :         mnstr_flush(ws, MNSTR_FLUSH_DATA);
     562          68 :         while (!bs->eof)
     563          18 :                 bstream_next(bs);
     564             :         char buf[80];
     565          50 :         if (mnstr_readline(rs, buf, sizeof(buf)) > 1) {
     566           0 :                 msg = createException(IO, "sql.importColumn", "Error %s", buf);
     567           0 :                 goto end;
     568             :         }
     569          50 :         set_prompting(rs, PROMPT2, ws);
     570             : 
     571          50 :         *s = rs;
     572          50 : end:
     573          50 :         return msg;
     574             : }
     575             : 
     576             : 
     577             : static str
     578          50 : finish_mapi_file_upload(backend *be, bool eof_reached)
     579             : {
     580             :         str msg = MAL_SUCCEED;
     581          50 :         stream *ws = be->mvc->scanner.ws;
     582          50 :         bstream *bs = be->mvc->scanner.rs;
     583          50 :         stream *rs = bs->s;
     584          50 :         assert(isa_block_stream(ws));
     585          50 :         assert(isa_block_stream(rs));
     586             : 
     587          50 :         set_prompting(rs, NULL, NULL);
     588          50 :         if (!eof_reached) {
     589             :                 // Probably due to an error. Read until message boundary.
     590             :                 char buf[8190];
     591           0 :                 while (1) {
     592           1 :                         ssize_t nread = mnstr_read(rs, buf, 1, sizeof(buf));
     593           1 :                         if (nread > 0)
     594           0 :                                 continue;
     595           1 :                         if (nread < 0)
     596           0 :                                 msg = createException(
     597             :                                         IO, "sql.importColumn",
     598             :                                         "while syncing read stream: %s", mnstr_peek_error(rs));
     599             :                         break;
     600             :                 }
     601             :         }
     602          50 :         mnstr_write(ws, PROMPT3, sizeof(PROMPT3)-1, 1);
     603          50 :         mnstr_flush(ws, MNSTR_FLUSH_DATA);
     604             : 
     605          50 :         return msg;
     606             : }
     607             : 
     608             : 
     609             : 
     610             : /* Import a single file into a new BAT.
     611             :  */
     612             : static str
     613         155 : importColumn(backend *be, bat *ret, BUN *retcnt, str method, bool byteswap, str path, int onclient,  BUN nrows)
     614             : {
     615             :         // In this function we create the BAT and open the file, and tidy
     616             :         // up when things go wrong. The actual work happens in load_column().
     617             : 
     618             :         // These are managed by the end: block.
     619             :         str msg = MAL_SUCCEED;
     620             :         int gdk_type;
     621             :         BAT *bat = NULL;
     622             :         stream *stream_to_close = NULL;
     623             :         bool do_finish_mapi = false;
     624         155 :         int eof_reached = -1; // 1 = read to the end; 0 = stopped reading early; -1 = unset, a bug.
     625             : 
     626             :         // This one is not managed by the end: block
     627             :         stream *s;
     628             : 
     629             :         // Set safe values
     630         155 :         *ret = 0;
     631         155 :         *retcnt = 0;
     632             : 
     633             :         // Figure out what kind of data we have
     634         155 :         struct type_rec *rec = find_type_rec(method);
     635         155 :         if (rec == NULL)
     636           0 :                 bailout("COPY BINARY FROM not implemented for '%s'", method);
     637             : 
     638             :         // Create the BAT
     639         155 :         gdk_type = ATOMindex(rec->gdk_type);
     640         156 :         if (gdk_type < 0)
     641           0 :                 bailout("cannot load %s as %s: unknown atom type %s", path, method, rec->gdk_type);
     642         156 :         bat = COLnew(0, gdk_type, nrows, PERSISTENT);
     643         156 :         if (bat == NULL)
     644           0 :                 bailout("%s", GDK_EXCEPTION);
     645             : 
     646             :         // Open the input stream
     647         156 :         if (onclient) {
     648          50 :                 s = NULL;
     649             :                 do_finish_mapi = true;
     650          50 :                 msg = start_mapi_file_upload(be, path, &s);
     651          50 :                 if (msg != MAL_SUCCEED)
     652           0 :                         goto end;
     653             :         } else {
     654         106 :                 s = stream_to_close = open_rstream(path);
     655         106 :                 if (s == NULL)
     656           0 :                         bailout("Couldn't open '%s' on server: %s", path, mnstr_peek_error(NULL));
     657             :         }
     658             : 
     659             :         // Do the work
     660         156 :         msg = load_column(rec, path, bat, s, byteswap, nrows, &eof_reached);
     661         155 :         if (eof_reached != 0 && eof_reached != 1) {
     662           0 :                 if (msg)
     663           0 :                         bailout("internal error in sql.importColumn: eof_reached not set (%s). Earlier error: %s", method, msg);
     664             :                 else
     665           0 :                         bailout("internal error in sql.importColumn: eof_reached not set (%s)", method);
     666             :         }
     667             : 
     668             :         // Fall through into the end block which will clean things up
     669         155 : end:
     670         155 :         if (do_finish_mapi) {
     671          50 :                 str msg1 = finish_mapi_file_upload(be, eof_reached == 1);
     672          50 :                 if (msg == MAL_SUCCEED)
     673             :                         msg = msg1;
     674             :         }
     675             : 
     676         155 :         if (stream_to_close)
     677         106 :                 close_stream(stream_to_close);
     678             : 
     679             :         // Manage the return values and `bat`.
     680         156 :         if (msg == MAL_SUCCEED) {
     681         151 :                 BBPkeepref(bat->batCacheid);
     682         151 :                 *ret = bat->batCacheid;
     683         151 :                 *retcnt = BATcount(bat);
     684             :         } else {
     685           5 :                 if (bat != NULL) {
     686           5 :                         BBPunfix(bat->batCacheid);
     687             :                         bat = NULL;
     688             :                 }
     689           5 :                 *ret = 0;
     690           5 :                 *retcnt = 0;
     691             :         }
     692             : 
     693         156 :         return msg;
     694             : }
     695             : 
     696             : 
     697             : str
     698         156 : mvc_bin_import_column_wrap(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
     699             : {
     700             :         // Entry point for sql.importColumn.
     701             :         // Does the argument/return handling, the work is done by importColumn.
     702             :         (void)mb;
     703             : 
     704         156 :         assert(pci->retc == 2);
     705         156 :         bat *ret = getArgReference_bat(stk, pci, 0);
     706         156 :         BUN *retcnt = getArgReference_oid(stk, pci, 1);
     707             : 
     708         156 :         assert(pci->argc == 7);
     709         156 :         str method = *getArgReference_str(stk, pci, 2);
     710         156 :         bit byteswap = *getArgReference_bit(stk, pci, 3);
     711         156 :         str path = *getArgReference_str(stk, pci, 4);
     712         156 :         int onclient = *getArgReference_int(stk, pci, 5);
     713         156 :         BUN nrows = *getArgReference_oid(stk, pci, 6);
     714             : 
     715         156 :         backend *be = cntxt->sqlcontext;
     716             : 
     717         156 :         return importColumn(be, ret, retcnt, method, byteswap, path, onclient, nrows);
     718             : }
     719             : 
     720             : str
     721           0 : mvc_bin_import_table_wrap(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
     722             : {
     723             :         // At some point we should remove all traces of importTable.
     724             :         // Until then, an error message.
     725             :         (void)cntxt;
     726             :         (void)mb;
     727             :         (void)stk;
     728             :         (void)pci;
     729             : 
     730           0 :         return createException(MAL, "mvc_bin_import_table_wrap", "MAL operator sql.importTable should have been replaced with sql.importColumn");
     731             : }

Generated by: LCOV version 1.14