Line data Source code
1 : /*
2 : * This Source Code Form is subject to the terms of the Mozilla Public
3 : * License, v. 2.0. If a copy of the MPL was not distributed with this
4 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
5 : *
6 : * Copyright 1997 - July 2008 CWI, August 2008 - 2021 MonetDB B.V.
7 : */
8 :
/* pump streams: move data between an inner stream and a pluggable worker (see pump_state) */
10 :
11 : #include "monetdb_config.h"
12 : #include "stream.h"
13 : #include "stream_internal.h"
14 : #include "pump.h"
15 :
16 : #include <assert.h>
17 :
18 : static pump_result pump_in(stream *s);
19 : static pump_result pump_out(stream *s, pump_action action);
20 :
21 : static ssize_t pump_read(stream *restrict s, void *restrict buf, size_t elmsize, size_t cnt);
22 : static ssize_t pump_write(stream *restrict s, const void *restrict buf, size_t elmsize, size_t cnt);
23 : static int pump_flush(stream *s, mnstr_flush_level flush_level);
24 : static void pump_close(stream *s);
25 : static void pump_destroy(stream *s);
26 :
27 :
28 : stream *
29 438 : pump_stream(stream *inner, pump_state *state)
30 : {
31 438 : assert(inner);
32 438 : assert(state);
33 438 : assert(state->set_src_win != NULL);
34 438 : assert(state->get_dst_win != NULL);
35 438 : assert(state->set_dst_win != NULL);
36 438 : assert(state->get_buffer != NULL);
37 438 : assert(state->worker != NULL);
38 438 : assert(state->get_error != NULL);
39 438 : assert(state->finalizer != NULL);
40 :
41 438 : inner_state_t *inner_state = state->inner_state;
42 :
43 438 : stream *s = create_wrapper_stream(NULL, inner);
44 438 : if (s == NULL)
45 : return NULL;
46 :
47 438 : pump_buffer buf = state->get_buffer(inner_state);
48 438 : if (s->readonly) {
49 : // Read from inner stream to src buffer through pumper to outbufs.
50 : // This means the src window starts empty
51 427 : buf.count = 0;
52 427 : state->set_src_win(inner_state, buf);
53 : } else {
54 : // from inbufs through pumper to dst buffer to inner stream.
55 : // This means the out window is our whole buffer.
56 : // Check for NULL in case caller has already initialized it
57 : // and written something
58 11 : if (state->get_dst_win(inner_state).start == NULL)
59 2 : state->set_dst_win(inner_state, buf);
60 : }
61 :
62 438 : s->stream_data.p = (void*) state;
63 438 : s->read = pump_read;
64 438 : s->write = pump_write;
65 438 : s->flush = pump_flush;
66 438 : s->close = pump_close;
67 438 : s->destroy = pump_destroy;
68 438 : return s;
69 : }
70 :
71 :
72 : static ssize_t
73 150801 : pump_read(stream *restrict s, void *restrict buf, size_t elmsize, size_t cnt)
74 : {
75 150801 : pump_state *state = (pump_state*) s->stream_data.p;
76 150801 : inner_state_t *inner_state = state->inner_state;
77 150801 : size_t size = elmsize * cnt;
78 :
79 150801 : state->set_dst_win(inner_state, (pump_buffer){ .start = buf, .count = size});
80 150801 : pump_result ret = pump_in(s);
81 150801 : if (ret == PUMP_ERROR) {
82 0 : const char *msg = state->get_error(inner_state);
83 0 : if (msg != NULL)
84 : msg = "processing failed without further error indication";
85 0 : mnstr_set_error(s, MNSTR_READ_ERROR, "%s", msg);
86 0 : return -1;
87 : }
88 :
89 150801 : char *free_space = state->get_dst_win(inner_state).start;
90 150801 : ssize_t nread = free_space - (char*) buf;
91 :
92 150801 : return nread / (ssize_t) elmsize;
93 : }
94 :
95 :
96 : static ssize_t
97 2225 : pump_write(stream *restrict s, const void *restrict buf, size_t elmsize, size_t cnt)
98 : {
99 2225 : pump_state *state = (pump_state*) s->stream_data.p;
100 2225 : inner_state_t *inner_state = state->inner_state;
101 2225 : size_t size = elmsize * cnt;
102 :
103 2225 : if (size == 0)
104 0 : return cnt;
105 :
106 2225 : state->set_src_win(inner_state, (pump_buffer){ .start = (void*)buf, .count = size });
107 2225 : pump_result ret = pump_out(s, PUMP_NO_FLUSH);
108 2225 : if (ret == PUMP_ERROR) {
109 0 : const char *msg = state->get_error(inner_state);
110 0 : if (msg != NULL)
111 : msg = "processing failed without further error indication";
112 0 : mnstr_set_error(s, MNSTR_READ_ERROR, "%s", msg);
113 0 : return -1;
114 : }
115 2225 : ssize_t nwritten = state->get_src_win(inner_state).start - (char*)buf;
116 2225 : return nwritten / (ssize_t) elmsize;
117 : }
118 :
119 :
120 777 : static int pump_flush(stream *s, mnstr_flush_level flush_level)
121 : {
122 777 : pump_state *state = (pump_state*) s->stream_data.p;
123 777 : inner_state_t *inner_state = state->inner_state;
124 : pump_action action;
125 :
126 777 : switch (flush_level) {
127 : case MNSTR_FLUSH_DATA:
128 : action = PUMP_FLUSH_DATA;
129 : break;
130 776 : case MNSTR_FLUSH_ALL:
131 : action = PUMP_FLUSH_ALL;
132 776 : break;
133 : default:
134 0 : assert(0 /* unknown flush_level */);
135 : action = PUMP_FLUSH_DATA;
136 : break;
137 : }
138 :
139 777 : state->set_src_win(inner_state, (pump_buffer){ .start = NULL, .count = 0 });
140 777 : ssize_t nwritten = pump_out(s, action);
141 : if (nwritten < 0)
142 : return -1;
143 : else
144 777 : return mnstr_flush(s->inner, flush_level);
145 : }
146 :
147 :
148 : static void
149 438 : pump_close(stream *s)
150 : {
151 438 : pump_state *state = (pump_state*) s->stream_data.p;
152 438 : inner_state_t *inner_state = state->inner_state;
153 :
154 438 : if (!s->readonly) {
155 11 : state->set_src_win(inner_state, (pump_buffer){ .start = NULL, .count = 0 });
156 11 : pump_out(s, PUMP_FINISH);
157 : }
158 438 : mnstr_close(s->inner);
159 438 : }
160 :
161 :
162 : static void
163 438 : pump_destroy(stream *s)
164 : {
165 438 : pump_state *state = (pump_state*) s->stream_data.p;
166 438 : inner_state_t *inner_state = state->inner_state;
167 :
168 438 : state->finalizer(inner_state);
169 438 : free(state);
170 438 : mnstr_destroy(s->inner);
171 438 : destroy_stream(s);
172 438 : }
173 :
174 : static pump_result
175 150801 : pump_in(stream *s)
176 : {
177 150801 : pump_state *state = (pump_state *) s->stream_data.p;
178 150801 : inner_state_t *inner_state = state->inner_state;
179 :
180 150801 : char *before = state->get_dst_win(inner_state).start;
181 : (void) before; // nice while in the debugger
182 :
183 150801 : pump_buffer buffer = state->get_buffer(inner_state);
184 185172 : while (1) {
185 335973 : pump_buffer dst = state->get_dst_win(inner_state);
186 335973 : pump_buffer src = state->get_src_win(inner_state);
187 :
188 335973 : if (dst.count == 0)
189 : // Output buffer is full, we're done.
190 150801 : return PUMP_OK;
191 :
192 : // Handle input, if possible and necessary
193 185835 : if (src.start != NULL && src.count == 0) {
194 : // start != NULL means we haven't encountered EOF yet
195 :
196 35468 : ssize_t nread = mnstr_read(s->inner, buffer.start, 1, buffer.count);
197 :
198 35468 : if (nread < 0)
199 : // Error. Return directly, discarding any data lingering
200 : // in the internal state.
201 : return PUMP_ERROR;
202 35468 : if (nread == 0) {
203 : // Set to NULL so we'll remember next time.
204 : // Maybe there is some data in the internal state we don't
205 : // return immediately.
206 : src = (pump_buffer){.start=NULL, .count=0};
207 415 : s->eof |= s->inner->eof;
208 : } else
209 : // All good
210 35053 : src = (pump_buffer) { .start = buffer.start, .count = nread};
211 :
212 35468 : state->set_src_win(inner_state, src);
213 : }
214 :
215 185835 : pump_action action = (src.start != NULL) ? PUMP_NO_FLUSH : PUMP_FINISH;
216 :
217 : // Try to make some progress
218 : assert(dst.count > 0);
219 185835 : assert(src.count > 0 || action == PUMP_FINISH);
220 185835 : pump_result ret = state->worker(inner_state, action);
221 185835 : if (ret == PUMP_ERROR)
222 : return PUMP_ERROR;
223 :
224 185835 : if (ret == PUMP_END)
225 : // If you say so
226 : return PUMP_END;
227 :
228 : // If we get here we made some progress so we're ready for a new iteration.
229 : }
230 : }
231 :
232 :
233 : static pump_result
234 3013 : pump_out(stream *s, pump_action action)
235 : {
236 3013 : pump_state *state = (pump_state *) s->stream_data.p;
237 3013 : inner_state_t *inner_state = state->inner_state;
238 :
239 3013 : void *before = state->get_src_win(inner_state).start;
240 : (void) before; // nice while in the debugger
241 :
242 3013 : pump_buffer buffer = state->get_buffer(inner_state);
243 :
244 3617 : while (1) {
245 6630 : pump_buffer dst = state->get_dst_win(inner_state);
246 6630 : pump_buffer src = state->get_src_win(inner_state);
247 :
248 : // Make sure there is room in the output buffer
249 6630 : assert(state->elbow_room <= buffer.count);
250 6630 : if (dst.count == 0 || dst.count < state->elbow_room) {
251 49 : size_t amount = dst.start - buffer.start;
252 49 : ssize_t nwritten = mnstr_write(s->inner, buffer.start, 1, amount);
253 49 : if (nwritten != (ssize_t)amount)
254 3013 : return PUMP_ERROR;
255 : dst = buffer;
256 49 : state->set_dst_win(inner_state, dst); // reset output window
257 : }
258 :
259 : // Try to make progress
260 6630 : pump_result ret = state->worker(inner_state, action);
261 6630 : if (ret == PUMP_ERROR)
262 : return PUMP_ERROR;
263 :
264 : // src and dst have been invalidated by the call to worker
265 6436 : dst = state->get_dst_win(inner_state);
266 6436 : src = state->get_src_win(inner_state);
267 :
268 : // There was no error but if input is still available, we definitely
269 : // need another round
270 6436 : if (src.count > 0)
271 3419 : continue;
272 :
273 : // Though the input data has been consumed, some of it might still
274 : // linger in the internal state.
275 3017 : if (action == PUMP_NO_FLUSH) {
276 : // Let it linger, we'll combine it with the next batch
277 2225 : assert(ret == PUMP_OK); // worker would never PUMP_END, would it?
278 : return PUMP_OK;
279 : }
280 :
281 : // We are flushing or finishing or whatever.
282 : // We may need to do more iterations to fully flush the internal state.
283 : // Is there any internal state left?
284 792 : if (ret == PUMP_OK)
285 : // yes, there is
286 198 : continue;
287 :
288 : // All internal state has been drained.
289 : // Now drain the output buffer
290 594 : assert(ret == PUMP_END);
291 594 : size_t amount = dst.start - buffer.start;
292 594 : if (amount > 0) {
293 593 : ssize_t nwritten = mnstr_write(s->inner, buffer.start, 1, amount);
294 593 : if (nwritten != (ssize_t)amount)
295 : return PUMP_ERROR;
296 : }
297 594 : state->set_dst_win(inner_state, buffer); // reset output window
298 594 : return PUMP_END;
299 : }
300 :
301 :
302 : }
|