Line data Source code
1 : /*
2 : * This Source Code Form is subject to the terms of the Mozilla Public
3 : * License, v. 2.0. If a copy of the MPL was not distributed with this
4 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
5 : *
6 : * Copyright 1997 - July 2008 CWI, August 2008 - 2021 MonetDB B.V.
7 : */
8 :
9 :
10 : /* stream
11 : * ======
12 : * Niels Nes
13 : * An simple interface to streams
14 : *
15 : * Processing files, streams, and sockets is quite different on Linux
16 : * and Windows platforms. To improve portability between both, we advise
17 : * to replace the stdio actions with the stream functionality provided
18 : * here.
19 : *
20 : * This interface can also be used to open 'non compressed, gzipped,
21 : * bz2zipped' data files and sockets. Using this interface one could
22 : * easily switch between the various underlying storage types.
23 : *
24 : * buffered streams
25 : * ----------------
26 : *
27 : * The bstream (or buffered_stream) can be used for efficient reading of
28 : * a stream. Reading can be done in large chunks and access can be done
29 : * in smaller bits, by directly accessing the underlying buffer.
30 : *
31 : * Beware that a flush on a buffered stream emits an empty block to
32 : * synchronize with the other side, telling it has reached the end of
33 : * the sequence and can close its descriptors.
34 : *
35 : * bstream functions
36 : * -----------------
37 : *
38 : * The bstream_create gets a read stream (rs) as input and the initial
39 : * chunk size and creates a buffered stream from this. A spare byte is
40 : * kept at the end of the buffer. The bstream_read will at least read
41 : * the next 'size' bytes. If the not read data (aka pos < len) together
42 : * with the new data will not fit in the current buffer it is resized.
43 : * The spare byte is kept.
44 : *
45 : * tee streams
46 : * -----------
47 : *
48 : * A tee stream is a write stream that duplicates all output to two
49 : * write streams of the same type (txt/bin).
50 : */
51 :
52 : /* Generic stream handling code such as init and close */
53 :
54 : #include "monetdb_config.h"
55 : #include "stream.h"
56 : #include "stream_internal.h"
57 : #include <stdio.h>
58 :
59 :
60 : #ifdef HAVE_PTHREAD_H
61 : #include <pthread.h>
62 : #endif
63 :
64 : struct tl_error_buf {
65 : char msg[1024];
66 : };
67 :
68 : static int tl_error_init(void);
69 : static struct tl_error_buf *get_tl_error_buf(void);
70 :
71 : #ifdef HAVE_PTHREAD_H
72 :
73 : static pthread_key_t tl_error_key;
74 :
75 : static int
76 474 : tl_error_init(void)
77 : {
78 474 : if (pthread_key_create(&tl_error_key, free) != 0)
79 0 : return -1;
80 : return 0;
81 : }
82 :
83 : static struct tl_error_buf*
84 217330 : get_tl_error_buf(void)
85 : {
86 217330 : struct tl_error_buf *p = pthread_getspecific(tl_error_key);
87 217330 : if (p == NULL) {
88 3251 : p = malloc(sizeof(*p));
89 3251 : if (p == NULL)
90 : return NULL;
91 3251 : *p = (struct tl_error_buf) { .msg = {0} };
92 3251 : pthread_setspecific(tl_error_key, p);
93 3251 : struct tl_error_buf *second_attempt = pthread_getspecific(tl_error_key);
94 3251 : assert(p == second_attempt /* maybe mnstr_init has not been called? */);
95 : (void) second_attempt; // suppress warning if asserts disabled
96 : }
97 : return p;
98 : }
99 :
100 : #elif defined(WIN32)
101 :
102 : static DWORD tl_error_key = 0;
103 :
104 : static int
105 : tl_error_init(void)
106 : {
107 : DWORD key = TlsAlloc();
108 : if (key == TLS_OUT_OF_INDEXES)
109 : return -1;
110 : else {
111 : tl_error_key = key;
112 : return 0;
113 : }
114 : }
115 :
116 : static struct tl_error_buf*
117 : get_tl_error_buf(void)
118 : {
119 : struct tl_error_buf *p = TlsGetValue(tl_error_key);
120 :
121 : if (p == NULL) {
122 : if (GetLastError() != ERROR_SUCCESS)
123 : return NULL; // something went terribly wrong
124 :
125 : // otherwise, initialize
126 : p = malloc(sizeof(*p));
127 : if (p == NULL)
128 : return NULL;
129 : *p = (struct tl_error_buf) { .msg = 0 };
130 : if (!TlsSetValue(tl_error_key, p)) {
131 : free(p);
132 : return NULL;
133 : }
134 :
135 : struct tl_error_buf *second_attempt = TlsGetValue(tl_error_key);
136 : assert(p == second_attempt /* maybe mnstr_init has not been called? */);
137 : (void) second_attempt; // suppress warning if asserts disabled
138 : }
139 : return p;
140 : }
141 :
142 : #else
143 :
144 : #error "no pthreads and no Windows, don't know what to do"
145 :
146 : #endif
147 :
148 : static const char *mnstr_error_kind_description(mnstr_error_kind kind);
149 :
150 : int
151 681 : mnstr_init(void)
152 : {
153 : static ATOMIC_FLAG inited = ATOMIC_FLAG_INIT;
154 :
155 681 : if (ATOMIC_TAS(&inited))
156 : return 0;
157 :
158 474 : if (tl_error_init()< 0)
159 0 : return -1;
160 :
161 : #ifdef NATIVE_WIN32
162 : WSADATA w;
163 : if (WSAStartup(0x0101, &w) != 0)
164 : return -1;
165 : #endif
166 :
167 : return 0;
168 : }
169 :
170 : const char *
171 0 : mnstr_version(void)
172 : {
173 0 : return STREAM_VERSION;
174 : }
175 :
176 : /* Read at most cnt elements of size elmsize from the stream. Returns
177 : * the number of elements actually read or < 0 on failure. */
178 : ssize_t
179 906303 : mnstr_read(stream *restrict s, void *restrict buf, size_t elmsize, size_t cnt)
180 : {
181 906303 : if (s == NULL || buf == NULL)
182 : return -1;
183 : #ifdef STREAM_DEBUG
184 : fprintf(stderr, "read %s %zu %zu\n",
185 : s->name ? s->name : "<unnamed>", elmsize, cnt);
186 : #endif
187 906303 : assert(s->readonly);
188 906303 : if (s->errkind != MNSTR_NO__ERROR)
189 : return -1;
190 906303 : return s->read(s, buf, elmsize, cnt);
191 : }
192 :
193 :
194 : /* Write cnt elements of size elmsize to the stream. Returns the
195 : * number of elements actually written. If elmsize or cnt equals zero,
196 : * returns cnt. */
197 : ssize_t
198 18098805 : mnstr_write(stream *restrict s, const void *restrict buf, size_t elmsize, size_t cnt)
199 : {
200 18098805 : if (s == NULL || buf == NULL)
201 : return -1;
202 : #ifdef STREAM_DEBUG
203 : fprintf(stderr, "write %s %zu %zu\n",
204 : s->name ? s->name : "<unnamed>", elmsize, cnt);
205 : #endif
206 18098805 : assert(!s->readonly);
207 18098805 : if (s->errkind != MNSTR_NO__ERROR)
208 : return -1;
209 18098795 : return s->write(s, buf, elmsize, cnt);
210 : }
211 :
212 :
213 : /* Read one line (seperated by \n) of at most maxcnt-1 characters from
214 : * the stream. Returns the number of characters actually read,
215 : * includes the trailing \n; terminated by a NULL byte. */
216 : ssize_t
217 141553 : mnstr_readline(stream *restrict s, void *restrict buf, size_t maxcnt)
218 : {
219 : char *b = buf, *start = buf;
220 :
221 141553 : if (s == NULL || buf == NULL)
222 : return -1;
223 : #ifdef STREAM_DEBUG
224 : fprintf(stderr, "readline %s %zu\n",
225 : s->name ? s->name : "<unnamed>", maxcnt);
226 : #endif
227 141553 : assert(s->readonly);
228 141553 : if (s->errkind != MNSTR_NO__ERROR)
229 : return -1;
230 141553 : if (maxcnt == 0)
231 : return 0;
232 141553 : if (maxcnt == 1) {
233 0 : *start = 0;
234 0 : return 0;
235 : }
236 : for (;;) {
237 20898074 : switch (s->read(s, start, 1, 1)) {
238 20897783 : case 1:
239 : /* successfully read a character,
240 : * check whether it is the line
241 : * separator and whether we have space
242 : * left for more */
243 20897783 : if (*start++ == '\n' || --maxcnt == 1) {
244 141262 : *start = 0;
245 141262 : return (ssize_t) (start - b);
246 : }
247 : break;
248 0 : case -1:
249 : /* error: if we didn't read anything yet,
250 : * return the error, otherwise return what we
251 : * have */
252 0 : if (start == b)
253 : return -1;
254 : /* fall through */
255 : case 0:
256 : /* end of file: return what we have */
257 291 : *start = 0;
258 291 : return (ssize_t) (start - b);
259 : }
260 : }
261 : }
262 :
263 :
264 : void
265 4342 : mnstr_settimeout(stream *s, unsigned int ms, bool (*func)(void *), void *data)
266 : {
267 4342 : if (s) {
268 4342 : s->timeout = ms;
269 4342 : s->timeout_func = func;
270 4342 : s->timeout_data = data;
271 4342 : if (s->update_timeout)
272 4342 : s->update_timeout(s);
273 : }
274 4342 : }
275 :
276 :
277 : void
278 13940 : mnstr_close(stream *s)
279 : {
280 13940 : if (s) {
281 : #ifdef STREAM_DEBUG
282 : fprintf(stderr, "close %s\n", s->name ? s->name : "<unnamed>");
283 : #endif
284 13940 : s->close(s);
285 : }
286 13940 : }
287 :
288 :
289 : void
290 826 : mnstr_destroy(stream *s)
291 : {
292 826 : if (s) {
293 : #ifdef STREAM_DEBUG
294 : fprintf(stderr, "destroy %s\n",
295 : s->name ? s->name : "<unnamed>");
296 : #endif
297 822 : s->destroy(s);
298 : }
299 826 : }
300 :
301 : void
302 156 : mnstr_va_set_error(stream *s, mnstr_error_kind kind, const char *fmt, va_list ap)
303 : {
304 156 : if (s == NULL)
305 : return;
306 :
307 156 : s->errkind = kind;
308 :
309 156 : if (kind == MNSTR_NO__ERROR) {
310 0 : s->errmsg[0] = '\0';
311 0 : return;
312 : }
313 :
314 156 : char *start = &s->errmsg[0];
315 156 : char *end = start + sizeof(s->errmsg);
316 156 : if (s->name != NULL)
317 156 : start += snprintf(start, end - start, "stream %s: ", s->name);
318 :
319 156 : if (start >= end - 1)
320 : return;
321 :
322 156 : if (fmt == NULL)
323 17 : fmt = mnstr_error_kind_description(kind);
324 :
325 : // Complicated pointer dance in order to shut up 'might be a candidate
326 : // for gnu_printf format attribute' warning from gcc.
327 : // It's really eager to trace where the vsnprintf ends up, we need
328 : // the ? : to throw it off its scent.
329 : // Similarly, the parentheses around the 1 serve to suppress a Clang
330 : // warning about dead code (the atoi).
331 : void *f1 = (1) ? (void*)&vsnprintf : (void*)&atoi;
332 : int (*f)(char *str, size_t size, const char *format, va_list ap) = f1;
333 156 : f(start, end - start, fmt, ap);
334 : }
335 :
336 : void
337 19 : mnstr_set_error(stream *s, mnstr_error_kind kind, const char *fmt, ...)
338 : {
339 : va_list ap;
340 19 : va_start(ap, fmt);
341 19 : mnstr_va_set_error(s, kind, fmt, ap);
342 19 : va_end(ap);
343 19 : }
344 :
345 : static size_t my_strerror_r(int error_nr, char *buf, size_t len);
346 :
347 : void
348 137 : mnstr_set_error_errno(stream *s, mnstr_error_kind kind, const char *fmt, ...)
349 : {
350 : va_list ap;
351 137 : va_start(ap, fmt);
352 137 : mnstr_va_set_error(s, kind, fmt, ap);
353 137 : va_end(ap);
354 :
355 : /* append as much as fits of the system error message */
356 137 : char *start = &s->errmsg[0] + strlen(s->errmsg);
357 137 : char *end = &s->errmsg[0] + sizeof(s->errmsg);
358 137 : if (end - start >= 3) {
359 137 : start = stpcpy(start, ": ");
360 137 : start += my_strerror_r(errno, start, end - start);
361 : }
362 137 : }
363 :
364 :
365 : void
366 217329 : mnstr_set_open_error(const char *name, int errnr, const char *fmt, ...)
367 : {
368 : va_list ap;
369 :
370 217329 : struct tl_error_buf *buf = get_tl_error_buf();
371 217329 : if (buf == NULL)
372 217329 : return; // hopeless
373 :
374 217329 : if (errnr == 0 && fmt == NULL) {
375 217243 : buf->msg[0] = '\0';
376 217243 : return;
377 : }
378 :
379 86 : char *start = &buf->msg[0];
380 86 : char *end = start + sizeof(buf->msg);
381 :
382 86 : if (name != NULL)
383 86 : start += snprintf(start, end - start, "when opening %s: ", name);
384 86 : if (start >= end - 1)
385 : return;
386 :
387 86 : if (fmt != NULL) {
388 86 : va_start(ap, fmt);
389 86 : start += vsnprintf(start, end - start, fmt, ap);
390 86 : va_end(ap);
391 : }
392 86 : if (start >= end - 1)
393 : return;
394 :
395 86 : if (errnr != 0 && end - start >= 3) {
396 86 : start = stpcpy(start, ": ");
397 86 : start += my_strerror_r(errno, start, end - start);
398 : }
399 : if (start >= end - 1)
400 86 : return;
401 : }
402 :
403 : static size_t
404 223 : my_strerror_r(int error_nr, char *buf, size_t buflen)
405 : {
406 : // Three cases:
407 : // 1. no strerror_r
408 : // 2. gnu strerror_r (returns char* and does not always fill buffer)
409 : // 3. xsi strerror_r (returns int and always fills the buffer)
410 : char *to_move;
411 : #ifndef HAVE_STRERROR_R
412 : // Hope for the best
413 : to_move = strerror(error_nr);
414 : #elif !defined(_GNU_SOURCE) || !_GNU_SOURCE
415 : // standard strerror_r always writes to buf
416 : int result_code = strerror_r(error_nr, buf, buflen);
417 : if (result_code == 0)
418 : to_move = NULL;
419 : else
420 : to_move = "<failed to retrieve error message>";
421 : #else
422 : // gnu strerror_r sometimes only returns static string, needs copy
423 223 : to_move = strerror_r(error_nr, buf, buflen);
424 : #endif
425 223 : if (to_move != NULL) {
426 : // move to buffer
427 223 : size_t size = strlen(to_move) + 1;
428 223 : assert(size <= buflen);
429 : // strerror_r may have return a pointer to/into the buffer
430 223 : memmove(buf, to_move, size);
431 223 : return size - 1;
432 : } else {
433 0 : return strlen(buf);
434 : }
435 : }
436 :
437 :
438 :
439 154 : void mnstr_copy_error(stream *dst, stream *src)
440 : {
441 154 : dst->errkind = src->errkind;
442 154 : memcpy(dst->errmsg, src->errmsg, sizeof(dst->errmsg));
443 154 : }
444 :
445 : char *
446 0 : mnstr_error(const stream *s)
447 : {
448 0 : const char *msg = mnstr_peek_error(s);
449 0 : if (msg != NULL)
450 0 : return strdup(msg);
451 : else
452 : return NULL;
453 : }
454 :
455 : const char*
456 3 : mnstr_peek_error(const stream *s)
457 : {
458 3 : if (s == NULL) {
459 1 : struct tl_error_buf *b = get_tl_error_buf();
460 1 : if (b != NULL)
461 1 : return b->msg;
462 : else
463 : return "unknown error";
464 : }
465 :
466 2 : if (s->errkind == MNSTR_NO__ERROR)
467 : return "no error";
468 :
469 2 : if (s->errmsg[0] != '\0')
470 2 : return s->errmsg;
471 :
472 0 : return mnstr_error_kind_description(s->errkind);
473 : }
474 :
475 : static const char *
476 17 : mnstr_error_kind_description(mnstr_error_kind kind)
477 : {
478 17 : switch (kind) {
479 : case MNSTR_NO__ERROR:
480 : /* unreachable */
481 0 : assert(0);
482 : return NULL;
483 : case MNSTR_OPEN_ERROR:
484 : return "error could not open";
485 0 : case MNSTR_READ_ERROR:
486 0 : return "error reading";
487 0 : case MNSTR_WRITE_ERROR:
488 0 : return "error writing";
489 17 : case MNSTR_TIMEOUT:
490 17 : return "timeout";
491 0 : case MNSTR_UNEXPECTED_EOF:
492 0 : return "timeout";
493 : }
494 :
495 0 : return "Unknown error";
496 : }
497 :
498 : /* flush buffer, return 0 on success, non-zero on failure */
499 : int
500 611799 : mnstr_flush(stream *s, mnstr_flush_level flush_level)
501 : {
502 611799 : if (s == NULL)
503 : return -1;
504 : #ifdef STREAM_DEBUG
505 : fprintf(stderr, "flush %s\n", s->name ? s->name : "<unnamed>");
506 : #endif
507 611799 : assert(!s->readonly);
508 611799 : if (s->errkind != MNSTR_NO__ERROR)
509 : return -1;
510 611799 : if (s->flush)
511 611799 : return s->flush(s, flush_level);
512 : return 0;
513 : }
514 :
515 :
516 : /* sync file to disk, return 0 on success, non-zero on failure */
517 : int
518 51 : mnstr_fsync(stream *s)
519 : {
520 51 : if (s == NULL)
521 : return -1;
522 : #ifdef STREAM_DEBUG
523 : fprintf(stderr, "fsync %s (%d)\n",
524 : s->name ? s->name : "<unnamed>", s->errnr);
525 : #endif
526 51 : assert(!s->readonly);
527 51 : if (s->errkind != MNSTR_NO__ERROR)
528 : return -1;
529 51 : if (s->fsync)
530 51 : return s->fsync(s);
531 : return 0;
532 : }
533 :
534 :
535 : int
536 0 : mnstr_fgetpos(stream *restrict s, fpos_t *restrict p)
537 : {
538 0 : if (s == NULL || p == NULL)
539 : return -1;
540 : #ifdef STREAM_DEBUG
541 : fprintf(stderr, "fgetpos %s\n", s->name ? s->name : "<unnamed>");
542 : #endif
543 0 : if (s->errkind != MNSTR_NO__ERROR)
544 : return -1;
545 0 : if (s->fgetpos)
546 0 : return s->fgetpos(s, p);
547 : return 0;
548 : }
549 :
550 :
551 : int
552 0 : mnstr_fsetpos(stream *restrict s, fpos_t *restrict p)
553 : {
554 0 : if (s == NULL)
555 : return -1;
556 : #ifdef STREAM_DEBUG
557 : fprintf(stderr, "fsetpos %s\n", s->name ? s->name : "<unnamed>");
558 : #endif
559 0 : if (s->errkind != MNSTR_NO__ERROR)
560 : return -1;
561 0 : if (s->fsetpos)
562 0 : return s->fsetpos(s, p);
563 : return 0;
564 : }
565 :
566 :
567 : int
568 11142008 : mnstr_isalive(const stream *s)
569 : {
570 11142008 : if (s == NULL)
571 : return 0;
572 11142008 : if (s->errkind != MNSTR_NO__ERROR)
573 : return -1;
574 11142008 : if (s->isalive)
575 10300760 : return s->isalive(s);
576 : return 1;
577 : }
578 :
579 :
580 : bool
581 178573 : mnstr_eof(const stream *s)
582 : {
583 178573 : return s->eof;
584 : }
585 :
586 : char *
587 24420 : mnstr_name(const stream *s)
588 : {
589 24420 : if (s == NULL)
590 : return "connection terminated";
591 24420 : return s->name;
592 : }
593 :
594 :
595 : mnstr_error_kind
596 2160555 : mnstr_errnr(const stream *s)
597 : {
598 2160555 : if (s == NULL)
599 : return MNSTR_READ_ERROR;
600 2160555 : return s->errkind;
601 : }
602 :
603 : const char *
604 0 : mnstr_error_kind_name(mnstr_error_kind k)
605 : {
606 0 : switch (k) {
607 : case MNSTR_NO__ERROR:
608 : return "MNSTR_NO__ERROR";
609 0 : case MNSTR_OPEN_ERROR:
610 0 : return "MNSTR_OPEN_ERROR";
611 0 : case MNSTR_READ_ERROR:
612 0 : return "MNSTR_READ_ERROR";
613 0 : case MNSTR_WRITE_ERROR:
614 0 : return "MNSTR_WRITE_ERROR";
615 0 : case MNSTR_TIMEOUT:
616 0 : return "MNSTR_TIMEOUT";
617 0 : default:
618 0 : return "<UNKNOWN_ERROR>";
619 : }
620 :
621 : }
622 : void
623 2 : mnstr_clearerr(stream *s)
624 : {
625 2 : if (s != NULL) {
626 2 : s->errkind = MNSTR_NO__ERROR;
627 2 : s->errmsg[0] = '\0';
628 2 : if (s->clrerr)
629 0 : s->clrerr(s);
630 : }
631 2 : }
632 :
633 :
634 : bool
635 0 : mnstr_isbinary(const stream *s)
636 : {
637 0 : if (s == NULL)
638 : return false;
639 0 : return s->binary;
640 : }
641 :
642 :
643 : bool
644 0 : mnstr_get_swapbytes(const stream *s)
645 : {
646 0 : if (s == NULL)
647 : return 0;
648 0 : return s->swapbytes;
649 : }
650 :
651 :
652 : /* set stream to big-endian/little-endian byte order; the default is
653 : * native byte order */
654 : void
655 6620 : mnstr_set_bigendian(stream *s, bool bigendian)
656 : {
657 6620 : if (s == NULL)
658 : return;
659 : #ifdef STREAM_DEBUG
660 : fprintf(stderr, "mnstr_set_bigendian %s %s\n",
661 : s->name ? s->name : "<unnamed>",
662 : swapbytes ? "true" : "false");
663 : #endif
664 6620 : assert(s->readonly);
665 6620 : s->binary = true;
666 : #ifdef WORDS_BIGENDIAN
667 : s->swapbytes = !bigendian;
668 : #else
669 6620 : s->swapbytes = bigendian;
670 : #endif
671 : }
672 :
673 :
674 : void
675 33567 : close_stream(stream *s)
676 : {
677 33567 : if (s) {
678 33219 : if (s->close)
679 33219 : s->close(s);
680 33219 : if (s->destroy)
681 33219 : s->destroy(s);
682 : }
683 33566 : }
684 :
685 :
686 : void
687 217195 : destroy_stream(stream *s)
688 : {
689 217195 : if (s->name)
690 217195 : free(s->name);
691 217195 : free(s);
692 217195 : }
693 :
694 :
695 : stream *
696 217243 : create_stream(const char *name)
697 : {
698 : stream *s;
699 :
700 217243 : if (name == NULL) {
701 0 : mnstr_set_open_error(NULL, 0, "internal error: name not set");
702 0 : return NULL;
703 : }
704 217243 : if ((s = (stream *) malloc(sizeof(*s))) == NULL) {
705 0 : mnstr_set_open_error(name, errno, "malloc failed");
706 0 : return NULL;
707 : }
708 217243 : *s = (stream) {
709 : .swapbytes = false,
710 : .readonly = true,
711 : .isutf8 = false, /* not known for sure */
712 : .binary = false,
713 : .eof = false,
714 217243 : .name = strdup(name),
715 : .errkind = MNSTR_NO__ERROR,
716 : .errmsg = {0},
717 : .destroy = destroy_stream,
718 : };
719 217243 : if(s->name == NULL) {
720 0 : free(s);
721 0 : mnstr_set_open_error(name, errno, "malloc failed");
722 0 : return NULL;
723 : }
724 : #ifdef STREAM_DEBUG
725 : fprintf(stderr, "create_stream %s -> %p\n",
726 : name ? name : "<unnamed>", s);
727 : #endif
728 217243 : mnstr_set_open_error(NULL, 0, NULL); // clear the error
729 217243 : return s;
730 : }
731 :
732 :
733 : static ssize_t
734 0 : wrapper_read(stream *restrict s, void *restrict buf, size_t elmsize, size_t cnt)
735 : {
736 0 : ssize_t ret = s->inner->read(s->inner, buf, elmsize, cnt);
737 0 : s->eof |= s->inner->eof;
738 0 : return ret;
739 : }
740 :
741 :
742 : static ssize_t
743 0 : wrapper_write(stream *restrict s, const void *restrict buf, size_t elmsize, size_t cnt)
744 : {
745 0 : return s->inner->write(s->inner, buf, elmsize, cnt);
746 : }
747 :
748 :
749 : static void
750 0 : wrapper_close(stream *s)
751 : {
752 0 : s->inner->close(s->inner);
753 0 : }
754 :
755 :
756 : static void
757 0 : wrapper_clrerr(stream *s)
758 : {
759 0 : s->inner->clrerr(s->inner);
760 0 : }
761 :
762 :
763 : static void
764 0 : wrapper_destroy(stream *s)
765 : {
766 0 : s->inner->destroy(s->inner);
767 0 : destroy_stream(s);
768 0 : }
769 :
770 :
771 : static int
772 0 : wrapper_flush(stream *s, mnstr_flush_level flush_level)
773 : {
774 0 : return s->inner->flush(s->inner, flush_level);
775 : }
776 :
777 :
778 : static int
779 4 : wrapper_fsync(stream *s)
780 : {
781 4 : return s->inner->fsync(s->inner);
782 : }
783 :
784 :
785 : static int
786 0 : wrapper_fgetpos(stream *restrict s, fpos_t *restrict p)
787 : {
788 0 : return s->inner->fgetpos(s->inner, p);
789 : }
790 :
791 :
792 : static int
793 0 : wrapper_fsetpos(stream *restrict s, fpos_t *restrict p)
794 : {
795 0 : return s->inner->fsetpos(s->inner, p);
796 : }
797 :
798 :
799 : static void
800 4342 : wrapper_update_timeout(stream *s)
801 : {
802 4342 : s->inner->timeout = s->timeout;
803 4342 : s->inner->timeout_func = s->timeout_func;
804 4342 : s->inner->timeout_data = s->timeout_data;
805 4342 : s->inner->update_timeout(s->inner);
806 4342 : }
807 :
808 :
809 : static int
810 10300860 : wrapper_isalive(const stream *s)
811 : {
812 10300860 : return s->inner->isalive(s->inner);
813 : }
814 :
815 :
816 : stream *
817 13956 : create_wrapper_stream(const char *name, stream *inner)
818 : {
819 13956 : if (inner == NULL)
820 : return NULL;
821 13956 : if (name == NULL)
822 13952 : name = inner->name;
823 13956 : stream *s = create_stream(name);
824 13956 : if (s == NULL)
825 : return NULL;
826 :
827 :
828 13956 : s->swapbytes = inner->swapbytes;
829 13956 : s->readonly = inner->readonly;
830 13956 : s->isutf8 = inner->isutf8;
831 13956 : s->binary = inner->binary;
832 13956 : s->timeout = inner->timeout;
833 13956 : s->inner = inner;
834 :
835 13956 : s->read = inner->read == NULL ? NULL : wrapper_read;
836 13956 : s->write = inner->write == NULL ? NULL : wrapper_write;
837 13956 : s->close = inner->close == NULL ? NULL : wrapper_close;
838 13956 : s->clrerr = inner->clrerr == NULL ? NULL : wrapper_clrerr;
839 13956 : s->destroy = wrapper_destroy;
840 13956 : s->flush = inner->flush == NULL ? NULL : wrapper_flush;
841 13956 : s->fsync = inner->fsync == NULL ? NULL : wrapper_fsync;
842 13956 : s->fgetpos = inner->fgetpos == NULL ? NULL : wrapper_fgetpos;
843 13956 : s->fsetpos = inner->fsetpos == NULL ? NULL : wrapper_fsetpos;
844 13956 : s->isalive = inner->isalive == NULL ? NULL : wrapper_isalive;
845 13956 : s->update_timeout = inner->update_timeout == NULL ? NULL : wrapper_update_timeout;
846 :
847 13956 : return s;
848 : }
849 :
850 : /* ------------------------------------------------------------------ */
851 : /* streams working on a disk file, compressed or not */
852 :
853 : stream *
854 12605 : open_rstream(const char *filename)
855 : {
856 12605 : if (filename == NULL)
857 : return NULL;
858 : #ifdef STREAM_DEBUG
859 : fprintf(stderr, "open_rstream %s\n", filename);
860 : #endif
861 :
862 12605 : stream *s = open_stream(filename, "rb");
863 12605 : if (s == NULL)
864 : return NULL;
865 :
866 12519 : stream *c = compressed_stream(s, 0);
867 12519 : if (c == NULL)
868 0 : close_stream(s);
869 :
870 : return c;
871 : }
872 :
873 : stream *
874 11901 : open_wstream(const char *filename)
875 : {
876 11901 : if (filename == NULL)
877 : return NULL;
878 : #ifdef STREAM_DEBUG
879 : fprintf(stderr, "open_wstream %s\n", filename);
880 : #endif
881 :
882 11901 : stream *s = open_stream(filename, "wb");
883 11901 : if (s == NULL)
884 : return NULL;
885 :
886 11901 : stream *c = compressed_stream(s, 0);
887 11901 : if (c == NULL) {
888 0 : close_stream(s);
889 0 : file_remove(filename);
890 : }
891 :
892 : return c;
893 : }
894 :
895 : stream *
896 281 : open_rastream(const char *filename)
897 : {
898 281 : if (filename == NULL)
899 : return NULL;
900 : #ifdef STREAM_DEBUG
901 : fprintf(stderr, "open_rastream %s\n", filename);
902 : #endif
903 281 : stream *s = open_rstream(filename);
904 281 : if (s == NULL)
905 : return NULL;
906 :
907 280 : stream *t = create_text_stream(s);
908 280 : if (t == NULL)
909 0 : close_stream(s);
910 :
911 : return t;
912 : }
913 :
914 : stream *
915 6 : open_wastream(const char *filename)
916 : {
917 6 : if (filename == NULL)
918 : return NULL;
919 : #ifdef STREAM_DEBUG
920 : fprintf(stderr, "open_wastream %s\n", filename);
921 : #endif
922 6 : stream *s = open_wstream(filename);
923 6 : if (s == NULL)
924 : return NULL;
925 :
926 6 : stream *t = create_text_stream(s);
927 6 : if (t == NULL) {
928 0 : close_stream(s);
929 0 : file_remove(filename);
930 : }
931 :
932 : return t;
933 : }
934 :
935 :
936 : /* put here because it depends on both bs_read AND bs2_read */
937 : bool
938 174250 : isa_block_stream(const stream *s)
939 : {
940 174250 : assert(s != NULL);
941 348500 : return s &&
942 174250 : ((s->read == bs_read ||
943 1754 : s->write == bs_write) ||
944 1754 : (s->read == bs2_read ||
945 : s->write == bs2_write));
946 : }
947 :
948 :
949 : /* Put here because I need to think very carefully about this
950 : * mnstr_read(,, 0, 0). What would that mean?
951 : */
952 : ssize_t
953 6757 : mnstr_read_block(stream *restrict s, void *restrict buf, size_t elmsize, size_t cnt)
954 : {
955 : ssize_t len = 0;
956 6757 : char x = 0;
957 :
958 6757 : if (s == NULL || buf == NULL)
959 : return -1;
960 6757 : assert(s->read == bs_read || s->write == bs_write);
961 13514 : if ((len = mnstr_read(s, buf, elmsize, cnt)) < 0 ||
962 6757 : mnstr_read(s, &x, 0, 0) < 0 /* read prompt */ ||
963 6757 : x > 0)
964 0 : return -1;
965 : return len;
966 : }
|