LCOV - code coverage report
Current view: top level - common/stream - lz4_stream.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 112 165 67.9 %
Date: 2021-10-13 02:24:04 Functions: 12 18 66.7 %

          Line data    Source code
       1             : /*
       2             :  * This Source Code Form is subject to the terms of the Mozilla Public
       3             :  * License, v. 2.0.  If a copy of the MPL was not distributed with this
       4             :  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
       5             :  *
       6             :  * Copyright 1997 - July 2008 CWI, August 2008 - 2021 MonetDB B.V.
       7             :  */
       8             : 
       9             : /* streams working on an lz4-compressed disk file */
      10             : 
      11             : #include "monetdb_config.h"
      12             : #include "stream.h"
      13             : #include "stream_internal.h"
      14             : #include "pump.h"
      15             : 
      16             : 
      17             : #ifdef HAVE_LIBLZ4
      18             : 
      19             : #define READ_CHUNK (1024)
      20             : #define WRITE_CHUNK (1024)
      21             : 
      22             : struct inner_state {
      23             :         pump_buffer src_win;
      24             :         pump_buffer dst_win;
      25             :         pump_buffer buffer;
      26             :         union {
      27             :                 LZ4F_cctx *c;
      28             :                 LZ4F_dctx *d;
      29             :         } ctx;
      30             :         LZ4F_preferences_t compression_prefs;
      31             :         LZ4F_errorCode_t error_code;
      32             :         bool finished;
      33             : };
      34             : 
      35             : static pump_buffer
      36       14029 : get_src_win(inner_state_t *inner_state)
      37             : {
      38       14029 :         return inner_state->src_win;
      39             : }
      40             : 
      41             : static void
      42        1229 : set_src_win(inner_state_t *inner_state, pump_buffer buf)
      43             : {
      44        1229 :         inner_state->src_win = buf;
      45        1229 : }
      46             : 
      47             : static pump_buffer
      48       16865 : get_dst_win(inner_state_t *inner_state)
      49             : {
      50       16865 :         return inner_state->dst_win;
      51             : }
      52             : 
      53             : static void
      54        2264 : set_dst_win(inner_state_t *inner_state, pump_buffer buf)
      55             : {
      56        2264 :         inner_state->dst_win = buf;
      57        2264 : }
      58             : 
      59             : static pump_buffer
      60        2741 : get_buffer(inner_state_t *inner_state)
      61             : {
      62        2741 :         return inner_state->buffer;
      63             : }
      64             : 
      65             : static pump_result
      66        2546 : decomp(inner_state_t *inner_state, pump_action action)
      67             : {
      68             :         LZ4F_errorCode_t ret;
      69             : 
      70        2546 :         if (inner_state->src_win.count == 0 && action == PUMP_FINISH)
      71           4 :                 inner_state->finished = true;
      72        2546 :         if (inner_state->finished)
      73             :                 return PUMP_END;
      74             : 
      75        2542 :         LZ4F_decompressOptions_t opts = {0};
      76        2542 :         size_t nsrc = inner_state->src_win.count; // amount available
      77        2542 :         size_t ndst = inner_state->dst_win.count; // space available
      78        2542 :         ret = LZ4F_decompress(
      79             :                 inner_state->ctx.d,
      80        2542 :                 inner_state->dst_win.start, &ndst,
      81        2542 :                 inner_state->src_win.start, &nsrc,
      82             :                 &opts);
      83             :         // Now nsrc has become the amount consumed, ndst the amount produced!
      84        2542 :         inner_state->src_win.start += nsrc;
      85        2542 :         inner_state->src_win.count -= nsrc;
      86        2542 :         inner_state->dst_win.start += ndst;
      87        2542 :         inner_state->dst_win.count -= ndst;
      88             : 
      89        2542 :         if (LZ4F_isError(ret)) {
      90           0 :                 inner_state->error_code = ret;
      91           0 :                 return PUMP_ERROR;
      92             :         }
      93             :         return PUMP_OK;
      94             : }
      95             : 
      96             : static void
      97           2 : decomp_end(inner_state_t *inner_state)
      98             : {
      99           2 :         LZ4F_freeDecompressionContext(inner_state->ctx.d);
     100           2 :         free(inner_state->buffer.start);
     101           2 :         free(inner_state);
     102           2 : }
     103             : 
     104             : 
     105             : static pump_result
     106        4117 : compr(inner_state_t *inner_state, pump_action action)
     107             : {
     108        4117 :         LZ4F_compressOptions_t opts = {0};
     109             :         size_t consumed;
     110             :         LZ4F_errorCode_t produced;
     111             :         pump_result intended_result;
     112             : 
     113        4117 :         if (inner_state->finished)
     114             :                 return PUMP_END;
     115             : 
     116        4117 :         size_t chunk = inner_state->src_win.count;
     117             :         if (chunk > WRITE_CHUNK)
     118             :                 chunk = WRITE_CHUNK;
     119             : 
     120        4117 :         switch (action) {
     121             : 
     122        3925 :                 case PUMP_NO_FLUSH:
     123        3925 :                         produced = LZ4F_compressUpdate(
     124             :                                 inner_state->ctx.c,
     125        3925 :                                 inner_state->dst_win.start,
     126             :                                 inner_state->dst_win.count,
     127        3925 :                                 inner_state->src_win.start,
     128             :                                 chunk,
     129             :                                 &opts);
     130             :                         consumed = chunk;
     131             :                         intended_result = PUMP_OK;
     132        3925 :                         break;
     133             : 
     134         190 :                 case PUMP_FLUSH_ALL:
     135             :                 case PUMP_FLUSH_DATA:
     136             :                         // FLUSH_ALL not supported yet, just flush the data
     137         190 :                         produced = LZ4F_flush(
     138             :                                 inner_state->ctx.c,
     139         190 :                                 inner_state->dst_win.start,
     140             :                                 inner_state->dst_win.count,
     141             :                                 &opts);
     142             :                         consumed = 0;
     143             :                         intended_result = PUMP_END;
     144         190 :                         break;
     145             : 
     146           2 :                 case PUMP_FINISH:
     147           2 :                         produced = LZ4F_compressEnd(
     148             :                                 inner_state->ctx.c,
     149           2 :                                 inner_state->dst_win.start,
     150             :                                 inner_state->dst_win.count,
     151             :                                 &opts);
     152             :                         consumed = 0;
     153           2 :                         inner_state->finished = true;
     154             :                         intended_result = PUMP_END;
     155           2 :                         break;
     156             : 
     157             :                 default:
     158           0 :                         assert(0); // shut up, compiler!
     159             :                         return PUMP_ERROR;
     160             :         }
     161             : 
     162        4117 :         if (LZ4F_isError(produced)) {
     163           0 :                 inner_state->error_code = produced;
     164           0 :                 return PUMP_ERROR;
     165             :         }
     166             : 
     167        4117 :         inner_state->src_win.start += consumed;
     168        4117 :         inner_state->src_win.count -= consumed;
     169        4117 :         inner_state->dst_win.start += produced;
     170        4117 :         inner_state->dst_win.count -= produced;
     171             : 
     172        4117 :         return intended_result;
     173             : }
     174             : 
     175             : static void
     176           2 : compr_end(inner_state_t *inner_state)
     177             : {
     178           2 :         LZ4F_freeCompressionContext(inner_state->ctx.c);
     179           2 :         free(inner_state->buffer.start);
     180           2 :         free(inner_state);
     181           2 : }
     182             : 
     183             : static const char*
     184           0 : get_error(inner_state_t *inner_state)
     185             : {
     186           0 :         return LZ4F_getErrorName(inner_state->error_code);
     187             : }
     188             : 
     189             : static stream *
     190           2 : setup_decompression(stream *inner, pump_state *state)
     191             : {
     192           2 :         inner_state_t *inner_state = state->inner_state;
     193           2 :         void *buf = malloc(READ_CHUNK);
     194           2 :         if (buf == NULL)
     195             :                 return NULL;
     196             : 
     197           2 :         inner_state->buffer = (pump_buffer) { .start = buf, .count = READ_CHUNK };
     198           2 :         inner_state->src_win = inner_state->buffer;
     199           2 :         inner_state->src_win.count = 0;
     200             : 
     201           2 :         LZ4F_errorCode_t ret = LZ4F_createDecompressionContext(
     202             :                 &inner_state->ctx.d, LZ4F_VERSION);
     203           2 :         if (LZ4F_isError(ret)) {
     204           0 :                 free(buf);
     205           0 :                 mnstr_set_open_error(inner->name, 0, "failed to initialize lz4: %s", LZ4F_getErrorName(ret));
     206           0 :                 return NULL;
     207             :         }
     208             : 
     209           2 :         state->worker = decomp;
     210           2 :         state->finalizer = decomp_end;
     211             : 
     212           2 :         stream *s = pump_stream(inner, state);
     213           2 :         if (s == NULL) {
     214           0 :                 free(buf);
     215           0 :                 return NULL;
     216             :         }
     217             : 
     218             :         return s;
     219             : }
     220             : 
     221             : static stream *
     222           2 : setup_compression(stream *inner, pump_state *state, int level)
     223             : {
     224           2 :         inner_state_t *inner_state = state->inner_state;
     225             :         LZ4F_errorCode_t ret;
     226             : 
     227             :         // When pumping data into the compressor, the output buffer must be
     228             :         // sufficiently large to hold all output caused by the current input. We
     229             :         // will restrict our writes to be at most WRITE_CHUNK large and allocate
     230             :         // a buffer that can accommodate even the worst-case amount of output
     231             :         // caused by input of that size.
     232             : 
     233             :         // The required size depends on the preferences so we set those first.
     234           2 :         memset(&inner_state->compression_prefs, 0, sizeof(inner_state->compression_prefs));
     235           2 :         inner_state->compression_prefs.compressionLevel = level;
     236             : 
     237             :         // Set up a buffer that can hold the largest output block plus the initial
     238             :         // frame header.
     239           2 :         size_t bound = LZ4F_compressBound(WRITE_CHUNK, &inner_state->compression_prefs);
     240           2 :         size_t bufsize = bound + LZ4F_HEADER_SIZE_MAX;
     241           2 :         char *buffer = malloc(bufsize);
     242           2 :         if (buffer == NULL)
     243             :                 return NULL;
     244           2 :         inner_state->buffer = (pump_buffer) { .start = buffer, .count = bufsize };
     245           2 :         inner_state->dst_win = inner_state->buffer;
     246           2 :         state->elbow_room = bound;
     247             : 
     248           2 :         ret = LZ4F_createCompressionContext(&inner_state->ctx.c, LZ4F_VERSION);
     249           2 :         if (LZ4F_isError(ret)) {
     250           0 :                 free(buffer);
     251           0 :                 return NULL;
     252             :         }
     253             : 
     254             :         // Write the frame header.
     255           2 :         size_t nwritten = LZ4F_compressBegin(
     256             :                 inner_state->ctx.c,
     257           2 :                 inner_state->dst_win.start,
     258             :                 inner_state->dst_win.count,
     259             :                 &inner_state->compression_prefs
     260             :         );
     261           2 :         if (LZ4F_isError(nwritten)) {
     262           0 :                 LZ4F_freeCompressionContext(inner_state->ctx.c);
     263           0 :                 free(buffer);
     264           0 :                 mnstr_set_open_error(inner->name, 0, "failed to initialize lz4: %s", LZ4F_getErrorName(ret));
     265           0 :                 return NULL;
     266             :         }
     267           2 :         inner_state->dst_win.start += nwritten;
     268           2 :         inner_state->dst_win.count -= nwritten;
     269             : 
     270           2 :         state->worker = compr;
     271           2 :         state->finalizer = compr_end;
     272             : 
     273           2 :         stream *s = pump_stream(inner, state);
     274           2 :         if (s == NULL) {
     275           0 :                 free(buffer);
     276           0 :                 return NULL;
     277             :         }
     278             : 
     279             :         return s;
     280             : }
     281             : 
     282             : stream *
     283           4 : lz4_stream(stream *inner, int level)
     284             : {
     285           4 :         inner_state_t *inner_state = calloc(1, sizeof(inner_state_t));
     286           4 :         pump_state *state = calloc(1, sizeof(pump_state));
     287           4 :         if (inner_state == NULL || state == NULL) {
     288           0 :                 free(inner_state);
     289           0 :                 free(state);
     290           0 :                 mnstr_set_open_error(inner->name, errno, "couldn't initialize lz4 stream");
     291           0 :                 return NULL;
     292             :         }
     293             : 
     294           4 :         state->inner_state = inner_state;
     295           4 :         state->get_src_win = get_src_win;
     296           4 :         state->set_src_win = set_src_win;
     297           4 :         state->get_dst_win = get_dst_win;
     298           4 :         state->set_dst_win = set_dst_win;
     299           4 :         state->get_buffer = get_buffer;
     300           4 :         state->get_error = get_error;
     301             : 
     302             :         stream *s;
     303           4 :         if (inner->readonly)
     304           2 :                 s = setup_decompression(inner, state);
     305             :         else
     306           2 :                 s = setup_compression(inner, state, level);
     307             : 
     308           4 :         if (s == NULL) {
     309           0 :                 free(inner_state);
     310           0 :                 free(state);
     311           0 :                 return NULL;
     312             :         }
     313             : 
     314             :         return s;
     315             : }
     316             : 
     317             : static stream *
     318           0 : open_lz4stream(const char *restrict filename, const char *restrict flags)
     319             : {
     320             :         stream *inner;
     321             :         int preset = 6;
     322             : 
     323           0 :         inner = open_stream(filename, flags);
     324           0 :         if (inner == NULL)
     325             :                 return NULL;
     326             : 
     327           0 :         return lz4_stream(inner, preset);
     328             : }
     329             : 
     330             : stream *
     331           0 : open_lz4rstream(const char *filename)
     332             : {
     333           0 :         stream *s = open_lz4stream(filename, "rb");
     334           0 :         if (s == NULL)
     335             :                 return NULL;
     336             : 
     337           0 :         assert(s->readonly == true);
     338           0 :         assert(s->binary == true);
     339             :         return s;
     340             : }
     341             : 
     342             : stream *
     343           0 : open_lz4wstream(const char *restrict filename, const char *restrict mode)
     344             : {
     345           0 :         stream *s = open_lz4stream(filename, mode);
     346           0 :         if (s == NULL)
     347             :                 return NULL;
     348             : 
     349           0 :         assert(s->readonly == false);
     350           0 :         assert(s->binary == true);
     351             :         return s;
     352             : }
     353             : 
     354             : stream *
     355           0 : open_lz4rastream(const char *filename)
     356             : {
     357           0 :         stream *s = open_lz4stream(filename, "r");
     358           0 :         s = create_text_stream(s);
     359           0 :         if (s == NULL)
     360             :                 return NULL;
     361             : 
     362           0 :         assert(s->readonly == true);
     363           0 :         assert(s->binary == false);
     364             :         return s;
     365             : }
     366             : 
     367             : stream *
     368           0 : open_lz4wastream(const char *restrict filename, const char *restrict mode)
     369             : {
     370           0 :         stream *s = open_lz4stream(filename, mode);
     371           0 :         s = create_text_stream(s);
     372           0 :         if (s == NULL)
     373             :                 return NULL;
     374           0 :         assert(s->readonly == false);
     375           0 :         assert(s->binary == false);
     376             :         return s;
     377             : }
     378             : #else
     379             : 
     380             : stream *
     381             : lz4_stream(stream *inner, int preset)
     382             : {
     383             :         (void) inner;
     384             :         (void) preset;
     385             :         mnstr_set_open_error(inner->name, 0, "LZ4 support has been left out of this MonetDB");
     386             :         return NULL;
     387             : }
     388             : 
     389             : stream *
     390             : open_lz4rstream(const char *filename)
     391             : {
     392             :         mnstr_set_open_error(filename, 0, "LZ4 support has been left out of this MonetDB");
     393             :         return NULL;
     394             : }
     395             : 
     396             : stream *
     397             : open_lz4wstream(const char *restrict filename, const char *restrict mode)
     398             : {
     399             :         (void) mode;
     400             :         mnstr_set_open_error(filename, 0, "LZ4 support has been left out of this MonetDB");
     401             :         return NULL;
     402             : }
     403             : 
     404             : stream *
     405             : open_lz4rastream(const char *filename)
     406             : {
     407             :         mnstr_set_open_error(filename, 0, "LZ4 support has been left out of this MonetDB");
     408             :         return NULL;
     409             : }
     410             : 
     411             : stream *
     412             : open_lz4wastream(const char *restrict filename, const char *restrict mode)
     413             : {
     414             :         (void) mode;
     415             :         mnstr_set_open_error(filename, 0, "LZ4 support has been left out of this MonetDB");
     416             :         return NULL;
     417             : }
     418             : 
     419             : #endif

Generated by: LCOV version 1.14