LCOV - code coverage report
Current view: top level - common/stream - bs2.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 0 281 0.0 %
Date: 2021-10-13 02:24:04 Functions: 0 16 0.0 %

          Line data    Source code
       1             : /*
       2             :  * This Source Code Form is subject to the terms of the Mozilla Public
       3             :  * License, v. 2.0.  If a copy of the MPL was not distributed with this
       4             :  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
       5             :  *
       6             :  * Copyright 1997 - July 2008 CWI, August 2008 - 2021 MonetDB B.V.
       7             :  */
       8             : 
       9             : #include "monetdb_config.h"
      10             : #include "stream.h"
      11             : #include "stream_internal.h"
      12             : 
      13             : 
      14             : /* ------------------------------------------------------------------ */
      15             : typedef struct bs2 {
      16             :         stream *s;              /* underlying stream */
      17             :         size_t nr;              /* how far we got in buf */
      18             :         size_t itotal;          /* amount available in current read block */
      19             :         size_t bufsiz;
      20             :         size_t readpos;
      21             :         compression_method comp;
      22             :         char *compbuf;
      23             :         size_t compbufsiz;
      24             :         char *buf;
      25             : } bs2;
      26             : 
      27             : 
      28             : static ssize_t
      29           0 : compress_stream_data(bs2 *s)
      30             : {
      31           0 :         assert(s->comp != COMPRESSION_NONE);
      32           0 :         if (s->comp == COMPRESSION_SNAPPY) {
      33             : #ifdef HAVE_SNAPPY
      34           0 :                 size_t compressed_length = s->compbufsiz;
      35             :                 snappy_status ret;
      36           0 :                 if ((ret = snappy_compress(s->buf, s->nr, s->compbuf, &compressed_length)) != SNAPPY_OK) {
      37             :                         return -1;
      38             :                 }
      39           0 :                 return compressed_length;
      40             : #else
      41             :                 assert(0);
      42             :                 return -1;
      43             : #endif
      44           0 :         } else if (s->comp == COMPRESSION_LZ4) {
      45             : #ifdef HAVE_LIBLZ4
      46           0 :                 int compressed_length = (int) s->compbufsiz;
      47           0 :                 assert(s->nr < INT_MAX);
      48           0 :                 if ((compressed_length = LZ4_compress_fast(s->buf, s->compbuf, (int)s->nr, compressed_length, 1)) == 0) {
      49             :                         return -1;
      50             :                 }
      51           0 :                 return compressed_length;
      52             : #else
      53             :                 assert(0);
      54             :                 return -1;
      55             : #endif
      56             :         }
      57             :         return -1;
      58             : }
      59             : 
      60             : 
      61             : static ssize_t
      62           0 : decompress_stream_data(bs2 *s)
      63             : {
      64           0 :         assert(s->comp != COMPRESSION_NONE);
      65           0 :         if (s->comp == COMPRESSION_SNAPPY) {
      66             : #ifdef HAVE_SNAPPY
      67             :                 snappy_status ret;
      68           0 :                 size_t uncompressed_length = s->bufsiz;
      69           0 :                 if ((ret = snappy_uncompress(s->compbuf, s->itotal, s->buf, &uncompressed_length)) != SNAPPY_OK) {
      70             :                         return -1;
      71             :                 }
      72           0 :                 return (ssize_t) uncompressed_length;
      73             : #else
      74             :                 assert(0);
      75             :                 return -1;
      76             : #endif
      77           0 :         } else if (s->comp == COMPRESSION_LZ4) {
      78             : #ifdef HAVE_LIBLZ4
      79           0 :                 int uncompressed_length = (int) s->bufsiz;
      80           0 :                 assert(s->itotal < INT_MAX);
      81           0 :                 if ((uncompressed_length = LZ4_decompress_safe(s->compbuf, s->buf, (int)s->itotal, uncompressed_length)) <= 0) {
      82             :                         return -1;
      83             :                 }
      84           0 :                 return uncompressed_length;
      85             : #else
      86             :                 assert(0);
      87             :                 return -1;
      88             : #endif
      89             :         }
      90             :         return -1;
      91             : }
      92             : 
      93             : static ssize_t
      94           0 : compression_size_bound(bs2 *s)
      95             : {
      96           0 :         if (s->comp == COMPRESSION_NONE) {
      97             :                 return 0;
      98           0 :         } else if (s->comp == COMPRESSION_SNAPPY) {
      99             : #ifndef HAVE_SNAPPY
     100             :                 return -1;
     101             : #else
     102           0 :                 return snappy_max_compressed_length(s->bufsiz);
     103             : #endif
     104           0 :         } else if (s->comp == COMPRESSION_LZ4) {
     105             : #ifndef HAVE_LIBLZ4
     106             :                 return -1;
     107             : #else
     108           0 :                 assert(s->bufsiz < INT_MAX);
     109           0 :                 return LZ4_compressBound((int)s->bufsiz);
     110             : #endif
     111             :         }
     112             :         return -1;
     113             : }
     114             : 
     115             : static bs2 *
     116           0 : bs2_create(stream *s, size_t bufsiz, compression_method comp)
     117             : {
     118             :         /* should be a binary stream */
     119             :         bs2 *ns;
     120             :         ssize_t compress_bound = 0;
     121             : 
     122           0 :         if ((ns = malloc(sizeof(*ns))) == NULL)
     123             :                 return NULL;
     124           0 :         *ns = (bs2) {
     125           0 :                 .buf = malloc(bufsiz),
     126             :                 .s = s,
     127             :                 .bufsiz = bufsiz,
     128             :                 .comp = comp,
     129             :         };
     130           0 :         if (ns->buf == NULL) {
     131           0 :                 free(ns);
     132           0 :                 return NULL;
     133             :         }
     134             : 
     135           0 :         compress_bound = compression_size_bound(ns);
     136           0 :         if (compress_bound > 0) {
     137           0 :                 ns->compbufsiz = (size_t) compress_bound;
     138           0 :                 ns->compbuf = malloc(ns->compbufsiz);
     139           0 :                 if (!ns->compbuf) {
     140           0 :                         free(ns->buf);
     141           0 :                         free(ns);
     142           0 :                         return NULL;
     143             :                 }
     144           0 :         } else if (compress_bound < 0) {
     145           0 :                 free(ns->buf);
     146           0 :                 free(ns);
     147           0 :                 return NULL;
     148             :         }
     149             :         return ns;
     150             : }
     151             : 
     152             : /* Collect data until the internal buffer is filled, then write the
     153             :  * filled buffer to the underlying stream.
     154             :  * Struct field usage:
     155             :  * s - the underlying stream;
     156             :  * buf - the buffer in which data is collected;
     157             :  * nr - how much of buf is already filled (if nr == sizeof(buf) the
     158             :  *      data is written to the underlying stream, so upon entry nr <
     159             :  *      sizeof(buf));
     160             :  * itotal - unused.
     161             :  */
     162             : ssize_t
     163           0 : bs2_write(stream *restrict ss, const void *restrict buf, size_t elmsize, size_t cnt)
     164             : {
     165             :         bs2 *s;
     166           0 :         size_t todo = cnt * elmsize;
     167             :         int64_t blksize;
     168             :         char *writebuf;
     169             :         size_t writelen;
     170             : 
     171           0 :         s = (bs2 *) ss->stream_data.p;
     172           0 :         if (s == NULL)
     173             :                 return -1;
     174           0 :         assert(!ss->readonly);
     175           0 :         assert(s->nr < s->bufsiz);
     176           0 :         while (todo > 0) {
     177           0 :                 size_t n = s->bufsiz - s->nr;
     178             : 
     179             :                 if (todo < n)
     180             :                         n = todo;
     181           0 :                 memcpy(s->buf + s->nr, buf, n);
     182           0 :                 s->nr += n;
     183           0 :                 todo -= n;
     184           0 :                 buf = ((const char *) buf + n);
     185             :                 /* block is full, write it to the stream */
     186           0 :                 if (s->nr == s->bufsiz) {
     187             : 
     188             : #ifdef BSTREAM_DEBUG
     189             :                         {
     190             :                                 size_t i;
     191             : 
     192             :                                 fprintf(stderr, "W %s %zu \"", ss->name, s->nr);
     193             :                                 for (i = 0; i < s->nr; i++)
     194             :                                         if (' ' <= s->buf[i] && s->buf[i] < 127)
     195             :                                                 putc(s->buf[i], stderr);
     196             :                                         else
     197             :                                                 fprintf(stderr, "\\%03o", (unsigned char) s->buf[i]);
     198             :                                 fprintf(stderr, "\"\n");
     199             :                         }
     200             : #endif
     201             : 
     202             :                         writelen = s->nr;
     203           0 :                         blksize = (int64_t) s->nr;
     204           0 :                         writebuf = s->buf;
     205             : 
     206           0 :                         if (s->comp != COMPRESSION_NONE) {
     207           0 :                                 ssize_t compressed_length = compress_stream_data(s);
     208           0 :                                 if (compressed_length < 0) {
     209             :                                         return -1;
     210             :                                 }
     211           0 :                                 writebuf = s->compbuf;
     212             :                                 blksize = (int64_t) compressed_length;
     213           0 :                                 writelen = (size_t) compressed_length;
     214             :                         }
     215             : 
     216             : 
     217             :                         /* the last bit tells whether a flush is in there, it's not
     218             :                          * at this moment, so shift it to the left */
     219           0 :                         blksize <<= 1;
     220           0 :                         if (!mnstr_writeLng(s->s, blksize) ||
     221           0 :                             s->s->write(s->s, writebuf, 1, writelen) != (ssize_t) writelen) {
     222           0 :                                 mnstr_copy_error(ss, s->s);
     223           0 :                                 return -1;
     224             :                         }
     225           0 :                         s->nr = 0;
     226             :                 }
     227             :         }
     228           0 :         return (ssize_t) cnt;
     229             : }
     230             : 
     231             : /* If the internal buffer is partially filled, write it to the
     232             :  * underlying stream.  Then in any case write an empty buffer to the
     233             :  * underlying stream to indicate to the receiver that the data was
     234             :  * flushed.
     235             :  */
     236             : static int
     237           0 : bs2_flush(stream *ss, mnstr_flush_level flush_level)
     238             : {
     239             :         int64_t blksize;
     240             :         bs2 *s;
     241             :         char *writebuf;
     242             :         size_t writelen;
     243             : 
     244           0 :         s = (bs2 *) ss->stream_data.p;
     245           0 :         if (s == NULL)
     246             :                 return -1;
     247           0 :         assert(!ss->readonly);
     248           0 :         assert(s->nr < s->bufsiz);
     249             :         if (!ss->readonly) {
     250             :                 /* flush the rest of buffer (if s->nr > 0), then set the
     251             :                  * last bit to 1 to to indicate user-instigated flush */
     252             : #ifdef BSTREAM_DEBUG
     253             :                 if (s->nr > 0) {
     254             :                         size_t i;
     255             : 
     256             :                         fprintf(stderr, "W %s %zu \"", ss->name, s->nr);
     257             :                         for (i = 0; i < s->nr; i++)
     258             :                                 if (' ' <= s->buf[i] && s->buf[i] < 127)
     259             :                                         putc(s->buf[i], stderr);
     260             :                                 else
     261             :                                         fprintf(stderr, "\\%03o", (unsigned char) s->buf[i]);
     262             :                         fprintf(stderr, "\"\n");
     263             :                         fprintf(stderr, "W %s 0\n", ss->name);
     264             :                 }
     265             : #endif
     266             : 
     267             :                 writelen = s->nr;
     268           0 :                 blksize = (int64_t) s->nr;
     269           0 :                 writebuf = s->buf;
     270             : 
     271           0 :                 if (s->nr > 0 && s->comp != COMPRESSION_NONE) {
     272           0 :                         ssize_t compressed_length = compress_stream_data(s);
     273           0 :                         if (compressed_length < 0) {
     274             :                                 return -1;
     275             :                         }
     276           0 :                         writebuf = s->compbuf;
     277             :                         blksize = (int64_t) compressed_length;
     278           0 :                         writelen = (size_t) compressed_length;
     279             :                 }
     280             : 
     281             :                 /* indicate that this is the last buffer of a block by
     282             :                  * setting the low-order bit */
     283           0 :                 blksize <<= 1;
     284           0 :                 blksize |= 1;
     285             :                 /* always flush (even empty blocks) needed for the protocol) */
     286             : 
     287           0 :                 if ((!mnstr_writeLng(s->s, blksize) ||
     288           0 :                      (s->nr > 0 &&
     289           0 :                       s->s->write(s->s, writebuf, 1, writelen) != (ssize_t) writelen))) {
     290           0 :                         mnstr_copy_error(ss, s->s);
     291           0 :                         return -1;
     292             :                 }
     293           0 :                 s->nr = 0;
     294             :                 // shouldn't we flush s->s too?
     295             :                 (void) flush_level;
     296             :         }
     297           0 :         return 0;
     298             : }
     299             : 
     300             : /* Read buffered data and return the number of items read.  At the
     301             :  * flush boundary we will return 0 to indicate the end of a block.
     302             :  *
     303             :  * Structure field usage:
     304             :  * s - the underlying stream;
     305             :  * buf - not used;
     306             :  * itotal - the amount of data in the current block that hasn't yet
     307             :  *          been read;
     308             :  * nr - indicates whether the flush marker has to be returned.
     309             :  */
     310             : ssize_t
     311           0 : bs2_read(stream *restrict ss, void *restrict buf, size_t elmsize, size_t cnt)
     312             : {
     313             :         bs2 *s;
     314           0 :         size_t todo = cnt * elmsize;
     315             :         size_t n;
     316             : 
     317           0 :         s = (bs2 *) ss->stream_data.p;
     318           0 :         if (s == NULL)
     319             :                 return -1;
     320           0 :         assert(ss->readonly);
     321           0 :         assert(s->nr <= 1);
     322             : 
     323           0 :         if (s->itotal == 0) {
     324           0 :                 int64_t blksize = 0;
     325             : 
     326           0 :                 if (s->nr) {
     327             :                         /* We read the closing block but hadn't
     328             :                          * returned that yet. Return it now, and note
     329             :                          * that we did by setting s->nr to 0. */
     330             :                         assert(s->nr == 1);
     331           0 :                         s->nr = 0;
     332           0 :                         return 0;
     333             :                 }
     334             : 
     335             :                 assert(s->nr == 0);
     336             : 
     337             :                 /* There is nothing more to read in the current block,
     338             :                  * so read the count for the next block */
     339           0 :                 switch (mnstr_readLng(s->s, &blksize)) {
     340           0 :                 case -1:
     341           0 :                         mnstr_copy_error(ss, s->s);
     342           0 :                         return -1;
     343           0 :                 case 0:
     344           0 :                         ss->eof |= s->s->eof;
     345           0 :                         return 0;
     346             :                 case 1:
     347             :                         break;
     348             :                 }
     349           0 :                 if (blksize < 0) {
     350           0 :                         mnstr_set_error(ss, MNSTR_READ_ERROR, "invalid block size %" PRId64 "", blksize);
     351           0 :                         return -1;
     352             :                 }
     353             : #ifdef BSTREAM_DEBUG
     354             :                 fprintf(stderr, "R1 '%s' length: %" PRId64 ", final: %s\n", ss->name, blksize >> 1, blksize & 1 ? "true" : "false");
     355             : #endif
     356           0 :                 s->itotal = (size_t) (blksize >> 1);   /* amount readable */
     357             :                 /* store whether this was the last block or not */
     358           0 :                 s->nr = blksize & 1;
     359             : 
     360           0 :                 if (s->itotal > 0) {
     361             :                         /* read everything into the comp buf */
     362             :                         ssize_t uncompressed_length = (ssize_t) s->bufsiz;
     363             :                         size_t m = 0;
     364           0 :                         char *buf = s->buf;
     365             : 
     366           0 :                         if (s->comp != COMPRESSION_NONE) {
     367           0 :                                 buf = s->compbuf;
     368             :                         }
     369             : 
     370           0 :                         while (m < s->itotal) {
     371             :                                 ssize_t bytes_read = 0;
     372           0 :                                 bytes_read = s->s->read(s->s, buf + m, 1, s->itotal - m);
     373           0 :                                 if (bytes_read <= 0) {
     374           0 :                                         ss->eof |= s->s->eof;
     375           0 :                                         mnstr_copy_error(ss, s->s);
     376           0 :                                         return -1;
     377             :                                 }
     378           0 :                                 m += (size_t) bytes_read;
     379             :                         }
     380           0 :                         if (s->comp != COMPRESSION_NONE) {
     381           0 :                                 uncompressed_length = decompress_stream_data(s);
     382           0 :                                 if (uncompressed_length < 0) {
     383           0 :                                         if (s->s->errkind != MNSTR_NO__ERROR)
     384           0 :                                                 mnstr_copy_error(ss, s->s);
     385             :                                         else
     386           0 :                                                 mnstr_set_error(ss, MNSTR_READ_ERROR, "uncompress failed with code %d", (int) uncompressed_length);
     387           0 :                                         return -1;
     388             :                                 }
     389             :                         } else {
     390           0 :                                 uncompressed_length = (ssize_t) m;
     391             :                         }
     392           0 :                         s->itotal = (size_t) uncompressed_length;
     393           0 :                         s->readpos = 0;
     394             :                 }
     395             :         }
     396             : 
     397             :         /* Fill the caller's buffer. */
     398             :         cnt = 0;                /* count how much we put into the buffer */
     399           0 :         while (todo > 0) {
     400             :                 /* there is more data waiting in the current block, so
     401             :                  * read it */
     402           0 :                 n = todo < s->itotal ? todo : s->itotal;
     403             : 
     404           0 :                 memcpy(buf, s->buf + s->readpos, n);
     405           0 :                 buf = (void *) ((char *) buf + n);
     406           0 :                 cnt += n;
     407           0 :                 todo -= n;
     408           0 :                 s->readpos += n;
     409           0 :                 s->itotal -= n;
     410             : 
     411           0 :                 if (s->itotal == 0) {
     412           0 :                         int64_t blksize = 0;
     413             : 
     414             :                         /* The current block has been completely read,
     415             :                          * so read the count for the next block, only
     416             :                          * if the previous was not the last one */
     417           0 :                         if (s->nr)
     418             :                                 break;
     419           0 :                         switch (mnstr_readLng(s->s, &blksize)) {
     420           0 :                         case -1:
     421           0 :                                 mnstr_copy_error(ss, s->s);
     422           0 :                                 return -1;
     423           0 :                         case 0:
     424           0 :                                 ss->eof |= s->s->eof;
     425           0 :                                 return 0;
     426             :                         case 1:
     427             :                                 break;
     428             :                         }
     429           0 :                         if (blksize < 0) {
     430           0 :                                 mnstr_set_error(ss, MNSTR_READ_ERROR, "invalid block size %" PRId64 "", blksize);
     431           0 :                                 return -1;
     432             :                         }
     433             : #ifdef BSTREAM_DEBUG
     434             :                         fprintf(stderr, "R3 '%s' length: %" PRId64 ", final: %s\n", ss->name, blksize >> 1, blksize & 1 ? "true" : "false");
     435             : #endif
     436             : 
     437             : 
     438           0 :                         s->itotal = (size_t) (blksize >> 1);   /* amount readable */
     439             :                         /* store whether this was the last block or not */
     440           0 :                         s->nr = blksize & 1;
     441             : 
     442           0 :                         if (s->itotal > 0) {
     443             :                                 /* read everything into the comp buf */
     444             :                                 ssize_t uncompressed_length = (ssize_t) s->bufsiz;
     445             :                                 size_t m = 0;
     446           0 :                                 char *buf = s->buf;
     447             : 
     448           0 :                                 if (s->comp != COMPRESSION_NONE) {
     449           0 :                                         buf = s->compbuf;
     450             :                                 }
     451             : 
     452           0 :                                 while (m < s->itotal) {
     453             :                                         ssize_t bytes_read = 0;
     454           0 :                                         bytes_read = s->s->read(s->s, buf + m, 1, s->itotal - m);
     455           0 :                                         if (bytes_read <= 0) {
     456           0 :                                                 ss->eof |= s->s->eof;
     457           0 :                                                 mnstr_copy_error(ss, s->s);
     458           0 :                                                 return -1;
     459             :                                         }
     460           0 :                                         m += (size_t) bytes_read;
     461             :                                 }
     462           0 :                                 if (s->comp != COMPRESSION_NONE) {
     463           0 :                                         uncompressed_length = decompress_stream_data(s);
     464           0 :                                         if (uncompressed_length < 0) {
     465           0 :                                                 if (s->s->errkind != MNSTR_NO__ERROR)
     466           0 :                                                         mnstr_copy_error(ss, s->s);
     467             :                                                 else
     468           0 :                                                         mnstr_set_error(ss, MNSTR_READ_ERROR, "uncompress failed with code %d", (int) uncompressed_length);
     469           0 :                                                 return -1;
     470             :                                         }
     471             :                                 } else {
     472           0 :                                         uncompressed_length = (ssize_t) m;
     473             :                                 }
     474           0 :                                 s->itotal = (size_t) uncompressed_length;
     475           0 :                                 s->readpos = 0;
     476             :                         }
     477             :                 }
     478             :         }
     479             :         /* if we got an empty block with the end-of-sequence marker
     480             :          * set (low-order bit) we must only return an empty read once,
     481             :          * so we must squash the flag that we still have to return an
     482             :          * empty read */
     483           0 :         if (todo > 0 && cnt == 0)
     484           0 :                 s->nr = 0;
     485           0 :         return (ssize_t) (elmsize > 0 ? cnt / elmsize : 0);
     486             : }
     487             : 
     488             : 
     489             : 
     490             : static void
     491           0 : bs2_resetbuf(stream *ss)
     492             : {
     493           0 :         bs2 *s = (bs2 *) ss->stream_data.p;
     494           0 :         assert(ss->read == bs2_read);
     495           0 :         s->itotal = 0;
     496           0 :         s->nr = 0;
     497           0 :         s->readpos = 0;
     498           0 : }
     499             : 
     500             : int
     501           0 : bs2_resizebuf(stream *ss, size_t bufsiz)
     502             : {
     503             :         ssize_t compress_bound;
     504           0 :         bs2 *s = (bs2 *) ss->stream_data.p;
     505           0 :         assert(ss->read == bs2_read);
     506             : 
     507           0 :         if (s->buf)
     508           0 :                 free(s->buf);
     509           0 :         if (s->compbuf)
     510           0 :                 free(s->compbuf);
     511             : 
     512           0 :         s->bufsiz = 0;
     513             :         s->buf = NULL;
     514           0 :         s->compbuf = NULL;
     515             : 
     516           0 :         if ((s->buf = malloc(bufsiz)) == NULL) {
     517             :                 return -1;
     518             :         }
     519           0 :         s->bufsiz = bufsiz;
     520           0 :         compress_bound = compression_size_bound(s);
     521           0 :         if (compress_bound > 0) {
     522           0 :                 s->compbufsiz = (size_t) compress_bound;
     523           0 :                 s->compbuf = malloc(s->compbufsiz);
     524           0 :                 if (!s->compbuf) {
     525           0 :                         free(s->buf);
     526           0 :                         s->buf = NULL;
     527           0 :                         return -1;
     528             :                 }
     529             :         }
     530           0 :         bs2_resetbuf(ss);
     531           0 :         return 0;
     532             : }
     533             : 
     534             : buffer
     535           0 : bs2_buffer(stream *ss)
     536             : {
     537           0 :         bs2 *s = (bs2 *) ss->stream_data.p;
     538             :         buffer b;
     539           0 :         assert(ss->read == bs2_read);
     540           0 :         b.buf = s->buf;
     541           0 :         b.pos = s->nr;
     542           0 :         b.len = s->itotal;
     543           0 :         return b;
     544             : }
     545             : 
     546             : void
     547           0 : bs2_setpos(stream *ss, size_t pos)
     548             : {
     549           0 :         bs2 *s = (bs2 *) ss->stream_data.p;
     550           0 :         assert(pos < s->bufsiz);
     551           0 :         s->nr = pos;
     552           0 : }
     553             : 
     554             : 
     555             : 
     556             : 
     557             : static void
     558           0 : bs2_close(stream *ss)
     559             : {
     560             :         bs2 *s;
     561             : 
     562           0 :         s = (bs2 *) ss->stream_data.p;
     563           0 :         assert(s);
     564             :         if (s == NULL)
     565             :                 return;
     566           0 :         if (!ss->readonly && s->nr > 0)
     567           0 :                 bs2_flush(ss, MNSTR_FLUSH_DATA);
     568           0 :         assert(s->s);
     569             :         if (s->s)
     570           0 :                 s->s->close(s->s);
     571             : }
     572             : 
     573             : static void
     574           0 : bs2_destroy(stream *ss)
     575             : {
     576             :         bs2 *s;
     577             : 
     578           0 :         s = (bs2 *) ss->stream_data.p;
     579           0 :         assert(s);
     580             :         if (s) {
     581           0 :                 assert(s->s);
     582             :                 if (s->s)
     583           0 :                         s->s->destroy(s->s);
     584           0 :                 if (s->buf)
     585           0 :                         free(s->buf);
     586           0 :                 if (s->compbuf)
     587           0 :                         free(s->compbuf);
     588           0 :                 free(s);
     589             :         }
     590           0 :         destroy_stream(ss);
     591           0 : }
     592             : 
     593             : static void
     594           0 : bs2_update_timeout(stream *ss)
     595             : {
     596             :         bs2 *s;
     597             : 
     598           0 :         if ((s = ss->stream_data.p) != NULL && s->s) {
     599           0 :                 s->s->timeout = ss->timeout;
     600           0 :                 s->s->timeout_func = ss->timeout_func;
     601           0 :                 s->s->timeout_data = ss->timeout_data;
     602           0 :                 if (s->s->update_timeout)
     603           0 :                         s->s->update_timeout(s->s);
     604             :         }
     605           0 : }
     606             : 
     607             : static int
     608           0 : bs2_isalive(const stream *ss)
     609             : {
     610             :         struct bs2 *s;
     611             : 
     612           0 :         if ((s = ss->stream_data.p) != NULL && s->s) {
     613           0 :                 if (s->s->isalive)
     614           0 :                         return s->s->isalive(s->s);
     615             :                 return 1;
     616             :         }
     617             :         return 0;
     618             : }
     619             : 
     620             : stream *
     621           0 : block_stream2(stream *s, size_t bufsiz, compression_method comp)
     622             : {
     623             :         stream *ns;
     624             :         stream *os = NULL;
     625             :         bs2 *b;
     626             : 
     627           0 :         if (s == NULL)
     628             :                 return NULL;
     629           0 :         if (s->read == bs_read || s->write == bs_write) {
     630             :                 /* if passed in a block_stream instance, extract the
     631             :                  * underlying stream */
     632             :                 os = s;
     633           0 :                 s = s->inner;
     634             :         }
     635             : 
     636             : #ifdef STREAM_DEBUG
     637             :         fprintf(stderr, "block_stream2 %s\n", s->name ? s->name : "<unnamed>");
     638             : #endif
     639           0 :         if ((ns = create_wrapper_stream(NULL, s)) == NULL)
     640             :                 return NULL;
     641           0 :         if ((b = bs2_create(s, bufsiz, comp)) == NULL) {
     642           0 :                 destroy_stream(ns);
     643           0 :                 mnstr_set_open_error(s->name, 0, "bs2_create failed");
     644           0 :                 return NULL;
     645             :         }
     646             :         /* blocksizes have a fixed little endian byteorder */
     647             : #ifdef WORDS_BIGENDIAN
     648             :         s->swapbytes = true;
     649             : #endif
     650           0 :         ns->binary = s->binary;
     651           0 :         ns->readonly = s->readonly;
     652           0 :         ns->close = bs2_close;
     653           0 :         ns->clrerr = bs_clrerr;
     654           0 :         ns->destroy = bs2_destroy;
     655           0 :         ns->flush = bs2_flush;
     656           0 :         ns->read = bs2_read;
     657           0 :         ns->write = bs2_write;
     658           0 :         ns->update_timeout = bs2_update_timeout;
     659           0 :         ns->isalive = bs2_isalive;
     660           0 :         ns->stream_data.p = (void *) b;
     661             : 
     662           0 :         if (os != NULL) {
     663             :                 /* we extracted the underlying stream, destroy the old
     664             :                  * shell */
     665           0 :                 os->inner = NULL;
     666           0 :                 bs_destroy(os);
     667             :         }
     668             : 
     669             :         return ns;
     670             : }

Generated by: LCOV version 1.14