LCOV - code coverage report
Current view: top level - common/stream - pump.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 125 135 92.6 %
Date: 2021-10-13 02:24:04 Functions: 8 8 100.0 %

          Line data    Source code
       1             : /*
       2             :  * This Source Code Form is subject to the terms of the Mozilla Public
       3             :  * License, v. 2.0.  If a copy of the MPL was not distributed with this
       4             :  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
       5             :  *
       6             :  * Copyright 1997 - July 2008 CWI, August 2008 - 2021 MonetDB B.V.
       7             :  */
       8             : 
       9             : /* streams working on a lzma/xz-compressed disk file */
      10             : 
      11             : #include "monetdb_config.h"
      12             : #include "stream.h"
      13             : #include "stream_internal.h"
      14             : #include "pump.h"
      15             : 
      16             : #include <assert.h>
      17             : 
      18             : static pump_result pump_in(stream *s);
      19             : static pump_result pump_out(stream *s, pump_action action);
      20             : 
      21             : static ssize_t pump_read(stream *restrict s, void *restrict buf, size_t elmsize, size_t cnt);
      22             : static ssize_t pump_write(stream *restrict s, const void *restrict buf, size_t elmsize, size_t cnt);
      23             : static int pump_flush(stream *s, mnstr_flush_level flush_level);
      24             : static void pump_close(stream *s);
      25             : static void pump_destroy(stream *s);
      26             : 
      27             : 
      28             : stream *
      29         436 : pump_stream(stream *inner, pump_state *state)
      30             : {
      31         436 :         assert(inner);
      32         436 :         assert(state);
      33         436 :         assert(state->set_src_win != NULL);
      34         436 :         assert(state->get_dst_win != NULL);
      35         436 :         assert(state->set_dst_win != NULL);
      36         436 :         assert(state->get_buffer != NULL);
      37         436 :         assert(state->worker != NULL);
      38         436 :         assert(state->get_error != NULL);
      39         436 :         assert(state->finalizer != NULL);
      40             : 
      41         436 :         inner_state_t *inner_state = state->inner_state;
      42             : 
      43         436 :         stream *s = create_wrapper_stream(NULL, inner);
      44         436 :         if (s == NULL)
      45             :                 return NULL;
      46             : 
      47         436 :         pump_buffer buf = state->get_buffer(inner_state);
      48         436 :         if (s->readonly) {
      49             :                 // Read from inner stream to src buffer through pumper to outbufs.
      50             :                 // This means the src window starts empty
      51         426 :                 buf.count = 0;
      52         426 :                 state->set_src_win(inner_state, buf);
      53             :         } else {
      54             :                 // from inbufs through pumper to dst buffer to inner stream.
      55             :                 // This means the out window is our whole buffer.
      56             :                 // Check for NULL in case caller has already initialized it
      57             :                 // and written something
      58          10 :                 if (state->get_dst_win(inner_state).start == NULL)
      59           2 :                         state->set_dst_win(inner_state, buf);
      60             :         }
      61             : 
      62         436 :         s->stream_data.p = (void*) state;
      63         436 :         s->read = pump_read;
      64         436 :         s->write = pump_write;
      65         436 :         s->flush = pump_flush;
      66         436 :         s->close = pump_close;
      67         436 :         s->destroy = pump_destroy;
      68         436 :         return s;
      69             : }
      70             : 
      71             : 
      72             : static ssize_t
      73      148659 : pump_read(stream *restrict s, void *restrict buf, size_t elmsize, size_t cnt)
      74             : {
      75      148659 :         pump_state *state = (pump_state*) s->stream_data.p;
      76      148659 :         inner_state_t *inner_state = state->inner_state;
      77      148659 :         size_t size = elmsize * cnt;
      78             : 
      79      148659 :         state->set_dst_win(inner_state, (pump_buffer){ .start = buf, .count = size});
      80      148659 :         pump_result ret = pump_in(s);
      81      148659 :         if (ret == PUMP_ERROR) {
      82           0 :                 const char *msg = state->get_error(inner_state);
      83           0 :                 if (msg != NULL)
      84             :                         msg = "processing failed without further error indication";
      85           0 :                 mnstr_set_error(s, MNSTR_READ_ERROR, "%s", msg);
      86           0 :                 return -1;
      87             :         }
      88             : 
      89      148659 :         char *free_space = state->get_dst_win(inner_state).start;
      90      148659 :         ssize_t nread = free_space - (char*) buf;
      91             : 
      92      148659 :         return nread / (ssize_t) elmsize;
      93             : }
      94             : 
      95             : 
      96             : static ssize_t
      97        2089 : pump_write(stream *restrict s, const void *restrict buf, size_t elmsize, size_t cnt)
      98             : {
      99        2089 :         pump_state *state = (pump_state*) s->stream_data.p;
     100        2089 :         inner_state_t *inner_state = state->inner_state;
     101        2089 :         size_t size = elmsize * cnt;
     102             : 
     103        2089 :         if (size == 0)
     104           0 :                 return cnt;
     105             : 
     106        2089 :         state->set_src_win(inner_state, (pump_buffer){ .start = (void*)buf, .count = size });
     107        2089 :         pump_result ret = pump_out(s, PUMP_NO_FLUSH);
     108        2089 :         if (ret == PUMP_ERROR) {
     109           0 :                 const char *msg = state->get_error(inner_state);
     110           0 :                 if (msg != NULL)
     111             :                         msg = "processing failed without further error indication";
     112           0 :                 mnstr_set_error(s, MNSTR_READ_ERROR, "%s", msg);
     113           0 :                 return -1;
     114             :         }
     115        2089 :         ssize_t nwritten = state->get_src_win(inner_state).start - (char*)buf;
     116        2089 :         return nwritten / (ssize_t) elmsize;
     117             : }
     118             : 
     119             : 
     120         761 : static int pump_flush(stream *s, mnstr_flush_level flush_level)
     121             : {
     122         761 :         pump_state *state = (pump_state*) s->stream_data.p;
     123         761 :         inner_state_t *inner_state = state->inner_state;
     124             :         pump_action action;
     125             : 
     126         761 :         switch (flush_level) {
     127             :                 case MNSTR_FLUSH_DATA:
     128             :                         action = PUMP_FLUSH_DATA;
     129             :                         break;
     130         760 :                 case MNSTR_FLUSH_ALL:
     131             :                         action = PUMP_FLUSH_ALL;
     132         760 :                         break;
     133             :                 default:
     134           0 :                         assert(0 /* unknown flush_level */);
     135             :                         action = PUMP_FLUSH_DATA;
     136             :                         break;
     137             :         }
     138             : 
     139         761 :         state->set_src_win(inner_state, (pump_buffer){ .start = NULL, .count = 0 });
     140         761 :         ssize_t nwritten = pump_out(s, action);
     141             :         if (nwritten < 0)
     142             :                 return -1;
     143             :         else
     144         761 :                 return mnstr_flush(s->inner, flush_level);
     145             : }
     146             : 
     147             : 
     148             : static void
     149         436 : pump_close(stream *s)
     150             : {
     151         436 :         pump_state *state = (pump_state*) s->stream_data.p;
     152         436 :         inner_state_t *inner_state = state->inner_state;
     153             : 
     154         436 :         if (!s->readonly) {
     155          10 :                 state->set_src_win(inner_state, (pump_buffer){ .start = NULL, .count = 0 });
     156          10 :                 pump_out(s, PUMP_FINISH);
     157             :         }
     158         436 :         mnstr_close(s->inner);
     159         436 : }
     160             : 
     161             : 
     162             : static void
     163         436 : pump_destroy(stream *s)
     164             : {
     165         436 :         pump_state *state = (pump_state*) s->stream_data.p;
     166         436 :         inner_state_t *inner_state = state->inner_state;
     167             : 
     168         436 :         state->finalizer(inner_state);
     169         436 :         free(state);
     170         436 :         mnstr_destroy(s->inner);
     171         436 :         destroy_stream(s);
     172         436 : }
     173             : 
     174             : static pump_result
     175      148659 : pump_in(stream *s)
     176             : {
     177      148659 :         pump_state *state = (pump_state *) s->stream_data.p;
     178      148659 :         inner_state_t *inner_state = state->inner_state;
     179             : 
     180      148659 :         char *before = state->get_dst_win(inner_state).start;
     181             :         (void) before; // nice while in the debugger
     182             : 
     183      148659 :         pump_buffer buffer = state->get_buffer(inner_state);
     184      183031 :         while (1) {
     185      331690 :                 pump_buffer dst = state->get_dst_win(inner_state);
     186      331690 :                 pump_buffer src = state->get_src_win(inner_state);
     187             : 
     188      331690 :                 if (dst.count == 0)
     189             :                         // Output buffer is full, we're done.
     190      148659 :                         return PUMP_OK;
     191             : 
     192             :                 // Handle input, if possible and necessary
     193      183692 :                 if (src.start != NULL && src.count == 0) {
     194             :                         // start != NULL means we haven't encountered EOF yet
     195             : 
     196       35466 :                         ssize_t nread = mnstr_read(s->inner, buffer.start, 1, buffer.count);
     197             : 
     198       35466 :                         if (nread < 0)
     199             :                                 // Error. Return directly, discarding any data lingering
     200             :                                 // in the internal state.
     201             :                                 return PUMP_ERROR;
     202       35466 :                         if (nread == 0) {
     203             :                                 // Set to NULL so we'll remember next time.
     204             :                                 // Maybe there is some data in the internal state we don't
     205             :                                 // return immediately.
     206             :                                 src = (pump_buffer){.start=NULL, .count=0};
     207         414 :                                 s->eof |= s->inner->eof;
     208             :                         } else
     209             :                                 // All good
     210       35052 :                                 src = (pump_buffer) { .start = buffer.start, .count = nread};
     211             : 
     212       35466 :                         state->set_src_win(inner_state, src);
     213             :                 }
     214             : 
     215      183692 :                 pump_action action = (src.start != NULL) ? PUMP_NO_FLUSH : PUMP_FINISH;
     216             : 
     217             :                 // Try to make some progress
     218             :                 assert(dst.count > 0);
     219      183692 :                 assert(src.count > 0 || action == PUMP_FINISH);
     220      183692 :                 pump_result ret = state->worker(inner_state, action);
     221      183692 :                 if (ret == PUMP_ERROR)
     222             :                         return PUMP_ERROR;
     223             : 
     224      183692 :                 if (ret == PUMP_END)
     225             :                         // If you say so
     226             :                         return PUMP_END;
     227             : 
     228             :                 // If we get here we made some progress so we're ready for a new iteration.
     229             :         }
     230             : }
     231             : 
     232             : 
     233             : static pump_result
     234        2860 : pump_out(stream *s, pump_action action)
     235             : {
     236        2860 :         pump_state *state = (pump_state *) s->stream_data.p;
     237        2860 :         inner_state_t *inner_state = state->inner_state;
     238             : 
     239        2860 :         void *before = state->get_src_win(inner_state).start;
     240             :         (void) before; // nice while in the debugger
     241             : 
     242        2860 :         pump_buffer buffer = state->get_buffer(inner_state);
     243             : 
     244        3605 :         while (1) {
     245        6465 :                 pump_buffer dst = state->get_dst_win(inner_state);
     246        6465 :                 pump_buffer src = state->get_src_win(inner_state);
     247             : 
     248             :                 // Make sure there is room in the output buffer
     249        6465 :                 assert(state->elbow_room <= buffer.count);
     250        6465 :                 if (dst.count == 0 || dst.count < state->elbow_room) {
     251          49 :                         size_t amount = dst.start - buffer.start;
     252          49 :                         ssize_t nwritten = mnstr_write(s->inner, buffer.start, 1, amount);
     253          49 :                         if (nwritten != (ssize_t)amount)
     254        2860 :                                 return PUMP_ERROR;
     255             :                         dst = buffer;
     256          49 :                         state->set_dst_win(inner_state, dst); // reset output window
     257             :                 }
     258             : 
     259             :                 // Try to make progress
     260        6465 :                 pump_result ret = state->worker(inner_state, action);
     261        6465 :                 if (ret == PUMP_ERROR)
     262             :                         return PUMP_ERROR;
     263             : 
     264             :                 // src and dst have been invalidated by the call to worker
     265        6275 :                 dst = state->get_dst_win(inner_state);
     266        6275 :                 src = state->get_src_win(inner_state);
     267             : 
     268             :                 // There was no error but if input is still available, we definitely
     269             :                 // need another round
     270        6275 :                 if (src.count > 0)
     271        3411 :                         continue;
     272             : 
     273             :                 // Though the input data has been consumed, some of it might still
     274             :                 // linger in the internal state.
     275        2864 :                 if (action == PUMP_NO_FLUSH) {
     276             :                         // Let it linger, we'll combine it with the next batch
     277        2089 :                         assert(ret == PUMP_OK); // worker would never PUMP_END, would it?
     278             :                         return PUMP_OK;
     279             :                 }
     280             : 
     281             :                 // We are flushing or finishing or whatever.
     282             :                 // We may need to do more iterations to fully flush the internal state.
     283             :                 // Is there any internal state left?
     284         775 :                 if (ret == PUMP_OK)
     285             :                         // yes, there is
     286         194 :                         continue;
     287             : 
     288             :                 // All internal state has been drained.
     289             :                 // Now drain the output buffer
     290         581 :                 assert(ret == PUMP_END);
     291         581 :                 size_t amount = dst.start - buffer.start;
     292         581 :                 if (amount > 0) {
     293         580 :                         ssize_t nwritten = mnstr_write(s->inner, buffer.start, 1, amount);
     294         580 :                         if (nwritten != (ssize_t)amount)
     295             :                                 return PUMP_ERROR;
     296             :                 }
     297         581 :                 state->set_dst_win(inner_state, buffer); // reset output window
     298         581 :                 return PUMP_END;
     299             :         }
     300             : 
     301             : 
     302             : }

Generated by: LCOV version 1.14