LCOV - code coverage report
Current view: top level - common/stream - stream.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 251 340 73.8 %
Date: 2021-10-13 02:24:04 Functions: 38 53 71.7 %

          Line data    Source code
       1             : /*
       2             :  * This Source Code Form is subject to the terms of the Mozilla Public
       3             :  * License, v. 2.0.  If a copy of the MPL was not distributed with this
       4             :  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
       5             :  *
       6             :  * Copyright 1997 - July 2008 CWI, August 2008 - 2021 MonetDB B.V.
       7             :  */
       8             : 
       9             : 
      10             : /* stream
      11             :  * ======
      12             :  * Niels Nes
      13             :  * An simple interface to streams
      14             :  *
      15             :  * Processing files, streams, and sockets is quite different on Linux
      16             :  * and Windows platforms. To improve portability between both, we advise
      17             :  * to replace the stdio actions with the stream functionality provided
      18             :  * here.
      19             :  *
      20             :  * This interface can also be used to open 'non compressed, gzipped,
      21             :  * bz2zipped' data files and sockets. Using this interface one could
      22             :  * easily switch between the various underlying storage types.
      23             :  *
      24             :  * buffered streams
      25             :  * ----------------
      26             :  *
      27             :  * The bstream (or buffered_stream) can be used for efficient reading of
      28             :  * a stream. Reading can be done in large chunks and access can be done
      29             :  * in smaller bits, by directly accessing the underlying buffer.
      30             :  *
      31             :  * Beware that a flush on a buffered stream emits an empty block to
      32             :  * synchronize with the other side, telling it has reached the end of
      33             :  * the sequence and can close its descriptors.
      34             :  *
      35             :  * bstream functions
      36             :  * -----------------
      37             :  *
      38             :  * The bstream_create gets a read stream (rs) as input and the initial
      39             :  * chunk size and creates a buffered stream from this. A spare byte is
      40             :  * kept at the end of the buffer.  The bstream_read will at least read
      41             :  * the next 'size' bytes. If the not read data (aka pos < len) together
      42             :  * with the new data will not fit in the current buffer it is resized.
      43             :  * The spare byte is kept.
      44             :  *
      45             :  * tee streams
      46             :  * -----------
      47             :  *
      48             :  * A tee stream is a write stream that duplicates all output to two
      49             :  * write streams of the same type (txt/bin).
      50             :  */
      51             : 
      52             : /* Generic stream handling code such as init and close */
      53             : 
      54             : #include "monetdb_config.h"
      55             : #include "stream.h"
      56             : #include "stream_internal.h"
      57             : #include <stdio.h>
      58             : 
      59             : 
      60             : #ifdef HAVE_PTHREAD_H
      61             : #include <pthread.h>
      62             : #endif
      63             : 
      64             : struct tl_error_buf {
      65             :         char msg[1024];
      66             : };
      67             : 
      68             : static int tl_error_init(void);
      69             : static struct tl_error_buf *get_tl_error_buf(void);
      70             : 
      71             : #ifdef HAVE_PTHREAD_H
      72             : 
      73             : static pthread_key_t tl_error_key;
      74             : 
      75             : static int
      76         479 : tl_error_init(void)
      77             : {
      78         479 :         if (pthread_key_create(&tl_error_key, free) != 0)
      79           0 :                 return -1;
      80             :         return 0;
      81             : }
      82             : 
      83             : static struct tl_error_buf*
      84      219227 : get_tl_error_buf(void)
      85             : {
      86      219227 :         struct tl_error_buf *p = pthread_getspecific(tl_error_key);
      87      219228 :         if (p == NULL) {
      88        3749 :                 p = malloc(sizeof(*p));
      89        3749 :                 if (p == NULL)
      90             :                         return NULL;
      91        3749 :                 *p = (struct tl_error_buf) { .msg = {0} };
      92        3749 :                 pthread_setspecific(tl_error_key, p);
      93        3749 :                 struct tl_error_buf *second_attempt = pthread_getspecific(tl_error_key);
      94        3749 :                 assert(p == second_attempt /* maybe mnstr_init has not been called? */);
      95             :                 (void) second_attempt; // suppress warning if asserts disabled
      96             :         }
      97             :         return p;
      98             : }
      99             : 
     100             : #elif defined(WIN32)
     101             : 
     102             : static DWORD tl_error_key = 0;
     103             : 
     104             : static int
     105             : tl_error_init(void)
     106             : {
     107             :         DWORD key = TlsAlloc();
     108             :         if (key == TLS_OUT_OF_INDEXES)
     109             :                 return -1;
     110             :         else {
     111             :                 tl_error_key = key;
     112             :                 return 0;
     113             :         }
     114             : }
     115             : 
     116             : static struct tl_error_buf*
     117             : get_tl_error_buf(void)
     118             : {
     119             :         struct tl_error_buf *p = TlsGetValue(tl_error_key);
     120             : 
     121             :         if (p == NULL) {
     122             :                 if (GetLastError() != ERROR_SUCCESS)
     123             :                         return NULL; // something went terribly wrong
     124             : 
     125             :                 // otherwise, initialize
     126             :                 p = malloc(sizeof(*p));
     127             :                 if (p == NULL)
     128             :                         return NULL;
     129             :                 *p = (struct tl_error_buf) { .msg = 0 };
     130             :                 if (!TlsSetValue(tl_error_key, p)) {
     131             :                         free(p);
     132             :                         return NULL;
     133             :                 }
     134             : 
     135             :                 struct tl_error_buf *second_attempt = TlsGetValue(tl_error_key);
     136             :                 assert(p == second_attempt /* maybe mnstr_init has not been called? */);
     137             :                 (void) second_attempt; // suppress warning if asserts disabled
     138             :         }
     139             :         return p;
     140             : }
     141             : 
     142             : #else
     143             : 
     144             : #error "no pthreads and no Windows, don't know what to do"
     145             : 
     146             : #endif
     147             : 
     148             : static const char *mnstr_error_kind_description(mnstr_error_kind kind);
     149             : 
     150             : int
     151         689 : mnstr_init(void)
     152             : {
     153             :         static ATOMIC_FLAG inited = ATOMIC_FLAG_INIT;
     154             : 
     155         689 :         if (ATOMIC_TAS(&inited))
     156             :                 return 0;
     157             : 
     158         479 :         if (tl_error_init()< 0)
     159           0 :                 return -1;
     160             : 
     161             : #ifdef NATIVE_WIN32
     162             :         WSADATA w;
     163             :         if (WSAStartup(0x0101, &w) != 0)
     164             :                 return -1;
     165             : #endif
     166             : 
     167             :         return 0;
     168             : }
     169             : 
     170             : const char *
     171           0 : mnstr_version(void)
     172             : {
     173           0 :         return STREAM_VERSION;
     174             : }
     175             : 
     176             : /* Read at most cnt elements of size elmsize from the stream.  Returns
     177             :  * the number of elements actually read or < 0 on failure. */
     178             : ssize_t
     179      929296 : mnstr_read(stream *restrict s, void *restrict buf, size_t elmsize, size_t cnt)
     180             : {
     181      929296 :         if (s == NULL || buf == NULL)
     182             :                 return -1;
     183             : #ifdef STREAM_DEBUG
     184             :         fprintf(stderr, "read %s %zu %zu\n",
     185             :                 s->name ? s->name : "<unnamed>", elmsize, cnt);
     186             : #endif
     187      929296 :         assert(s->readonly);
     188      929296 :         if (s->errkind != MNSTR_NO__ERROR)
     189             :                 return -1;
     190      929296 :         return s->read(s, buf, elmsize, cnt);
     191             : }
     192             : 
     193             : 
     194             : /* Write cnt elements of size elmsize to the stream.  Returns the
     195             :  * number of elements actually written.  If elmsize or cnt equals zero,
     196             :  * returns cnt. */
     197             : ssize_t
     198    19363741 : mnstr_write(stream *restrict s, const void *restrict buf, size_t elmsize, size_t cnt)
     199             : {
     200    19363741 :         if (s == NULL || buf == NULL)
     201             :                 return -1;
     202             : #ifdef STREAM_DEBUG
     203             :         fprintf(stderr, "write %s %zu %zu\n",
     204             :                 s->name ? s->name : "<unnamed>", elmsize, cnt);
     205             : #endif
     206    19363741 :         assert(!s->readonly);
     207    19363741 :         if (s->errkind != MNSTR_NO__ERROR)
     208             :                 return -1;
     209    19363731 :         return s->write(s, buf, elmsize, cnt);
     210             : }
     211             : 
     212             : 
     213             : /* Read one line (seperated by \n) of at most maxcnt-1 characters from
     214             :  * the stream.  Returns the number of characters actually read,
     215             :  * includes the trailing \n; terminated by a NULL byte. */
     216             : ssize_t
     217      143134 : mnstr_readline(stream *restrict s, void *restrict buf, size_t maxcnt)
     218             : {
     219             :         char *b = buf, *start = buf;
     220             : 
     221      143134 :         if (s == NULL || buf == NULL)
     222             :                 return -1;
     223             : #ifdef STREAM_DEBUG
     224             :         fprintf(stderr, "readline %s %zu\n",
     225             :                 s->name ? s->name : "<unnamed>", maxcnt);
     226             : #endif
     227      143134 :         assert(s->readonly);
     228      143134 :         if (s->errkind != MNSTR_NO__ERROR)
     229             :                 return -1;
     230      143134 :         if (maxcnt == 0)
     231             :                 return 0;
     232      143134 :         if (maxcnt == 1) {
     233           0 :                 *start = 0;
     234           0 :                 return 0;
     235             :         }
     236             :         for (;;) {
     237    21031362 :                 switch (s->read(s, start, 1, 1)) {
     238    21031068 :                 case 1:
     239             :                         /* successfully read a character,
     240             :                          * check whether it is the line
     241             :                          * separator and whether we have space
     242             :                          * left for more */
     243    21031068 :                         if (*start++ == '\n' || --maxcnt == 1) {
     244      142840 :                                 *start = 0;
     245      142840 :                                 return (ssize_t) (start - b);
     246             :                         }
     247             :                         break;
     248           0 :                 case -1:
     249             :                         /* error: if we didn't read anything yet,
     250             :                          * return the error, otherwise return what we
     251             :                          * have */
     252           0 :                         if (start == b)
     253             :                                 return -1;
     254             :                         /* fall through */
     255             :                 case 0:
     256             :                         /* end of file: return what we have */
     257         294 :                         *start = 0;
     258         294 :                         return (ssize_t) (start - b);
     259             :                 }
     260             :         }
     261             : }
     262             : 
     263             : 
     264             : void
     265        5371 : mnstr_settimeout(stream *s, unsigned int ms, bool (*func)(void *), void *data)
     266             : {
     267        5371 :         if (s) {
     268        5371 :                 s->timeout = ms;
     269        5371 :                 s->timeout_func = func;
     270        5371 :                 s->timeout_data = data;
     271        5371 :                 if (s->update_timeout)
     272        5371 :                         s->update_timeout(s);
     273             :         }
     274        5371 : }
     275             : 
     276             : 
     277             : void
     278       14164 : mnstr_close(stream *s)
     279             : {
     280       14164 :         if (s) {
     281             : #ifdef STREAM_DEBUG
     282             :                 fprintf(stderr, "close %s\n", s->name ? s->name : "<unnamed>");
     283             : #endif
     284       14164 :                 s->close(s);
     285             :         }
     286       14164 : }
     287             : 
     288             : 
     289             : void
     290         830 : mnstr_destroy(stream *s)
     291             : {
     292         830 :         if (s) {
     293             : #ifdef STREAM_DEBUG
     294             :                 fprintf(stderr, "destroy %s\n",
     295             :                         s->name ? s->name : "<unnamed>");
     296             : #endif
     297         826 :                 s->destroy(s);
     298             :         }
     299         830 : }
     300             : 
     301             : void
     302         158 : mnstr_va_set_error(stream *s, mnstr_error_kind kind, const char *fmt, va_list ap)
     303             : {
     304         158 :         if (s == NULL)
     305             :                 return;
     306             : 
     307         158 :         s->errkind = kind;
     308             : 
     309         158 :         if (kind == MNSTR_NO__ERROR) {
     310           0 :                 s->errmsg[0] = '\0';
     311           0 :                 return;
     312             :         }
     313             : 
     314         158 :         char *start = &s->errmsg[0];
     315         158 :         char *end = start + sizeof(s->errmsg);
     316         158 :         if (s->name != NULL)
     317         158 :                 start += snprintf(start, end - start, "stream %s: ", s->name);
     318             : 
     319         158 :         if (start >= end - 1)
     320             :                 return;
     321             : 
     322         158 :         if (fmt == NULL)
     323          17 :                 fmt = mnstr_error_kind_description(kind);
     324             : 
     325             :         // Complicated pointer dance in order to shut up 'might be a candidate
     326             :         // for gnu_printf format attribute' warning from gcc.
     327             :         // It's really eager to trace where the vsnprintf ends up, we need
     328             :         // the ? : to throw it off its scent.
     329             :         // Similarly, the parentheses around the 1 serve to suppress a Clang
     330             :         // warning about dead code (the atoi).
     331             :         void *f1 = (1) ? (void*)&vsnprintf : (void*)&atoi;
     332             :         int (*f)(char *str, size_t size, const char *format, va_list ap) = f1;
     333         158 :         f(start, end - start, fmt, ap);
     334             : }
     335             : 
     336             : void
     337          19 : mnstr_set_error(stream *s, mnstr_error_kind kind, const char *fmt, ...)
     338             : {
     339             :         va_list ap;
     340          19 :         va_start(ap, fmt);
     341          19 :         mnstr_va_set_error(s, kind, fmt, ap);
     342          19 :         va_end(ap);
     343          19 : }
     344             : 
     345             : static size_t my_strerror_r(int error_nr, char *buf, size_t len);
     346             : 
     347             : void
     348         139 : mnstr_set_error_errno(stream *s, mnstr_error_kind kind, const char *fmt, ...)
     349             : {
     350             :         va_list ap;
     351         139 :         va_start(ap, fmt);
     352         139 :         mnstr_va_set_error(s, kind, fmt, ap);
     353         139 :         va_end(ap);
     354             : 
     355             :         /* append as much as fits of the system error message */
     356         139 :         char *start = &s->errmsg[0] + strlen(s->errmsg);
     357         139 :         char *end = &s->errmsg[0] + sizeof(s->errmsg);
     358         139 :         if (end - start >= 3) {
     359         139 :                 start = stpcpy(start, ": ");
     360         139 :                 start += my_strerror_r(errno, start, end - start);
     361             :         }
     362         139 : }
     363             : 
     364             : 
     365             : void
     366      219226 : mnstr_set_open_error(const char *name, int errnr, const char *fmt, ...)
     367             : {
     368             :         va_list ap;
     369             : 
     370      219226 :         struct tl_error_buf *buf = get_tl_error_buf();
     371      219227 :         if (buf == NULL)
     372      219227 :                 return; // hopeless
     373             : 
     374      219227 :         if (errnr == 0 && fmt == NULL) {
     375      219139 :                 buf->msg[0] = '\0';
     376      219139 :                 return;
     377             :         }
     378             : 
     379          88 :         char *start = &buf->msg[0];
     380          88 :         char *end = start + sizeof(buf->msg);
     381             : 
     382          88 :         if (name != NULL)
     383          88 :                 start += snprintf(start, end - start, "when opening %s: ", name);
     384          88 :         if (start >= end - 1)
     385             :                 return;
     386             : 
     387          88 :         if (fmt != NULL) {
     388          88 :                 va_start(ap, fmt);
     389          88 :                 start += vsnprintf(start, end - start, fmt, ap);
     390          88 :                 va_end(ap);
     391             :         }
     392          88 :         if (start >= end - 1)
     393             :                 return;
     394             : 
     395          88 :         if (errnr != 0 && end - start >= 3) {
     396          88 :                 start = stpcpy(start, ": ");
     397          88 :                 start += my_strerror_r(errno, start, end - start);
     398             :         }
     399             :         if (start >= end - 1)
     400          88 :                 return;
     401             : }
     402             : 
     403             : static size_t
     404         227 : my_strerror_r(int error_nr, char *buf, size_t buflen)
     405             : {
     406             :         // Three cases:
     407             :         // 1. no strerror_r
     408             :         // 2. gnu strerror_r (returns char* and does not always fill buffer)
     409             :         // 3. xsi strerror_r (returns int and always fills the buffer)
     410             :         char *to_move;
     411             : #ifndef HAVE_STRERROR_R
     412             :         // Hope for the best
     413             :         to_move = strerror(error_nr);
     414             : #elif !defined(_GNU_SOURCE) || !_GNU_SOURCE
     415             :         // standard strerror_r always writes to buf
     416             :         int result_code = strerror_r(error_nr, buf, buflen);
     417             :         if (result_code == 0)
     418             :                 to_move = NULL;
     419             :         else
     420             :                 to_move = "<failed to retrieve error message>";
     421             : #else
     422             :         // gnu strerror_r sometimes only returns static string, needs copy
     423         227 :         to_move = strerror_r(error_nr, buf, buflen);
     424             : #endif
     425         227 :         if (to_move != NULL) {
     426             :                 // move to buffer
     427         227 :                 size_t size = strlen(to_move) + 1;
     428         227 :                 assert(size <= buflen);
     429             :                 // strerror_r may have return a pointer to/into the buffer
     430         227 :                 memmove(buf, to_move, size);
     431         227 :                 return size - 1;
     432             :         } else {
     433           0 :                 return strlen(buf);
     434             :         }
     435             : }
     436             : 
     437             : 
     438             : 
     439         156 : void mnstr_copy_error(stream *dst, stream *src)
     440             : {
     441         156 :         dst->errkind = src->errkind;
     442         156 :         memcpy(dst->errmsg, src->errmsg, sizeof(dst->errmsg));
     443         156 : }
     444             : 
     445             : char *
     446           0 : mnstr_error(const stream *s)
     447             : {
     448           0 :         const char *msg = mnstr_peek_error(s);
     449           0 :         if (msg != NULL)
     450           0 :                 return strdup(msg);
     451             :         else
     452             :                 return NULL;
     453             : }
     454             : 
     455             : const char*
     456           3 : mnstr_peek_error(const stream *s)
     457             : {
     458           3 :         if (s == NULL) {
     459           1 :                 struct tl_error_buf *b = get_tl_error_buf();
     460           1 :                 if (b != NULL)
     461           1 :                         return b->msg;
     462             :                 else
     463             :                         return "unknown error";
     464             :         }
     465             : 
     466           2 :         if (s->errkind == MNSTR_NO__ERROR)
     467             :                 return "no error";
     468             : 
     469           2 :         if (s->errmsg[0] != '\0')
     470           2 :                 return s->errmsg;
     471             : 
     472           0 :         return mnstr_error_kind_description(s->errkind);
     473             : }
     474             : 
     475             : static const char *
     476          17 : mnstr_error_kind_description(mnstr_error_kind kind)
     477             : {
     478          17 :         switch (kind) {
     479             :         case MNSTR_NO__ERROR:
     480             :                 /* unreachable */
     481           0 :                 assert(0);
     482             :                 return NULL;
     483             :         case MNSTR_OPEN_ERROR:
     484             :                 return "error could not open";
     485           0 :         case MNSTR_READ_ERROR:
     486           0 :                 return "error reading";
     487           0 :         case MNSTR_WRITE_ERROR:
     488           0 :                 return "error writing";
     489          17 :         case MNSTR_TIMEOUT:
     490          17 :                 return "timeout";
     491           0 :         case MNSTR_UNEXPECTED_EOF:
     492           0 :                 return "timeout";
     493             :         }
     494             : 
     495           0 :         return "Unknown error";
     496             : }
     497             : 
     498             : /* flush buffer, return 0 on success, non-zero on failure */
     499             : int
     500      634594 : mnstr_flush(stream *s, mnstr_flush_level flush_level)
     501             : {
     502      634594 :         if (s == NULL)
     503             :                 return -1;
     504             : #ifdef STREAM_DEBUG
     505             :         fprintf(stderr, "flush %s\n", s->name ? s->name : "<unnamed>");
     506             : #endif
     507      634594 :         assert(!s->readonly);
     508      634594 :         if (s->errkind != MNSTR_NO__ERROR)
     509             :                 return -1;
     510      634594 :         if (s->flush)
     511      634594 :                 return s->flush(s, flush_level);
     512             :         return 0;
     513             : }
     514             : 
     515             : 
     516             : /* sync file to disk, return 0 on success, non-zero on failure */
     517             : int
     518          51 : mnstr_fsync(stream *s)
     519             : {
     520          51 :         if (s == NULL)
     521             :                 return -1;
     522             : #ifdef STREAM_DEBUG
     523             :         fprintf(stderr, "fsync %s (%d)\n",
     524             :                 s->name ? s->name : "<unnamed>", s->errnr);
     525             : #endif
     526          51 :         assert(!s->readonly);
     527          51 :         if (s->errkind != MNSTR_NO__ERROR)
     528             :                 return -1;
     529          51 :         if (s->fsync)
     530          51 :                 return s->fsync(s);
     531             :         return 0;
     532             : }
     533             : 
     534             : 
     535             : int
     536           0 : mnstr_fgetpos(stream *restrict s, fpos_t *restrict p)
     537             : {
     538           0 :         if (s == NULL || p == NULL)
     539             :                 return -1;
     540             : #ifdef STREAM_DEBUG
     541             :         fprintf(stderr, "fgetpos %s\n", s->name ? s->name : "<unnamed>");
     542             : #endif
     543           0 :         if (s->errkind != MNSTR_NO__ERROR)
     544             :                 return -1;
     545           0 :         if (s->fgetpos)
     546           0 :                 return s->fgetpos(s, p);
     547             :         return 0;
     548             : }
     549             : 
     550             : 
     551             : int
     552           0 : mnstr_fsetpos(stream *restrict s, fpos_t *restrict p)
     553             : {
     554           0 :         if (s == NULL)
     555             :                 return -1;
     556             : #ifdef STREAM_DEBUG
     557             :         fprintf(stderr, "fsetpos %s\n", s->name ? s->name : "<unnamed>");
     558             : #endif
     559           0 :         if (s->errkind != MNSTR_NO__ERROR)
     560             :                 return -1;
     561           0 :         if (s->fsetpos)
     562           0 :                 return s->fsetpos(s, p);
     563             :         return 0;
     564             : }
     565             : 
     566             : 
     567             : int
     568    12115233 : mnstr_isalive(const stream *s)
     569             : {
     570    12115233 :         if (s == NULL)
     571             :                 return 0;
     572    12115233 :         if (s->errkind != MNSTR_NO__ERROR)
     573             :                 return -1;
     574    12115233 :         if (s->isalive)
     575    11278960 :                 return s->isalive(s);
     576             :         return 1;
     577             : }
     578             : 
     579             : 
     580             : bool
     581      182765 : mnstr_eof(const stream *s)
     582             : {
     583      182765 :         return s->eof;
     584             : }
     585             : 
     586             : char *
     587       25284 : mnstr_name(const stream *s)
     588             : {
     589       25284 :         if (s == NULL)
     590             :                 return "connection terminated";
     591       25284 :         return s->name;
     592             : }
     593             : 
     594             : 
     595             : mnstr_error_kind
     596     2212729 : mnstr_errnr(const stream *s)
     597             : {
     598     2212729 :         if (s == NULL)
     599             :                 return MNSTR_READ_ERROR;
     600     2212729 :         return s->errkind;
     601             : }
     602             : 
     603             : const char *
     604           0 : mnstr_error_kind_name(mnstr_error_kind k)
     605             : {
     606           0 :         switch (k) {
     607             :         case MNSTR_NO__ERROR:
     608             :                 return "MNSTR_NO__ERROR";
     609           0 :         case MNSTR_OPEN_ERROR:
     610           0 :                 return "MNSTR_OPEN_ERROR";
     611           0 :         case MNSTR_READ_ERROR:
     612           0 :                 return "MNSTR_READ_ERROR";
     613           0 :         case MNSTR_WRITE_ERROR:
     614           0 :                 return "MNSTR_WRITE_ERROR";
     615           0 :         case MNSTR_TIMEOUT:
     616           0 :                 return "MNSTR_TIMEOUT";
     617           0 :         default:
     618           0 :                 return "<UNKNOWN_ERROR>";
     619             :         }
     620             : 
     621             : }
     622             : void
     623           2 : mnstr_clearerr(stream *s)
     624             : {
     625           2 :         if (s != NULL) {
     626           2 :                 s->errkind = MNSTR_NO__ERROR;
     627           2 :                 s->errmsg[0] = '\0';
     628           2 :                 if (s->clrerr)
     629           0 :                         s->clrerr(s);
     630             :         }
     631           2 : }
     632             : 
     633             : 
     634             : bool
     635           0 : mnstr_isbinary(const stream *s)
     636             : {
     637           0 :         if (s == NULL)
     638             :                 return false;
     639           0 :         return s->binary;
     640             : }
     641             : 
     642             : 
     643             : bool
     644           0 : mnstr_get_swapbytes(const stream *s)
     645             : {
     646           0 :         if (s == NULL)
     647             :                 return 0;
     648           0 :         return s->swapbytes;
     649             : }
     650             : 
     651             : 
     652             : /* set stream to big-endian/little-endian byte order; the default is
     653             :  * native byte order */
     654             : void
     655        6731 : mnstr_set_bigendian(stream *s, bool bigendian)
     656             : {
     657        6731 :         if (s == NULL)
     658             :                 return;
     659             : #ifdef STREAM_DEBUG
     660             :         fprintf(stderr, "mnstr_set_bigendian %s %s\n",
     661             :                 s->name ? s->name : "<unnamed>",
     662             :                 swapbytes ? "true" : "false");
     663             : #endif
     664        6731 :         assert(s->readonly);
     665        6731 :         s->binary = true;
     666             : #ifdef WORDS_BIGENDIAN
     667             :         s->swapbytes = !bigendian;
     668             : #else
     669        6731 :         s->swapbytes = bigendian;
     670             : #endif
     671             : }
     672             : 
     673             : 
     674             : void
     675       34659 : close_stream(stream *s)
     676             : {
     677       34659 :         if (s) {
     678       34307 :                 if (s->close)
     679       34307 :                         s->close(s);
     680       34307 :                 if (s->destroy)
     681       34307 :                         s->destroy(s);
     682             :         }
     683       34659 : }
     684             : 
     685             : 
     686             : void
     687      219092 : destroy_stream(stream *s)
     688             : {
     689      219092 :         if (s->name)
     690      219092 :                 free(s->name);
     691      219092 :         free(s);
     692      219092 : }
     693             : 
     694             : 
     695             : stream *
     696      219138 : create_stream(const char *name)
     697             : {
     698             :         stream *s;
     699             : 
     700      219138 :         if (name == NULL) {
     701           0 :                 mnstr_set_open_error(NULL, 0, "internal error: name not set");
     702           0 :                 return NULL;
     703             :         }
     704      219138 :         if ((s = (stream *) malloc(sizeof(*s))) == NULL) {
     705           0 :                 mnstr_set_open_error(name, errno, "malloc failed");
     706           0 :                 return NULL;
     707             :         }
     708      219138 :         *s = (stream) {
     709             :                 .swapbytes = false,
     710             :                 .readonly = true,
     711             :                 .isutf8 = false,        /* not known for sure */
     712             :                 .binary = false,
     713             :                 .eof = false,
     714      219138 :                 .name = strdup(name),
     715             :                 .errkind = MNSTR_NO__ERROR,
     716             :                 .errmsg = {0},
     717             :                 .destroy = destroy_stream,
     718             :         };
     719      219138 :         if(s->name == NULL) {
     720           0 :                 free(s);
     721           0 :                 mnstr_set_open_error(name, errno, "malloc failed");
     722           0 :                 return NULL;
     723             :         }
     724             : #ifdef STREAM_DEBUG
     725             :         fprintf(stderr, "create_stream %s -> %p\n",
     726             :                 name ? name : "<unnamed>", s);
     727             : #endif
     728      219138 :         mnstr_set_open_error(NULL, 0, NULL); // clear the error
     729      219139 :         return s;
     730             : }
     731             : 
     732             : 
     733             : static ssize_t
     734           0 : wrapper_read(stream *restrict s, void *restrict buf, size_t elmsize, size_t cnt)
     735             : {
     736           0 :         ssize_t ret = s->inner->read(s->inner, buf, elmsize, cnt);
     737           0 :         s->eof |= s->inner->eof;
     738           0 :         return ret;
     739             : }
     740             : 
     741             : 
     742             : static ssize_t
     743           0 : wrapper_write(stream *restrict s, const void *restrict buf, size_t elmsize, size_t cnt)
     744             : {
     745           0 :         return s->inner->write(s->inner, buf, elmsize, cnt);
     746             : }
     747             : 
     748             : 
     749             : static void
     750           0 : wrapper_close(stream *s)
     751             : {
     752           0 :         s->inner->close(s->inner);
     753           0 : }
     754             : 
     755             : 
     756             : static void
     757           0 : wrapper_clrerr(stream *s)
     758             : {
     759           0 :         s->inner->clrerr(s->inner);
     760           0 : }
     761             : 
     762             : 
     763             : static void
     764           0 : wrapper_destroy(stream *s)
     765             : {
     766           0 :         s->inner->destroy(s->inner);
     767           0 :         destroy_stream(s);
     768           0 : }
     769             : 
     770             : 
     771             : static int
     772           0 : wrapper_flush(stream *s, mnstr_flush_level flush_level)
     773             : {
     774           0 :         return s->inner->flush(s->inner, flush_level);
     775             : }
     776             : 
     777             : 
     778             : static int
     779           4 : wrapper_fsync(stream *s)
     780             : {
     781           4 :         return s->inner->fsync(s->inner);
     782             : }
     783             : 
     784             : 
     785             : static int
     786           0 : wrapper_fgetpos(stream *restrict s, fpos_t *restrict p)
     787             : {
     788           0 :         return s->inner->fgetpos(s->inner, p);
     789             : }
     790             : 
     791             : 
     792             : static int
     793           0 : wrapper_fsetpos(stream *restrict s, fpos_t *restrict p)
     794             : {
     795           0 :         return s->inner->fsetpos(s->inner, p);
     796             : }
     797             : 
     798             : 
     799             : static void
     800        5371 : wrapper_update_timeout(stream *s)
     801             : {
     802        5371 :         s->inner->timeout = s->timeout;
     803        5371 :         s->inner->timeout_func = s->timeout_func;
     804        5371 :         s->inner->timeout_data = s->timeout_data;
     805        5371 :         s->inner->update_timeout(s->inner);
     806        5371 : }
     807             : 
     808             : 
     809             : static int
     810    11279243 : wrapper_isalive(const stream *s)
     811             : {
     812    11279243 :         return s->inner->isalive(s->inner);
     813             : }
     814             : 
     815             : 
     816             : stream *
     817       14180 : create_wrapper_stream(const char *name, stream *inner)
     818             : {
     819       14180 :         if (inner == NULL)
     820             :                 return NULL;
     821       14180 :         if (name == NULL)
     822       14176 :                 name = inner->name;
     823       14180 :         stream *s = create_stream(name);
     824       14180 :         if (s == NULL)
     825             :                 return NULL;
     826             : 
     827             : 
     828       14180 :         s->swapbytes = inner->swapbytes;
     829       14180 :         s->readonly = inner->readonly;
     830       14180 :         s->isutf8 = inner->isutf8;
     831       14180 :         s->binary = inner->binary;
     832       14180 :         s->timeout = inner->timeout;
     833       14180 :         s->inner = inner;
     834             : 
     835       14180 :         s->read = inner->read == NULL ? NULL : wrapper_read;
     836       14180 :         s->write = inner->write == NULL ? NULL : wrapper_write;
     837       14180 :         s->close = inner->close == NULL ? NULL : wrapper_close;
     838       14180 :         s->clrerr = inner->clrerr == NULL ? NULL : wrapper_clrerr;
     839       14180 :         s->destroy = wrapper_destroy;
     840       14180 :         s->flush = inner->flush == NULL ? NULL : wrapper_flush;
     841       14180 :         s->fsync = inner->fsync == NULL ? NULL : wrapper_fsync;
     842       14180 :         s->fgetpos = inner->fgetpos == NULL ? NULL : wrapper_fgetpos;
     843       14180 :         s->fsetpos = inner->fsetpos == NULL ? NULL : wrapper_fsetpos;
     844       14180 :         s->isalive = inner->isalive == NULL ? NULL : wrapper_isalive;
     845       14180 :         s->update_timeout = inner->update_timeout == NULL ? NULL : wrapper_update_timeout;
     846             : 
     847       14180 :         return s;
     848             : }
     849             : 
     850             : /* ------------------------------------------------------------------ */
     851             : /* streams working on a disk file, compressed or not */
     852             : 
     853             : stream *
     854       12791 : open_rstream(const char *filename)
     855             : {
     856       12791 :         if (filename == NULL)
     857             :                 return NULL;
     858             : #ifdef STREAM_DEBUG
     859             :         fprintf(stderr, "open_rstream %s\n", filename);
     860             : #endif
     861             : 
     862       12791 :         stream *s = open_stream(filename, "rb");
     863       12791 :         if (s == NULL)
     864             :                 return NULL;
     865             : 
     866       12703 :         stream *c = compressed_stream(s, 0);
     867       12703 :         if (c == NULL)
     868           0 :                 close_stream(s);
     869             : 
     870             :         return c;
     871             : }
     872             : 
     873             : stream *
     874       12581 : open_wstream(const char *filename)
     875             : {
     876       12581 :         if (filename == NULL)
     877             :                 return NULL;
     878             : #ifdef STREAM_DEBUG
     879             :         fprintf(stderr, "open_wstream %s\n", filename);
     880             : #endif
     881             : 
     882       12581 :         stream *s = open_stream(filename, "wb");
     883       12581 :         if (s == NULL)
     884             :                 return NULL;
     885             : 
     886       12581 :         stream *c = compressed_stream(s, 0);
     887       12581 :         if (c == NULL) {
     888           0 :                 close_stream(s);
     889           0 :                 file_remove(filename);
     890             :         }
     891             : 
     892             :         return c;
     893             : }
     894             : 
     895             : stream *
     896         280 : open_rastream(const char *filename)
     897             : {
     898         280 :         if (filename == NULL)
     899             :                 return NULL;
     900             : #ifdef STREAM_DEBUG
     901             :         fprintf(stderr, "open_rastream %s\n", filename);
     902             : #endif
     903         280 :         stream *s = open_rstream(filename);
     904         280 :         if (s == NULL)
     905             :                 return NULL;
     906             : 
     907         279 :         stream *t = create_text_stream(s);
     908         279 :         if (t == NULL)
     909           0 :                 close_stream(s);
     910             : 
     911             :         return t;
     912             : }
     913             : 
     914             : stream *
     915           5 : open_wastream(const char *filename)
     916             : {
     917           5 :         if (filename == NULL)
     918             :                 return NULL;
     919             : #ifdef STREAM_DEBUG
     920             :         fprintf(stderr, "open_wastream %s\n", filename);
     921             : #endif
     922           5 :         stream *s = open_wstream(filename);
     923           5 :         if (s == NULL)
     924             :                 return NULL;
     925             : 
     926           5 :         stream *t = create_text_stream(s);
     927           5 :         if (t == NULL) {
     928           0 :                 close_stream(s);
     929           0 :                 file_remove(filename);
     930             :         }
     931             : 
     932             :         return t;
     933             : }
     934             : 
     935             : 
     936             : /* put here because it depends on both bs_read AND bs2_read */
     937             : bool
     938      191518 : isa_block_stream(const stream *s)
     939             : {
     940      191518 :         assert(s != NULL);
     941      383036 :         return s &&
     942      191518 :                 ((s->read == bs_read ||
     943        1807 :                   s->write == bs_write) ||
     944        1807 :                  (s->read == bs2_read ||
     945             :                   s->write == bs2_write));
     946             : }
     947             : 
     948             : 
     949             : /* Put here because I need to think very carefully about this
     950             :  * mnstr_read(,, 0, 0). What would that mean?
     951             :  */
     952             : ssize_t
     953        6870 : mnstr_read_block(stream *restrict s, void *restrict buf, size_t elmsize, size_t cnt)
     954             : {
     955             :         ssize_t len = 0;
     956        6870 :         char x = 0;
     957             : 
     958        6870 :         if (s == NULL || buf == NULL)
     959             :                 return -1;
     960        6870 :         assert(s->read == bs_read || s->write == bs_write);
     961       13740 :         if ((len = mnstr_read(s, buf, elmsize, cnt)) < 0 ||
     962        6870 :             mnstr_read(s, &x, 0, 0) < 0 /* read prompt */  ||
     963        6870 :             x > 0)
     964           0 :                 return -1;
     965             :         return len;
     966             : }

Generated by: LCOV version 1.14