//------------------------------------------------------------------------------
// GB_serialize_array: serialize an array, with optional compression
//------------------------------------------------------------------------------
// SuiteSparse:GraphBLAS, Timothy A. Davis, (c) 2017-2022, All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
//------------------------------------------------------------------------------
// Parallel compression method for an array. The array is compressed into
// a sequence of independently allocated blocks, or returned as-is if not
// compressed. Currently, only LZ4, LZ4HC, and ZSTD are supported.
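//
// On output (when dryrun is false), Blocks [k].p holds the compressed data
// for block k, and (after the final cumulative sum) Sblocks [k] is the
// offset of block k in the assembled blob, with Sblocks [nblocks] the total
// compressed size.  A minimal sketch of how a caller can assemble the blob
// from these outputs (the names blob, X, len, etc., are illustrative only):
//
//      GB_blocks *Blocks = NULL ; int64_t *Sblocks = NULL ;
//      size_t Blocks_size, Sblocks_size, compressed_size ;
//      int32_t nblocks, method_used ;
//      GB_OK (GB_serialize_array (&Blocks, &Blocks_size, &Sblocks,
//          &Sblocks_size, &nblocks, &method_used, &compressed_size,
//          /* dryrun: */ false, X, len, method, algo, level, Context)) ;
//      for (int32_t k = 0 ; k < nblocks ; k++)
//      {
//          // block k occupies blob [Sblocks [k] ... Sblocks [k+1]-1]
//          memcpy (blob + Sblocks [k], Blocks [k].p,
//              Sblocks [k+1] - Sblocks [k]) ;
//      }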
#include "GB.h"
#include "GB_serialize.h"
#include "GB_lz4.h"
#include "GB_zstd.h"
#define GB_FREE_ALL \
{ \
GB_FREE (&Sblocks, Sblocks_size) ; \
GB_serialize_free_blocks (&Blocks, Blocks_size, nblocks, Context) ; \
}
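// GB_FREE_ALL is the error-path cleanup used below: it discards the Sblocks
// array and, via GB_serialize_free_blocks, any blocks allocated so far.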
GrB_Info GB_serialize_array
(
// output:
GB_blocks **Blocks_handle, // Blocks: array of size nblocks+1
size_t *Blocks_size_handle, // size of Blocks
int64_t **Sblocks_handle, // Sblocks: array of size nblocks+1
size_t *Sblocks_size_handle, // size of Sblocks
int32_t *nblocks_handle, // # of blocks
int32_t *method_used, // method used
size_t *compressed_size, // total size of the compressed blocks, or
// upper bound if dryrun is true
// input:
bool dryrun, // if true, just estimate the size
GB_void *X, // input array of size len
int64_t len, // size of X, in bytes
int32_t method, // compression method requested
int32_t algo, // compression algorithm
int32_t level, // compression level
GB_Context Context
)
{
//--------------------------------------------------------------------------
// check inputs
//--------------------------------------------------------------------------
ASSERT (Blocks_handle != NULL) ;
ASSERT (Blocks_size_handle != NULL) ;
ASSERT (Sblocks_handle != NULL) ;
ASSERT (Sblocks_size_handle != NULL) ;
ASSERT (nblocks_handle != NULL) ;
ASSERT (method_used != NULL) ;
ASSERT (compressed_size != NULL) ;
GB_blocks *Blocks = NULL ;
size_t Blocks_size = 0, Sblocks_size = 0 ;
int32_t nblocks = 0 ;
int64_t *Sblocks = NULL ;
//--------------------------------------------------------------------------
// check for quick return
//--------------------------------------------------------------------------
(*Blocks_handle) = NULL ;
(*Blocks_size_handle) = 0 ;
(*Sblocks_handle) = NULL ;
(*Sblocks_size_handle) = 0 ;
(*nblocks_handle) = 0 ;
(*method_used) = GxB_COMPRESSION_NONE ;
(*compressed_size) = 0 ;
if (X == NULL || len == 0)
{
// input array is empty
return (GrB_SUCCESS) ;
}
//--------------------------------------------------------------------------
// check for no compression
//--------------------------------------------------------------------------
if (method <= GxB_COMPRESSION_NONE || len < 256)
{
// no compression, return result as a single block (plus the sentinel)
if (!dryrun)
{
Blocks = GB_MALLOC (2, GB_blocks, &Blocks_size) ;
Sblocks = GB_MALLOC (2, int64_t, &Sblocks_size) ;
if (Blocks == NULL || Sblocks == NULL)
{
// out of memory
GB_FREE_ALL ;
return (GrB_OUT_OF_MEMORY) ;
}
Blocks [0].p = X ; // first block is all of the array X
Blocks [0].p_size_allocated = 0 ; // p is shallow
Sblocks [0] = 0 ; // start of first block
Blocks [1].p = NULL ; // 2nd block is the final sentinel
Blocks [1].p_size_allocated = 0 ; // p is shallow
Sblocks [1] = len ; // first block ends at len-1
(*Blocks_handle) = Blocks ;
(*Blocks_size_handle) = Blocks_size ;
(*Sblocks_handle) = Sblocks ;
(*Sblocks_size_handle) = Sblocks_size ;
}
(*compressed_size) = len ;
(*nblocks_handle) = 1 ;
return (GrB_SUCCESS) ;
}
(*method_used) = method ;
//--------------------------------------------------------------------------
// determine # of threads to use
//--------------------------------------------------------------------------
GB_GET_NTHREADS_MAX (nthreads_max, chunk, Context) ;
int nthreads = GB_nthreads (len, chunk, nthreads_max) ;
//--------------------------------------------------------------------------
// determine # of blocks and allocate them
//--------------------------------------------------------------------------
// divide the array into blocks, 4 per thread, or a single block if 1 thread
int64_t blocksize = (nthreads == 1) ? len : GB_ICEIL (len, 4*nthreads) ;
// ensure the blocksize does not exceed the LZ4 maximum (this limit is also
// fine for ZSTD)
ASSERT (LZ4_MAX_INPUT_SIZE < INT32_MAX) ;
blocksize = GB_IMIN (blocksize, LZ4_MAX_INPUT_SIZE/2) ;
// ensure the blocksize is not too small
blocksize = GB_IMAX (blocksize, (64*1024)) ;
// determine the final # of blocks
nblocks = GB_ICEIL (len, blocksize) ;
nthreads = GB_IMIN (nthreads, nblocks) ;
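// For example (illustrative sizes only): with len = 16 MiB and nthreads = 8,
// blocksize = ceil (len / 32) = 512 KiB, which lies inside the two clamps
// above, so nblocks = 32 and all 8 threads remain in use.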
(*nblocks_handle) = nblocks ;
// allocate the output Blocks: one per block plus the sentinel block
if (!dryrun)
{
Blocks = GB_CALLOC (nblocks+1, GB_blocks, &Blocks_size) ;
Sblocks = GB_CALLOC (nblocks+1, int64_t, &Sblocks_size) ;
if (Blocks == NULL || Sblocks == NULL)
{
// out of memory
GB_FREE_ALL ;
return (GrB_OUT_OF_MEMORY) ;
}
}
// allocate the blocks, one at a time
int32_t blockid ;
bool ok = true ;
for (blockid = 0 ; blockid < nblocks && ok ; blockid++)
{
// allocate a single block for the compression of X [kstart:kend-1]
int64_t kstart, kend ;
GB_PARTITION (kstart, kend, len, blockid, nblocks) ;
size_t uncompressed = kend - kstart ;
ASSERT (uncompressed < INT32_MAX) ;
ASSERT (uncompressed > 0) ;
size_t s ;
switch (algo)
{
case GxB_COMPRESSION_LZ4 :
case GxB_COMPRESSION_LZ4HC :
s = (size_t) LZ4_compressBound ((int) uncompressed) ;
break ;
default :
case GxB_COMPRESSION_ZSTD :
s = ZSTD_compressBound (uncompressed) ;
break ;
}
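// For reference: LZ4_compressBound (n) is n + n/255 + 16, and
// ZSTD_compressBound (n) is roughly n + n/256 plus a small constant margin,
// so the bound s is only slightly larger than the uncompressed block size.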
ASSERT (s < INT32_MAX) ;
if (dryrun)
{
// do not allocate the block; just sum up the upper bound sizes
(*compressed_size) += s ;
}
else
{
// allocate the block
size_t size_allocated = 0 ;
GB_void *p = GB_MALLOC (s, GB_void, &size_allocated) ;
ok = (p != NULL) ;
Blocks [blockid].p = p ;
Blocks [blockid].p_size_allocated = size_allocated ;
}
}
if (dryrun)
{
// GrB_Matrix_serializeSize: no more work to do. (*compressed_size) is
// an upper bound of the blob_size required when the matrix is
// compressed, and (*nblocks_handle) is the number of blocks to be used.
// No space has been allocated.
return (GrB_SUCCESS) ;
}
if (!ok)
{
// out of memory
GB_FREE_ALL ;
return (GrB_OUT_OF_MEMORY) ;
}
//--------------------------------------------------------------------------
// compress the blocks in parallel
//--------------------------------------------------------------------------
#pragma omp parallel for num_threads(nthreads) schedule(dynamic) \
reduction(&&:ok)
for (blockid = 0 ; blockid < nblocks ; blockid++)
{
// compress X [kstart:kend-1] into Blocks [blockid].p
int64_t kstart, kend ;
GB_PARTITION (kstart, kend, len, blockid, nblocks) ;
const char *src = (const char *) (X + kstart) ; // source
char *dst = (char *) Blocks [blockid].p ; // destination
int srcSize = (int) (kend - kstart) ; // size of source
size_t dsize = Blocks [blockid].p_size_allocated ; // size of dest
int dstCapacity = GB_IMIN (dsize, INT32_MAX) ;
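// (the LZ4 calls below take an int dstCapacity, so clamp the allocated size;
// blocksize was already limited to LZ4_MAX_INPUT_SIZE/2, so nothing is lost)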
int s ;
size_t s64 ;
switch (algo)
{
case GxB_COMPRESSION_LZ4 :
s = LZ4_compress_default (src, dst, srcSize, dstCapacity) ;
ok = ok && (s > 0) ;
// compressed block is now in dst [0:s-1], of size s
Sblocks [blockid] = (int64_t) s ;
break ;
case GxB_COMPRESSION_LZ4HC :
s = LZ4_compress_HC (src, dst, srcSize, dstCapacity, level) ;
ok = ok && (s > 0) ;
// compressed block is now in dst [0:s-1], of size s
Sblocks [blockid] = (int64_t) s ;
break ;
default :
case GxB_COMPRESSION_ZSTD :
s64 = ZSTD_compress (dst, dstCapacity, src, srcSize, level) ;
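// on failure, ZSTD_compress returns an error code: a very large size_t
// (testable with ZSTD_isError) that always exceeds dstCapacity, so the
// test below also detects any error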
ok = ok && (s64 <= dstCapacity) ;
// compressed block is now in dst [0:s64-1], of size s64
Sblocks [blockid] = (int64_t) s64 ;
break ;
}
}
if (!ok)
{
// compression failure: this can "never" occur
GB_FREE_ALL ;
return (GrB_INVALID_OBJECT) ;
}
//--------------------------------------------------------------------------
// compute cumulative sum of the compressed blocks
//--------------------------------------------------------------------------
GB_cumsum (Sblocks, nblocks, NULL, 1, Context) ;
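// For example, if three compressed blocks have sizes [10, 20, 30], Sblocks
// becomes [0, 10, 30, 60]: block k starts at offset Sblocks [k] in the blob,
// and Sblocks [nblocks] is the total compressed size.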
//--------------------------------------------------------------------------
// free workspace and return result
//--------------------------------------------------------------------------
(*Blocks_handle) = Blocks ;
(*Blocks_size_handle) = Blocks_size ;
(*Sblocks_handle) = Sblocks ;
(*Sblocks_size_handle) = Sblocks_size ;
(*compressed_size) = Sblocks [nblocks] ; // actual size of the blob
return (GrB_SUCCESS) ;
}