1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280
|
// Copyright (c) 2012-2014 Konstantin Isakov <ikm@zbackup.org>
// Part of ZBackup. Licensed under GNU GPLv2 or later + OpenSSL, see LICENSE
#include <openssl/sha.h>
#include <string.h>
#include "backup_creator.hh"
#include "check.hh"
#include "debug.hh"
#include "message.hh"
#include "page_size.hh"
#include "static_assert.hh"
namespace {
unsigned const MinChunkSize = 256;
}
// Sets up the sliding-window chunker: a ring buffer sized to hold one full
// chunk plus one page of input headroom, an empty "chunk to save" staging
// buffer, and a protobuf stream that accumulates the backup instructions.
BackupCreator::BackupCreator( StorageInfo const & info,
ChunkIndex & chunkIndex,
ChunkStorage::Writer & chunkStorageWriter ):
chunkMaxSize( info.chunk_max_size() ),
chunkIndex( chunkIndex ), chunkStorageWriter( chunkStorageWriter ),
ringBufferFill( 0 ),
chunkToSaveFill( 0 ),
backupDataStream( new google::protobuf::io::StringOutputStream( &backupData ) ),
chunkIdGenerated( false )
{
// In our ring buffer we have enough space to store one chunk plus an extra
// page for buffering the input
ringBuffer.resize( chunkMaxSize + getPageSize() );
// begin/end delimit the buffer; head is where new input lands, tail is the
// oldest byte of the current window. head == tail with fill == 0 means empty
begin = ringBuffer.data();
end = &ringBuffer.back() + 1;
head = begin;
tail = head;
// Staging area for bytes which fell out of the window and await emission
chunkToSave.resize( chunkMaxSize );
}
// The caller deposits new input bytes directly at the ring buffer's head.
// Pair with getInputBufferSize() for the number of bytes that may be written.
void * BackupCreator::getInputBuffer()
{
  return static_cast< void * >( head );
}
// Returns how many bytes may be written at head without overtaking tail or
// running past the physical end of the ring buffer. Never returns a size
// which would make head wrap mid-write (handleMoreData() relies on this).
size_t BackupCreator::getInputBufferSize()
{
  // Free space lies between head and tail
  if ( tail > head )
    return tail - head;

  // head == tail with data present means the buffer is completely full
  if ( tail == head && ringBufferFill )
    return 0;

  // Otherwise the writable region runs from head to the buffer's end
  return end - head;
}
// Consumes 'added' bytes previously written into getInputBuffer(), advancing
// the rolling-hash window one byte at a time and emitting chunks as content-
// defined boundaries (index matches) or size limits are hit.
void BackupCreator::handleMoreData( unsigned added )
{
// Note: head is never supposed to wrap around in the middle of the operation,
// as getInputBufferSize() never returns a value which could result in a
// wrap-around
while( added )
{
// If we don't have a full chunk, we need to consume data until we have
// one
if ( ringBufferFill < chunkMaxSize )
{
unsigned left = chunkMaxSize - ringBufferFill;
bool canFullyFill = added >= left;
unsigned toFill = canFullyFill ? left : added;
added -= toFill;
ringBufferFill += toFill;
// Feed each new byte into the rolling hash as the window grows
while ( toFill-- )
rollingHash.rollIn( *head++ );
if ( head == end )
head = begin;
// If we've managed to fill in the complete chunk, attempt matching it
if ( canFullyFill )
addChunkIfMatched();
}
else
{
// At this point we have a full chunk in the ring buffer, so we can rotate
// over a byte
// The byte leaving the window is staged into chunkToSave for later output
chunkToSave[ chunkToSaveFill++ ] = *tail;
if ( chunkToSaveFill == chunkMaxSize )
// Got the full chunk - save it
saveChunkToSave();
// Slide the window: roll the new byte in and the old byte out
rollingHash.rotate( *head++, *tail++ );
if ( head == end )
head = begin;
if ( tail == end )
tail = begin;
// The window moved, so test the new position against the chunk index
addChunkIfMatched();
--added; // A byte was consumed
}
}
}
void BackupCreator::saveChunkToSave()
{
CHECK( chunkToSaveFill > 0, "chunk to save is empty" );
if ( chunkToSaveFill < 128 ) // TODO: make this value configurable
{
// The amount of data is too small - emit without creating a new chunk
BackupInstruction instr;
instr.set_bytes_to_emit( chunkToSave.data(), chunkToSaveFill );
outputInstruction( instr );
}
else
{
// Output as a chunk
ChunkId id;
id.rollingHash = RollingHash::digest( chunkToSave.data(),
chunkToSaveFill );
unsigned char sha1Value[ SHA_DIGEST_LENGTH ];
SHA1( (unsigned char const *) chunkToSave.data(), chunkToSaveFill,
sha1Value );
STATIC_ASSERT( sizeof( id.cryptoHash ) <= sizeof( sha1Value ) );
memcpy( id.cryptoHash, sha1Value, sizeof( id.cryptoHash ) );
// Save it to the store if it's not there already
chunkStorageWriter.add( id, chunkToSave.data(), chunkToSaveFill );
BackupInstruction instr;
instr.set_chunk_to_emit( id.toBlob() );
outputInstruction( instr );
}
chunkToSaveFill = 0;
}
// Flushes everything still buffered at end of input. Leftover bytes live
// partly in chunkToSave and partly in the ring buffer; together they never
// exceed two chunks' worth, so at most two saves are needed.
void BackupCreator::finish()
{
  dPrintf( "At finish: %u, %u\n", chunkToSaveFill, ringBufferFill );

  bool needTwoChunks = chunkToSaveFill + ringBufferFill > chunkMaxSize;
  if ( needTwoChunks )
  {
    // Top chunkToSave up to a full chunk from the ring buffer and save it,
    // leaving only the remainder to deal with
    moveFromRingBufferToChunkToSave( chunkMaxSize - chunkToSaveFill );
    saveChunkToSave();
  }

  // Whatever is left must now fit into a single (possibly short) chunk
  CHECK( chunkToSaveFill + ringBufferFill <= chunkMaxSize, "had more than two "
"full chunks at backup finish" );

  moveFromRingBufferToChunkToSave( ringBufferFill );
  if ( chunkToSaveFill )
    saveChunkToSave();
}
// Copies 'toMove' bytes from the ring buffer (starting at tail) into the
// chunkToSave staging buffer, advancing tail and updating both fill counts.
// Caller guarantees toMove <= ringBufferFill and that the bytes fit into
// chunkToSave.
void BackupCreator::moveFromRingBufferToChunkToSave( unsigned toMove )
{
// If tail is before head, all data in the ring buffer is in one contiguous
// piece. If not, it's in two pieces
if ( tail < head )
{
memcpy( chunkToSave.data() + chunkToSaveFill, tail, toMove );
tail += toMove;
}
else
{
// Data wraps: copy up to the buffer's end first, then from its start
unsigned toEnd = end - tail;
unsigned firstPart = toEnd < toMove ? toEnd : toMove;
memcpy( chunkToSave.data() + chunkToSaveFill, tail, firstPart );
tail += firstPart;
if ( toMove > firstPart )
{
unsigned secondPart = toMove - firstPart;
memcpy( chunkToSave.data() + chunkToSaveFill + firstPart, begin,
secondPart );
tail = begin + secondPart;
}
}
// Normalize tail if it landed exactly on the buffer's end
if ( tail == end )
tail = begin;
chunkToSaveFill += toMove;
ringBufferFill -= toMove;
}
// Returns the full id (rolling hash + SHA-1) of the chunk currently held in
// the ring buffer. The result is cached until addChunkIfMatched() moves the
// window and invalidates it via chunkIdGenerated.
ChunkId const & BackupCreator::getChunkId()
{
  if ( chunkIdGenerated )
    return generatedChunkId;

  // Compute the SHA-1 of the window. The data may wrap around the ring
  // buffer, in which case it is fed to the hash in two pieces
  SHA_CTX sha;
  SHA1_Init( &sha );
  if ( tail < head )
  {
    // Tail is before head - all the block is in one contiguous piece
    SHA1_Update( &sha, tail, head - tail );
  }
  else
  {
    // Tail is after head - the block consists of two pieces
    SHA1_Update( &sha, tail, end - tail );
    SHA1_Update( &sha, begin, head - begin );
  }
  unsigned char sha1Digest[ SHA_DIGEST_LENGTH ];
  SHA1_Final( sha1Digest, &sha );

  generatedChunkId.rollingHash = rollingHash.digest();
  memcpy( generatedChunkId.cryptoHash, sha1Digest,
          sizeof( generatedChunkId.cryptoHash ) );
  chunkIdGenerated = true;

  return generatedChunkId;
}
// Checks whether the current window's rolling hash appears in the chunk
// index. On a confirmed match, emits a reference to the existing chunk and
// drops the matched bytes from the ring buffer.
void BackupCreator::addChunkIfMatched()
{
  // The window is at a new position, so any cached id is stale
  chunkIdGenerated = false;

  if ( !chunkIndex.findChunk( rollingHash.digest(), *this ) )
    return;

  // verbosePrintf( "Reuse of chunk %lu\n", rollingHash.digest() );

  // Before emitting the matched chunk, we need to make sure any bytes
  // which came before it are saved first
  if ( chunkToSaveFill )
    saveChunkToSave();

  // Add the record
  BackupInstruction instr;
  instr.set_chunk_to_emit( getChunkId().toBlob() );
  outputInstruction( instr );

  // The block was consumed from the ring buffer - remove the block from it
  tail = head;
  ringBufferFill = 0;
  rollingHash.reset();
}
// Serializes one instruction into the in-memory backup stream. All emitted
// output (inline bytes and chunk references) funnels through here.
void BackupCreator::outputInstruction( BackupInstruction const & instr )
{
// TODO: once backupData becomes large enough, spawn another BackupCreator and
// feed data to it. This way we wouldn't have to store the entire backupData
// in RAM
Message::serialize( instr, *backupDataStream );
}
// Hands the accumulated backup instruction stream to the caller via swap.
// Destroys the output stream first so all data is flushed into backupData;
// consequently this may only be called once per BackupCreator.
void BackupCreator::getBackupData( string & str )
{
CHECK( backupDataStream.get(), "getBackupData() called twice" );
// Must reset the stream before swapping so backupData is fully written
backupDataStream.reset();
str.swap( backupData );
}
|