Line data Source code
1 : /* 2 : * This Source Code Form is subject to the terms of the Mozilla Public 3 : * License, v. 2.0. If a copy of the MPL was not distributed with this 4 : * file, You can obtain one at http://mozilla.org/MPL/2.0/. 5 : * 6 : * Copyright 1997 - July 2008 CWI, August 2008 - 2021 MonetDB B.V. 7 : */ 8 : 9 : #include "monetdb_config.h" 10 : #include "stream.h" 11 : #include "stream_internal.h" 12 : 13 : 14 : 15 : /* fixed-width format streams */ 16 : #define STREAM_FWF_NAME "fwf_ftw" 17 : 18 : typedef struct { 19 : stream *s; 20 : bool eof; 21 : /* config */ 22 : size_t num_fields; 23 : size_t *widths; 24 : char filler; 25 : /* state */ 26 : size_t line_len; 27 : char *in_buf; 28 : char *out_buf; 29 : size_t out_buf_start; 30 : size_t out_buf_remaining; 31 : } stream_fwf_data; 32 : 33 : 34 : static ssize_t 35 2 : stream_fwf_read(stream *restrict s, void *restrict buf, size_t elmsize, size_t cnt) 36 : { 37 : stream_fwf_data *fsd; 38 : size_t to_write = cnt; 39 : size_t buf_written = 0; 40 : char nl_buf; 41 : 42 2 : fsd = (stream_fwf_data *) s->stream_data.p; 43 2 : if (fsd == NULL || elmsize != 1) { 44 : return -1; 45 : } 46 2 : if (fsd->eof) 47 : return 0; 48 : 49 33 : while (to_write > 0) { 50 : /* input conversion */ 51 33 : if (fsd->out_buf_remaining == 0) { /* need to convert next line */ 52 : size_t field_idx, in_buf_pos = 0, out_buf_pos = 0; 53 33 : ssize_t actually_read = fsd->s->read(fsd->s, fsd->in_buf, 1, fsd->line_len); 54 33 : if (actually_read < (ssize_t) fsd->line_len) { /* incomplete last line */ 55 1 : if (actually_read < 0) { 56 : return actually_read; /* this is an error */ 57 : } 58 1 : fsd->eof |= fsd->s->eof; 59 1 : return (ssize_t) buf_written; /* skip last line */ 60 : } 61 : /* consume to next newline */ 62 32 : while (fsd->s->read(fsd->s, &nl_buf, 1, 1) == 1 && 63 32 : nl_buf != '\n') 64 : ; 65 32 : fsd->eof |= fsd->s->eof; 66 : 67 384 : for (field_idx = 0; field_idx < fsd->num_fields; field_idx++) { 68 : char *val_start, *val_end; 69 352 : val_start = fsd->in_buf + in_buf_pos; 70 352 : in_buf_pos += fsd->widths[field_idx]; 71 352 : val_end = fsd->in_buf + in_buf_pos - 1; 72 686 : while (*val_start == fsd->filler) 73 334 : val_start++; 74 352 : while (*val_end == fsd->filler) 75 0 : val_end--; 76 1330 : while (val_start <= val_end) { 77 978 : if (*val_start == STREAM_FWF_FIELD_SEP) { 78 0 : fsd->out_buf[out_buf_pos++] = STREAM_FWF_ESCAPE; 79 : } 80 978 : fsd->out_buf[out_buf_pos++] = *val_start++; 81 : } 82 352 : fsd->out_buf[out_buf_pos++] = STREAM_FWF_FIELD_SEP; 83 : } 84 32 : fsd->out_buf[out_buf_pos++] = STREAM_FWF_RECORD_SEP; 85 32 : fsd->out_buf_remaining = out_buf_pos; 86 32 : fsd->out_buf_start = 0; 87 : } 88 : /* now we know something is in output_buf so deliver it */ 89 32 : if (fsd->out_buf_remaining <= to_write) { 90 32 : memcpy((char *) buf + buf_written, fsd->out_buf + fsd->out_buf_start, fsd->out_buf_remaining); 91 32 : to_write -= fsd->out_buf_remaining; 92 32 : buf_written += fsd->out_buf_remaining; 93 32 : fsd->out_buf_remaining = 0; 94 : } else { 95 0 : memcpy((char *) buf + buf_written, fsd->out_buf + fsd->out_buf_start, to_write); 96 0 : fsd->out_buf_start += to_write; 97 0 : fsd->out_buf_remaining -= to_write; 98 0 : buf_written += to_write; 99 : to_write = 0; 100 : } 101 : } 102 0 : return (ssize_t) buf_written; 103 : } 104 : 105 : 106 : static void 107 2 : stream_fwf_close(stream *s) 108 : { 109 2 : stream_fwf_data *fsd = (stream_fwf_data *) s->stream_data.p; 110 : 111 2 : if (fsd != NULL) { 112 : stream_fwf_data *fsd = (stream_fwf_data *) s->stream_data.p; 113 1 : close_stream(fsd->s); 114 1 : free(fsd->widths); 115 1 : free(fsd->in_buf); 116 1 : free(fsd->out_buf); 117 1 : free(fsd); 118 1 : s->stream_data.p = NULL; 119 : } 120 2 : } 121 : 122 : static void 123 1 : stream_fwf_destroy(stream *s) 124 : { 125 1 : stream_fwf_close(s); 126 1 : destroy_stream(s); 127 1 : } 128 : 129 : stream * 130 1 : stream_fwf_create(stream *restrict s, size_t num_fields, size_t *restrict widths, char filler) 131 : { 132 : stream *ns; 133 1 : stream_fwf_data *fsd = malloc(sizeof(stream_fwf_data)); 134 : 135 1 : if (fsd == NULL) { 136 0 : mnstr_set_open_error(STREAM_FWF_NAME, errno, NULL); 137 0 : return NULL; 138 : } 139 1 : *fsd = (stream_fwf_data) { 140 : .s = s, 141 : .num_fields = num_fields, 142 : .widths = widths, 143 : .filler = filler, 144 : .line_len = 0, 145 : .eof = false, 146 : }; 147 12 : for (size_t i = 0; i < num_fields; i++) { 148 11 : fsd->line_len += widths[i]; 149 : } 150 1 : fsd->in_buf = malloc(fsd->line_len); 151 1 : if (fsd->in_buf == NULL) { 152 0 : free(fsd); 153 0 : mnstr_set_open_error(STREAM_FWF_NAME, errno, NULL); 154 0 : return NULL; 155 : } 156 1 : fsd->out_buf = malloc(fsd->line_len * 3); 157 1 : if (fsd->out_buf == NULL) { 158 0 : free(fsd->in_buf); 159 0 : free(fsd); 160 0 : mnstr_set_open_error(STREAM_FWF_NAME, errno, NULL); 161 0 : return NULL; 162 : } 163 1 : if ((ns = create_stream(STREAM_FWF_NAME)) == NULL) { 164 0 : free(fsd->in_buf); 165 0 : free(fsd->out_buf); 166 0 : free(fsd); 167 0 : return NULL; 168 : } 169 1 : ns->read = stream_fwf_read; 170 1 : ns->close = stream_fwf_close; 171 1 : ns->destroy = stream_fwf_destroy; 172 1 : ns->write = NULL; 173 1 : ns->flush = NULL; 174 1 : ns->readonly = true; 175 1 : ns->stream_data.p = fsd; 176 1 : return ns; 177 : }