LCOV - code coverage report
Current view: top level - common/stream - bs.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 124 150 82.7 %
Date: 2021-10-13 02:24:04 Functions: 9 11 81.8 %

          Line data    Source code
       1             : /*
       2             :  * This Source Code Form is subject to the terms of the Mozilla Public
       3             :  * License, v. 2.0.  If a copy of the MPL was not distributed with this
       4             :  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
       5             :  *
       6             :  * Copyright 1997 - July 2008 CWI, August 2008 - 2021 MonetDB B.V.
       7             :  */
       8             : 
       9             : #include "monetdb_config.h"
      10             : #include "stream.h"
      11             : #include "stream_internal.h"
      12             : 
      13             : /* ------------------------------------------------------------------ */
      14             : 
      15             : /* A buffered stream consists of a sequence of blocks.  Each block
      16             :  * consists of a 16-bit header followed by the data in the block.  The
      17             :  * header is the data size shifted left by one bit; the low-order bit is
      18             :  * set on the (possibly empty) block that marks a flush. */
      19             : 
      20             : static ssize_t bs_read_internal(stream *restrict ss, void *restrict buf, size_t elmsize, size_t cnt); /* forward declaration: bs_read calls it before its definition */
      21             : /* Allocate the private state (bs) of a block stream.
                     :  * Returns a pointer to a zero-initialised bs, or NULL when malloc
                     :  * fails. */
      22             : static bs *
      23       13740 : bs_create(void)
      24             : {
      25             :         /* should be a binary stream */
      26             :         bs *ns;
      27             : 
      28       13740 :         if ((ns = malloc(sizeof(*ns))) == NULL)
      29             :                 return NULL;
      30       13740 :         *ns = (bs) {0}; /* clear every field, including the buffer */
      31       13740 :         return ns;
      32             : }
      33             : 
      34             : /* Collect data until the internal buffer is filled, then write the
      35             :  * filled buffer to the underlying stream.
                     :  * Each full buffer goes out as a 16-bit header (the block size
                     :  * shifted left by one, low-order bit clear) followed by the buffer
                     :  * contents.  Data that does not fill the buffer stays in it.
                     :  *
                     :  * Returns cnt on success, or -1 when the stream has no private data
                     :  * or a write to the underlying stream fails (the buffered data is
                     :  * then dropped and the error is copied from ss->inner).
                     :  *
      36             :  * Struct field usage:
      37             :  * ss->inner - the underlying stream;
      38             :  * buf - the buffer in which data is collected;
      39             :  * nr - how much of buf is already filled (if nr == sizeof(buf) the
      40             :  *      data is written to the underlying stream, so upon entry nr <
      41             :  *      sizeof(buf));
                     :  * bytes, blks - running totals of data bytes and blocks written;
      42             :  * itotal - unused.
      43             :  */
      44             : ssize_t
      45     6768821 : bs_write(stream *restrict ss, const void *restrict buf, size_t elmsize, size_t cnt)
      46             : {
      47             :         bs *s;
      48     6768821 :         size_t todo = cnt * elmsize; /* bytes still to be buffered; NOTE(review): the product is not checked for overflow */
      49             :         uint16_t blksize;
      50             : 
      51     6768821 :         s = (bs *) ss->stream_data.p;
      52     6768821 :         if (s == NULL)
      53             :                 return -1;
      54     6768821 :         assert(!ss->readonly);
      55     6768821 :         assert(s->nr < sizeof(s->buf));
      56    13605992 :         while (todo > 0) {
      57     6837171 :                 size_t n = sizeof(s->buf) - s->nr; /* free space left in the buffer */
      58             : 
      59             :                 if (todo < n)
      60             :                         n = todo;
      61     6837171 :                 memcpy(s->buf + s->nr, buf, n);
      62     6837171 :                 s->nr += (unsigned) n;
      63     6837171 :                 todo -= n;
      64     6837171 :                 buf = ((const char *) buf + n);
      65     6837171 :                 if (s->nr == sizeof(s->buf)) {
      66             :                         /* block is full, write it to the stream */
      67             : #ifdef BSTREAM_DEBUG
      68             :                         {
      69             :                                 unsigned i;
      70             : 
      71             :                                 fprintf(stderr, "W %s %u \"", ss->name, s->nr);
      72             :                                 for (i = 0; i < s->nr; i++)
      73             :                                         if (' ' <= s->buf[i] && s->buf[i] < 127)
      74             :                                                 putc(s->buf[i], stderr);
      75             :                                         else
      76             :                                                 fprintf(stderr, "\\%03o", (unsigned char) s->buf[i]);
      77             :                                 fprintf(stderr, "\"\n");
      78             :                         }
      79             : #endif
      80             :                         /* since the block is at max BLOCK (8K) - 2 size we can
      81             :                          * store it in a two byte integer */
      82             :                         blksize = (uint16_t) s->nr;
      83       69721 :                         s->bytes += s->nr;
      84             :                         /* the low-order bit of the header tells whether this
      85             :                          * block ends a flush; it does not at this moment, so
      86             :                          * shift the size to the left and leave the bit clear */
      87             :                         blksize <<= 1;
      88       69721 :                         if (!mnstr_writeSht(ss->inner, (int16_t) blksize) ||
      89       69721 :                             ss->inner->write(ss->inner, s->buf, 1, s->nr) != (ssize_t) s->nr) {
      90           0 :                                 mnstr_copy_error(ss, ss->inner);
      91           0 :                                 s->nr = 0; /* data is lost due to error */
      92           0 :                                 return -1;
      93             :                         }
      94       69721 :                         s->blks++;
      95       69721 :                         s->nr = 0; /* buffer is empty again */
      96             :                 }
      97             :         }
      98     6768821 :         return (ssize_t) cnt;
      99             : }
     100             : 
     101             : /* Write whatever is in the internal buffer to the underlying stream
     102             :  * as the final block of a sequence: a 16-bit header holding the
     103             :  * buffered size shifted left by one with the low-order bit set,
     104             :  * followed by the buffered data (if any).  The header is written
                     :  * even when the buffer is empty, so the receiver always sees the
                     :  * flush.
                     :  *
                     :  * flush_level is ignored and ss->inner itself is not flushed here.
                     :  *
                     :  * Returns 0 on success, or -1 when the stream has no private data
                     :  * or a write to the underlying stream fails (the buffered data is
                     :  * then dropped and the error is copied from ss->inner).
     105             :  */
     106             : static int
     107      470101 : bs_flush(stream *ss, mnstr_flush_level flush_level)
     108             : {
     109             :         uint16_t blksize;
     110             :         bs *s;
     111             : 
     112      470101 :         s = (bs *) ss->stream_data.p;
     113      470101 :         if (s == NULL)
     114             :                 return -1;
     115      470101 :         assert(!ss->readonly);
     116      470101 :         assert(s->nr < sizeof(s->buf));
     117             :         if (!ss->readonly) {
     118             :                 /* flush the rest of buffer (if s->nr > 0), then set the
     119             :                  * last bit to 1 to indicate user-instigated flush */
     120             : #ifdef BSTREAM_DEBUG
     121             :                 if (s->nr > 0) {
     122             :                         unsigned i;
     123             : 
     124             :                         fprintf(stderr, "W %s %u \"", ss->name, s->nr);
     125             :                         for (i = 0; i < s->nr; i++)
     126             :                                 if (' ' <= s->buf[i] && s->buf[i] < 127)
     127             :                                         putc(s->buf[i], stderr);
     128             :                                 else
     129             :                                         fprintf(stderr, "\\%03o", (unsigned char) s->buf[i]);
     130             :                         fprintf(stderr, "\"\n");
     131             :                         fprintf(stderr, "W %s 0\n", ss->name);
     132             :                 }
     133             : #endif
     134      470101 :                 blksize = (uint16_t) (s->nr << 1);
     135      470101 :                 s->bytes += s->nr;
     136             :                 /* indicate that this is the last buffer of a block by
     137             :                  * setting the low-order bit */
     138      470101 :                 blksize |= 1;
     139             :                 /* always write the header, even for an empty block: needed for the protocol */
     140      470101 :                 if ((!mnstr_writeSht(ss->inner, (int16_t) blksize) ||
     141      470101 :                      (s->nr > 0 &&
     142      442029 :                       ss->inner->write(ss->inner, s->buf, 1, s->nr) != (ssize_t) s->nr))) {
     143         139 :                         mnstr_copy_error(ss, ss->inner);
     144         139 :                         s->nr = 0; /* data is lost due to error */
     145         139 :                         return -1;
     146             :                 }
     147             :                 // shouldn't we flush ss->inner too?
     148             :                 (void) flush_level;
     149      469962 :                 s->blks++;
     150      469962 :                 s->nr = 0;
     151             :         }
     152      469962 :         return 0;
     153             : }
     154             : 
     155             : /* Read buffered data and return the number of items read.  At the
     156             :  * flush boundary we will return 0 to indicate the end of a block,
     157             :  * unless prompt and pstream are set. In that case, only return 0
     158             :  * after the prompt has been written to pstream and another read
     159             :  * attempt immediately returns a block boundary.
     160             :  *
                     :  * Returns what bs_read_internal returns; additionally returns -1
                     :  * when writing the prompt to pstream or flushing pstream fails.
                     :  *
     161             :  * Structure field usage:
     162             :  * ss->inner - the underlying stream;
     163             :  * buf - not used;
     164             :  * itotal - the amount of data in the current block that hasn't yet
     165             :  *          been read;
     166             :  * nr - indicates whether the flush marker has to be returned;
                     :  * prompt, pstream - the prompt string and the stream it is written
                     :  *                   to when a read returns 0 without end of file.
     167             :  */
     168             : ssize_t
     169     1162104 : bs_read(stream *restrict ss, void *restrict buf, size_t elmsize, size_t cnt)
     170             : {
     171     1162104 :         ssize_t ret = bs_read_internal(ss, buf, elmsize, cnt);
     172     1162071 :         if (ret != 0 || ss->eof) /* data, an error, or end of file: hand it to the caller as is */
     173             :                 return ret;
     174             : 
     175      465756 :         bs *b = (bs*) ss-> stream_data.p;
     176      465756 :         if (b->prompt == NULL || b->pstream == NULL) /* no prompting configured */
     177             :                 return 0;
     178             : 
     179             :         // before returning the 0 we send the prompt and make another attempt.
     180         335 :         if (mnstr_write(b->pstream, b->prompt, strlen(b->prompt), 1) != 1)
     181             :                 return -1;
     182         335 :         if (mnstr_flush(b->pstream, MNSTR_FLUSH_DATA) < 0)
     183             :                 return -1;
     184             : 
     185             :         // if it succeeds, return that to the client.
     186             :         // if it's still a block boundary, return that to the client.
     187             :         // if there's an error, return that to the client.
     188         335 :         return bs_read_internal(ss, buf, elmsize, cnt);
     189             : }
     190             : /* Worker for bs_read: copy up to cnt * elmsize bytes of block data
                     :  * from ss->inner into buf.
                     :  *
                     :  * Reading continues across block headers until the request is
                     :  * satisfied or a block whose header has the low-order bit set has
                     :  * been consumed.
                     :  *
                     :  * Returns the number of bytes copied divided by elmsize (0 when
                     :  * elmsize is 0); 0 at a flush boundary or when reading a header
                     :  * from ss->inner returns 0 (ss->eof then picks up ss->inner->eof);
                     :  * -1 when the stream has no private data, a header is invalid, or
                     :  * a read on ss->inner fails.
                     :  *
                     :  * Struct field usage:
                     :  * itotal - bytes of the current block not yet handed out;
                     :  * nr - 1 when the current block's header had the low-order bit
                     :  *      set, i.e. a 0 still has to be returned after its data;
                     :  * bytes, blks - running totals of block data and headers read. */
     191             : static ssize_t
     192     1162409 : bs_read_internal(stream *restrict ss, void *restrict buf, size_t elmsize, size_t cnt)
     193             : {
     194             :         bs *s;
     195     1162409 :         size_t todo = cnt * elmsize; /* bytes still wanted by the caller; NOTE(review): the product is not checked for overflow */
     196             :         size_t n;
     197             : 
     198     1162409 :         s = (bs *) ss->stream_data.p;
     199     1162409 :         if (s == NULL)
     200             :                 return -1;
     201     1162409 :         assert(ss->readonly);
     202     1162409 :         assert(s->nr <= 1); /* on the read side nr is only used as a flag */
     203             : 
     204     1162409 :         if (s->itotal == 0) {
     205      929991 :                 int16_t blksize = 0;
     206             : 
     207      929991 :                 if (s->nr) {
     208             :                         /* We read the closing block but hadn't
     209             :                          * returned that yet. Return it now, and note
     210             :                          * that we did by setting s->nr to 0. */
     211             :                         assert(s->nr == 1);
     212      458548 :                         s->nr = 0;
     213      464186 :                         return 0;
     214             :                 }
     215             : 
     216             :                 assert(s->nr == 0);
     217             : 
     218             :                 /* There is nothing more to read in the current block,
     219             :                  * so read the count for the next block */
     220      471443 :                 switch (mnstr_readSht(ss->inner, &blksize)) {
     221          17 :                 case -1:
     222          17 :                         mnstr_copy_error(ss, ss->inner);
     223          17 :                         return -1;
     224        5621 :                 case 0:
     225        5621 :                         ss->eof |= ss->inner->eof;
     226        5621 :                         return 0;
     227             :                 case 1:
     228             :                         break;
     229             :                 }
     230      465805 :                 if ((uint16_t) blksize > (BLOCK << 1 | 1)) { /* header larger than a BLOCK-sized block with the flush bit set */
     231           0 :                         mnstr_set_error(ss, MNSTR_READ_ERROR, "invalid block size %d", blksize);
     232           0 :                         return -1;
     233             :                 }
     234             : #ifdef BSTREAM_DEBUG
     235             :                 fprintf(stderr, "RC size: %u, final: %s\n", (uint16_t) blksize >> 1, (uint16_t) blksize & 1 ? "true" : "false");
     236             :                 fprintf(stderr, "RC %s %u\n", ss->name, (uint16_t) blksize);
     237             : #endif
     238      465805 :                 s->itotal = (uint16_t) blksize >> 1;   /* amount readable */
     239             :                 /* store whether this was the last block or not */
     240      465805 :                 s->nr = (uint16_t) blksize & 1;
     241      465805 :                 s->bytes += s->itotal;
     242      465805 :                 s->blks++;
     243             :         }
     244             : 
     245             :         /* Fill the caller's buffer. */
     246             :         cnt = 0;                /* count how much we put into the buffer */
     247      987183 :         while (todo > 0) {
     248             :                 /* there is more data waiting in the current block, so
     249             :                  * read it */
     250      754743 :                 n = todo < s->itotal ? todo : s->itotal;
     251     1502252 :                 while (n > 0) {
     252      747487 :                         ssize_t m = ss->inner->read(ss->inner, buf, 1, n);
     253             : 
     254      747509 :                         if (m <= 0) { /* an error or a short block: both are reported as -1 */
     255           0 :                                 ss->eof |= ss->inner->eof;
     256           0 :                                 mnstr_copy_error(ss, ss->inner);
     257           0 :                                 return -1;
     258             :                         }
     259             : #ifdef BSTREAM_DEBUG
     260             :                         {
     261             :                                 ssize_t i;
     262             : 
     263             :                                 fprintf(stderr, "RD %s %zd \"", ss->name, m);
     264             :                                 for (i = 0; i < m; i++)
     265             :                                         if (' ' <= ((char *) buf)[i] &&
     266             :                                             ((char *) buf)[i] < 127)
     267             :                                                 putc(((char *) buf)[i], stderr);
     268             :                                         else
     269             :                                                 fprintf(stderr, "\\%03o", ((unsigned char *) buf)[i]);
     270             :                                 fprintf(stderr, "\"\n");
     271             :                         }
     272             : #endif
     273      747509 :                         buf = (void *) ((char *) buf + m);
     274      747509 :                         cnt += (size_t) m;
     275      747509 :                         n -= (size_t) m;
     276      747509 :                         s->itotal -= (unsigned) m;
     277      747509 :                         todo -= (size_t) m;
     278             :                 }
     279             : 
     280      754765 :                 if (s->itotal == 0) {
     281      548707 :                         int16_t blksize = 0;
     282             : 
     283             :                         /* The current block has been completely read,
     284             :                          * so read the count for the next block, only
     285             :                          * if the previous was not the last one */
     286      548707 :                         if (s->nr)
     287             :                                 break;
     288       82902 :                         switch (mnstr_readSht(ss->inner, &blksize)) {
     289           0 :                         case -1:
     290           0 :                                 mnstr_copy_error(ss, ss->inner);
     291           0 :                                 return -1;
     292           0 :                         case 0:
     293           0 :                                 ss->eof |= ss->inner->eof;
     294           0 :                                 return 0; /* NOTE(review): returns 0 even if bytes were already copied into buf (cnt may be non-zero) - confirm this is intended */
     295             :                         case 1:
     296             :                                 break;
     297             :                         }
     298       82902 :                         if ((uint16_t) blksize > (BLOCK << 1 | 1)) {
     299           0 :                                 mnstr_set_error(ss, MNSTR_READ_ERROR, "invalid block size %d", blksize);
     300           0 :                                 return -1;
     301             :                         }
     302             : #ifdef BSTREAM_DEBUG
     303             :                         fprintf(stderr, "RC size: %d, final: %s\n", (uint16_t) blksize >> 1, (uint16_t) blksize & 1 ? "true" : "false");
     304             :                         fprintf(stderr, "RC %s %d\n", ss->name, s->nr);
     305             :                         fprintf(stderr, "RC %s %d\n", ss->name, blksize);
     306             : #endif
     307       82902 :                         s->itotal = (uint16_t) blksize >> 1;   /* amount readable */
     308             :                         /* store whether this was the last block or not */
     309       82902 :                         s->nr = (uint16_t) blksize & 1;
     310       82902 :                         s->bytes += s->itotal;
     311       82902 :                         s->blks++;
     312             :                 }
     313             :         }
     314             :         /* if we got an empty block with the end-of-sequence marker
     315             :          * set (low-order bit) we must only return an empty read once,
     316             :          * so we must squash the flag that we still have to return an
     317             :          * empty read */
     318      698245 :         if (todo > 0 && cnt == 0)
     319        7257 :                 s->nr = 0;
     320      698245 :         return (ssize_t) (elmsize > 0 ? cnt / elmsize : 0); /* convert bytes copied to whole elements */
     321             : }
     322             : 
     323             : 
     324             : /* Close a block stream.  For a writable stream that still has
                     :  * buffered data, bs_flush is called first (its result is ignored);
                     :  * then the underlying stream is closed with mnstr_close.  Does
                     :  * nothing when the stream has no private data. */
     325             : static void
     326       13728 : bs_close(stream *ss)
     327             : {
     328             :         bs *s;
     329             : 
     330       13728 :         s = (bs *) ss->stream_data.p;
     331       13728 :         assert(s);
     332             :         if (s == NULL)
     333             :                 return;
     334       13728 :         if (!ss->readonly && s->nr > 0)
     335           6 :                 bs_flush(ss, MNSTR_FLUSH_DATA);
     336       13728 :         mnstr_close(ss->inner);
     337             : }
     338             : /* Destroy a block stream.  When private data is present, the
                     :  * underlying stream (if any) is destroyed through its destroy
                     :  * callback and the private data is freed.  destroy_stream(ss) is
                     :  * called in every case. */
     339             : void
     340       13728 : bs_destroy(stream *ss)
     341             : {
     342             :         bs *s;
     343             : 
     344       13728 :         s = (bs *) ss->stream_data.p;
     345       13728 :         assert(s);
     346             :         if (s) {
     347       13728 :                 if (ss->inner)
     348       13728 :                         ss->inner->destroy(ss->inner);
     349       13728 :                 free(s);
     350             :         }
     351       13728 :         destroy_stream(ss);
     352       13728 : }
     353             : /* Clear the error state of the underlying stream with
                     :  * mnstr_clearerr, provided the stream has private data. */
     354             : void
     355           0 : bs_clrerr(stream *s)
     356             : {
     357           0 :         if (s->stream_data.p)
     358           0 :                 mnstr_clearerr(s->inner);
     359           0 : }
     360             : /* Return the underlying stream (s->inner) of a block stream.
                     :  * Asserts that s is a block stream. */
     361             : stream *
     362           0 : bs_stream(stream *s)
     363             : {
     364           0 :         assert(isa_block_stream(s));
     365           0 :         return s->inner;
     366             : }
     367             : /* Wrap stream s in a new block stream.
                     :  *
                     :  * Creates a wrapper stream around s, allocates its private bs state
                     :  * and installs the bs_flush, bs_read, bs_write, bs_close and
                     :  * bs_destroy callbacks.
                     :  *
                     :  * Returns the new stream, or NULL when s is NULL, when
                     :  * create_wrapper_stream fails, or when bs_create fails (in the last
                     :  * case the wrapper is destroyed and an open error is recorded). */
     368             : stream *
     369       13740 : block_stream(stream *s)
     370             : {
     371             :         stream *ns;
     372             :         bs *b;
     373             : 
     374       13740 :         if (s == NULL)
     375             :                 return NULL;
     376             : #ifdef STREAM_DEBUG
     377             :         fprintf(stderr, "block_stream %s\n", s->name ? s->name : "<unnamed>");
     378             : #endif
     379       13740 :         if ((ns = create_wrapper_stream(NULL, s)) == NULL)
     380             :                 return NULL;
     381       13740 :         if ((b = bs_create()) == NULL) {
     382           0 :                 destroy_stream(ns);
     383           0 :                 mnstr_set_open_error(s->name, 0, "bs_create failed");
     384           0 :                 return NULL;
     385             :         }
     386             :         /* blocksizes have a fixed little endian byteorder, so on
                     :          * big endian hosts the underlying stream is told to swap bytes */
     387             : #ifdef WORDS_BIGENDIAN
     388             :         s->swapbytes = true;
     389             : #endif
     390             : 
     391       13740 :         ns->flush = bs_flush;
     392       13740 :         ns->read = bs_read;
     393       13740 :         ns->write = bs_write;
     394       13740 :         ns->close = bs_close;
     395       13740 :         ns->destroy = bs_destroy;
     396       13740 :         ns->stream_data.p = (void *) b;
     397             : 
     398       13740 :         return ns;
     399             : }
     400             : /* Register the prompt string and the stream it should be written to
                     :  * when bs_read reaches a block boundary.  Only has an effect when
                     :  * block_stream is a block stream.  Both pointers are stored as
                     :  * given; the prompt string is not copied. */
     401             : void
     402         100 : set_prompting(stream *block_stream, const char *prompt, stream *prompt_stream)
     403             : {
     404         100 :         if (isa_block_stream(block_stream)) {
     405         100 :                 bs *bs = block_stream->stream_data.p;
     406         100 :                 bs->prompt = prompt;
     407         100 :                 bs->pstream = prompt_stream;
     408             :         }
     409         100 : }

Generated by: LCOV version 1.14