File: srl_compress.h

package info (click to toggle)
libsereal-encoder-perl 4.005%2Bds-1
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 1,980 kB
  • sloc: ansic: 8,664; perl: 5,705; sh: 25; makefile: 5
file content (243 lines) | stat: -rw-r--r-- 9,554 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
#ifndef SRL_COMPRESS_H_
#define SRL_COMPRESS_H_

#include "srl_buffer.h"
#include "srl_inline.h"
#include "srl_protocol.h"
#include "srl_buffer_types.h"

/* WARNING: These are NOT the protocol encoding bits
 *          (SRL_PROTOCOL_ENCODING_SNAPPY, SRL_PROTOCOL_ENCODING_ZLIB, ...).
 *          Each constant below is a flag indicating that we want to use
 *          the corresponding compressor (Snappy, incremental Snappy, Zlib
 *          or Zstd).
 *
 * DO NOT CHANGE THIS WITHOUT REVIEWING THE BITS IN srl_encoder.h and etc.
 */

#define SRL_F_COMPRESS_SNAPPY                   0x00040UL
#define SRL_F_COMPRESS_SNAPPY_INCREMENTAL       0x00080UL
#define SRL_F_COMPRESS_ZLIB                     0x00100UL
#define SRL_F_COMPRESS_ZSTD                     0x40000UL
/* WARNING: IF ADDING NEW COMPRESSION MAKE SURE THAT NEW CONSTANT DOES NOT
 *          COLLIDE WITH CONSTANTS IN srl_encoder.h!
 */

/* Bitwise OR of every compression flag defined above. */
#define SRL_F_COMPRESS_FLAGS_MASK               (SRL_F_COMPRESS_SNAPPY | \
                                                 SRL_F_COMPRESS_SNAPPY_INCREMENTAL | \
                                                 SRL_F_COMPRESS_ZLIB | \
                                                 SRL_F_COMPRESS_ZSTD)

#if defined(HAVE_CSNAPPY)
#include <csnappy.h>
#else
#include "snappy/csnappy_compress.c"
#endif

#if defined(HAVE_MINIZ)
#include <miniz.h>
#else
#include "miniz.h"
#endif

#if defined(HAVE_ZSTD)
#include <zstd.h>
#else
#include "zstd/zstd.h"
#endif

/* Overwrite a previously reserved varint slot in the output stream.
 *
 * varint_start - first byte of the reserved slot
 * varint_end   - last byte of the reserved slot
 * number       - value to store
 *
 * If the canonical encoding of number ends exactly on varint_end it is
 * written as-is. Otherwise the result is a non-canonical varint padded out
 * to varint_end, which makes this useful for filling pre-allocated varints. */
SRL_STATIC_INLINE void
srl_update_varint_from_to(pTHX_ unsigned char *varint_start, unsigned char *varint_end, UV number)
{
    unsigned char *out = varint_start;

    /* Emit 7 bits at a time, least significant group first, with the
     * continuation bit set for as long as more than 7 bits remain. */
    for (; number >= 0x80; number >>= 7) {
        *out++ = (unsigned char) ((number & 0x7f) | 0x80);
    }

    /* The final group lands exactly on the last reserved byte: write it
     * without the continuation bit and we have a canonical varint. */
    if (out == varint_end) {
        *out = (unsigned char) number;
        return;
    }

    /* Otherwise keep the continuation bit set on the final group, stuff
     * zero-valued groups (0x80) into the tail and terminate with a 0 byte.
     * Only the encoder needs this special case; a decoder reads such a
     * self-padded varint like any other. */
    *out++ = (unsigned char) ((number & 0x7f) | 0x80);
    while (out < varint_end) {
        *out++ = 0x80;
    }
    *out = 0;
}

/* Allocate the Snappy working buffer on first use.
 *
 * workmem - address of the buffer pointer; only written when it currently
 *           holds NULL. Croaks with "Out of memory!" if the pointer is
 *           still NULL after the allocation attempt. */
SRL_STATIC_INLINE void
srl_init_snappy_workmem(pTHX_ void **workmem)
{
    /* Common case: an earlier call already allocated the buffer. */
    if (!expect_false(*workmem == NULL)) {
        return;
    }

    /* Cleaned up automatically by the cleanup handler */
    Newx(*workmem, CSNAPPY_WORKMEM_BYTES, char);
    if (*workmem == NULL) {
        croak("Out of memory!");
    }
}

/* Destroy the Snappy working buffer.
 *
 * workmem - buffer pointer to release; it is handed to Safefree().
 *           The pointer is passed by value, so the caller's copy is not
 *           reset by this function. */
SRL_STATIC_INLINE void
srl_destroy_snappy_workmem(pTHX_ void *workmem)
{
    Safefree(workmem);
}

/* Map the SRL_F_COMPRESS_* selection flags to the matching
 * SRL_PROTOCOL_ENCODING_* value for the document header.
 *
 * When several flags are set the first match wins, in this order:
 * Snappy, incremental Snappy, Zlib, Zstd. With none set the result is
 * SRL_PROTOCOL_ENCODING_RAW. */
SRL_STATIC_INLINE U8
srl_get_compression_header_flag(const U32 compress_flags)
{
    if (compress_flags & SRL_F_COMPRESS_SNAPPY)
        return SRL_PROTOCOL_ENCODING_SNAPPY;

    if (compress_flags & SRL_F_COMPRESS_SNAPPY_INCREMENTAL)
        return SRL_PROTOCOL_ENCODING_SNAPPY_INCREMENTAL;

    if (compress_flags & SRL_F_COMPRESS_ZLIB)
        return SRL_PROTOCOL_ENCODING_ZLIB;

    if (compress_flags & SRL_F_COMPRESS_ZSTD)
        return SRL_PROTOCOL_ENCODING_ZSTD;

    return SRL_PROTOCOL_ENCODING_RAW;
}

/* Sets the compression header flag: ORs the encoding value selected by
 * compress_flags into the flags-and-version byte of the header in buf. */
SRL_STATIC_INLINE void
srl_set_compression_header_flag(srl_buffer_t *buf, const U32 compress_flags)
{
    /* sizeof() of the magic string counts its trailing \0, hence the -1:
     * the flags-and-version byte directly follows the magic string. */
    srl_buffer_char *const header_byte = buf->start + (sizeof(SRL_MAGIC_STRING) - 1);
    const U8 encoding_bits = srl_get_compression_header_flag(compress_flags);

    *header_byte |= encoding_bits;
}

/* Resets the compression header flag to OFF.
 * Obviously requires that a Sereal header was already written to the
 * encoder's output buffer. */
SRL_STATIC_INLINE void
srl_reset_compression_header_flag(srl_buffer_t *buf)
{
    /* sizeof() of the magic string counts its trailing \0, hence the -1:
     * the flags-and-version byte directly follows the magic string. */
    srl_buffer_char *const header_byte = buf->start + (sizeof(SRL_MAGIC_STRING) - 1);

    /* keep only the protocol version bits and mark the body as raw */
    *header_byte = (*header_byte & SRL_PROTOCOL_VERSION_MASK) | SRL_PROTOCOL_ENCODING_RAW;
}

/* Compress body with one of the available compressors (Snappy, incremental
 * Snappy, Zstd or Zlib). Zlib is used when none of the Snappy/Zstd flags is
 * set in compress_flags.
 *
 * buf                  - encoder output buffer. On return it holds the header
 *                        followed by the compressed body, or the original
 *                        uncompressed data if compression did not shrink it.
 * sereal_header_length - number of leading bytes of buf that are copied
 *                        verbatim in front of the compressed body.
 * compress_flags       - SRL_F_COMPRESS_* selection flags.
 * compress_level       - level handed to ZSTD_compress / mz_compress2; not
 *                        used on the Snappy paths.
 * workmem              - address of the lazily allocated Snappy work buffer.
 *
 * The function sets/resets compression bits at version byte.
 * The caller has to adjust buf->body_pos by calling SRL_UPDATE_BODY_POS
 * right after exiting from srl_compress_body.
 */

SRL_STATIC_INLINE void
srl_compress_body(pTHX_ srl_buffer_t *buf, STRLEN sereal_header_length,
                  const U32 compress_flags, const int compress_level, void **workmem)
{
    const int is_traditional_snappy = compress_flags & SRL_F_COMPRESS_SNAPPY;
    const int is_incremental_snappy = compress_flags & SRL_F_COMPRESS_SNAPPY_INCREMENTAL;
    const int is_zstd = compress_flags & SRL_F_COMPRESS_ZSTD;
    const int is_zlib = !is_traditional_snappy && !is_incremental_snappy && !is_zstd;

    size_t uncompressed_body_length = BUF_POS_OFS(buf) - sereal_header_length;
    size_t compressed_body_length;
    srl_buffer_char *varint_start = NULL;
    srl_buffer_char *varint_end = NULL;
    srl_buffer_t old_buf;

    DEBUG_ASSERT_BUF_SANE(buf);

    /* Get estimated (worst case) compressed payload length */
    if (is_incremental_snappy) {
        compressed_body_length = (size_t) csnappy_max_compressed_length(uncompressed_body_length);
        compressed_body_length += SRL_MAX_VARINT_LENGTH; /* will have to embed compressed packet length as varint */
    } else if (is_traditional_snappy) {
        compressed_body_length = (size_t) csnappy_max_compressed_length(uncompressed_body_length);
    } else if (is_zstd) {
        compressed_body_length = ZSTD_compressBound(uncompressed_body_length);
        compressed_body_length += SRL_MAX_VARINT_LENGTH; /* will have to embed compressed packet length as varint */
    } else {
        compressed_body_length = (size_t) mz_compressBound(uncompressed_body_length);
        compressed_body_length += SRL_MAX_VARINT_LENGTH; /* will have to embed uncompressed packet length as varint */
        compressed_body_length += SRL_MAX_VARINT_LENGTH; /* will have to embed compressed packet length as varint */
    }

    /* Back up old buffer and allocate new one with correct size */
    srl_buf_copy_buffer(aTHX_ buf, &old_buf);
    srl_buf_init_buffer(aTHX_ buf, sereal_header_length + compressed_body_length + 1);

    /* Copy Sereal header */
    Copy(old_buf.start, buf->pos, sereal_header_length, char);
    buf->pos += sereal_header_length;

    /* Embed uncompressed packet length if Zlib */
    if (is_zlib) srl_buf_cat_varint_nocheck(aTHX_ buf, 0, uncompressed_body_length);

    /* Embed compressed packet length if incr. Snappy, Zlib or Zstd.
     * The worst-case length is written for now purely to reserve the varint
     * slot; it is overwritten with the real length once that is known. */
    if (is_incremental_snappy || is_zlib || is_zstd) {
        varint_start = buf->pos;
        srl_buf_cat_varint_nocheck(aTHX_ buf, 0, compressed_body_length);
        varint_end = buf->pos - 1;
    }

    /* NOTE(review): the destination capacity handed to the compressors below
     * is the estimate computed above, which still includes the varint slack;
     * presumably the *Bound() guarantees keep the real output inside the
     * allocation - confirm before tightening. */
    if (is_incremental_snappy || is_traditional_snappy) {
        uint32_t len = (uint32_t) compressed_body_length;
        srl_init_snappy_workmem(aTHX_ workmem);

        csnappy_compress((char*) old_buf.start + sereal_header_length, (uint32_t) uncompressed_body_length,
                         (char*) buf->pos, &len, *workmem, CSNAPPY_WORKMEM_BYTES_POWER_OF_TWO);

        compressed_body_length = (size_t) len;
    } else if (is_zstd) {
        /* Do the offset arithmetic on the typed pointer and cast afterwards:
         * arithmetic on a void pointer is a compiler extension, not standard C. */
        size_t code = ZSTD_compress((void*) buf->pos, compressed_body_length,
                                    (const void*) (old_buf.start + sereal_header_length), uncompressed_body_length,
                                    compress_level);

        /* NOTE(review): with asserts compiled out a failure is not reported
         * here; a zstd error code is expected to be a very large size_t and
         * would then take the "compression didn't help" path below - confirm. */
        assert(ZSTD_isError(code) == 0);
        compressed_body_length = code;
    } else if (is_zlib) {
        mz_ulong dl = (mz_ulong) compressed_body_length;
        int status = mz_compress2(
            buf->pos,
            &dl,
            old_buf.start + sereal_header_length,
            (mz_ulong) uncompressed_body_length,
            compress_level
        );

        (void)status; /* silences the unused-variable warning when assert() is compiled out */
        assert(status == Z_OK);
        compressed_body_length = (size_t) dl;
    }

    assert(compressed_body_length != 0);

    /* If compression didn't help, swap back to old, uncompressed buffer */
    if (compressed_body_length >= uncompressed_body_length) {
        /* swap in old, uncompressed buffer; old_buf now refers to the
         * scratch buffer allocated above, which is freed at the end */
        srl_buf_swap_buffer(aTHX_ buf, &old_buf);
        /* disable compression flag */
        srl_reset_compression_header_flag(buf);
    } else { /* go ahead with the compressed body and do final fixups */
        /* overwrite the max size varint with the real size of the compressed data */
        if (varint_start)
            srl_update_varint_from_to(aTHX_ varint_start, varint_end, compressed_body_length);

        buf->pos += compressed_body_length;

        /* enable compression flag */
        srl_set_compression_header_flag(buf, compress_flags);
    }

    srl_buf_free_buffer(aTHX_ &old_buf);
    DEBUG_ASSERT_BUF_SANE(buf);
}

#endif