1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287
|
// License: GPLv3 Copyright: 2023, Kovid Goyal, <kovid at kovidgoyal.net>
// First create a patcher with:
// p = NewPatcher(expected_input_size)
// Create a signature for the file you want to update using
// p.CreateSignatureIterator(file_to_update, signature_output)
// Now create a Differ with the created signature
// d = NewDiffer()
// d.AddSignatureData(signature_data_from_previous_step)
// Now create a delta based on the signature and the reference file
// d.CreateDelta(reference_file, delta_output)
// Finally, apply this delta using the patcher to produce a file identical to reference_file
// based on the delta data and file_to_update
// p.StartDelta(output_file, file_to_update)
// p.UpdateDelta(...)
// p.FinishDelta()
package rsync
import (
"fmt"
"io"
"math"
"github.com/kovidgoyal/kitty/tools/utils"
)
// Keeps the fmt import referenced.
var _ = fmt.Print

// MaxBlockSize is the upper bound on the rsync block size: larger values are
// rejected when reading a signature header and NewPatcher never picks a
// bigger one. It is 1 MiB, i.e. the square root of 1 TiB.
const MaxBlockSize int = 1 << 20
// Algorithm identifiers. Each one travels as a 16 bit field in the
// signature header.
type (
	// StrongHashType identifies the strong hash applied to each block.
	StrongHashType uint16
	// WeakHashType identifies the weak hash.
	WeakHashType uint16
	// ChecksumType identifies the checksum algorithm.
	ChecksumType uint16
)

// Known strong hashes.
const (
	XXH3 StrongHashType = iota
)

// Known checksums.
const (
	XXH3128Sum ChecksumType = iota
)

// Known weak hashes.
const (
	Rsync WeakHashType = iota
)
// GrowBufferFunction is the signature of a callback that is given a byte
// slice and a size and returns a byte slice.
// NOTE(review): not referenced in this part of the file; presumably it
// returns slice enlarged to hold sz bytes -- confirm against its callers.
type GrowBufferFunction = func(slice []byte, sz int) []byte
// Api is the state common to Differ and Patcher, both of which embed it.
type Api struct {
// rsync is the underlying engine: its block size, hasher and checksummer
// are configured by NewPatcher or from a parsed signature header.
rsync rsync
// signature accumulates the block hashes decoded from signature data.
signature []BlockHash
// The three algorithm identifiers below are the values written to, and
// read back from, the signature header.
Checksum_type ChecksumType
Strong_hash_type StrongHashType
Weak_hash_type WeakHashType
}
// Differ consumes signature data via AddSignatureData and then produces a
// delta via CreateDelta.
type Differ struct {
Api
// unconsumed_signature_data holds bytes passed to AddSignatureData that
// have not yet been parsed into a header or a complete block hash.
unconsumed_signature_data []byte
}
// Patcher writes signatures (CreateSignatureIterator) and applies deltas
// (StartDelta, UpdateDelta, FinishDelta).
type Patcher struct {
Api
// unconsumed_delta_data holds bytes passed to UpdateDelta that do not yet
// form a complete operation.
unconsumed_delta_data []byte
// expected_input_size_for_signature_generation is the size given to
// NewPatcher, with negative values clamped to zero.
expected_input_size_for_signature_generation int64
// delta_output and delta_input are set by StartDelta and handed to the
// rsync engine for every operation applied; FinishDelta clears them.
delta_output io.Writer
delta_input io.ReadSeeker
// total_data_in_delta is the running sum of len(op.Data) over all OpData
// operations applied since StartDelta.
total_data_in_delta int
}
// internal implementation {{{

// read_signature_header parses the fixed 12 byte header found at the start
// of signature data and configures self from it.
//
// Header layout (integers are decoded with bin):
//
//	bytes 0-1   version, must be 0
//	bytes 2-3   ChecksumType
//	bytes 4-5   StrongHashType
//	bytes 6-7   WeakHashType
//	bytes 8-11  block size, must be in [1, MaxBlockSize]
//
// When data holds fewer than 12 bytes it returns (-1, io.ErrShortBuffer);
// the negative count lets callers tell "need more data" apart from a
// malformed header. A bad version or algorithm identifier returns
// consumed == 0 and a bad block size returns consumed == 12, in both cases
// with a non-nil error. On success consumed is 12 and self.signature is
// reset to an empty list.
//
// Every field is validated before anything on self is modified, so a
// rejected header leaves self exactly as it was. In particular no hasher or
// checksummer is installed for a header that is later found to be invalid.
func (self *Api) read_signature_header(data []byte) (consumed int, err error) {
	const header_size = 12
	if len(data) < header_size {
		return -1, io.ErrShortBuffer
	}
	if version := bin.Uint16(data); version != 0 {
		return consumed, fmt.Errorf("Invalid version in signature header: %d", version)
	}

	// Validation only records what has to be installed; the installation
	// itself is deferred until the whole header has been accepted.
	var install_checksummer, install_hasher func()

	csum := ChecksumType(bin.Uint16(data[2:]))
	switch csum {
	case XXH3128Sum:
		install_checksummer = func() { self.rsync.SetChecksummer(new_xxh3_128) }
	default:
		return consumed, fmt.Errorf("Invalid checksum_type in signature header: %d", csum)
	}

	strong_hash := StrongHashType(bin.Uint16(data[4:]))
	switch strong_hash {
	case XXH3:
		install_hasher = func() { self.rsync.SetHasher(new_xxh3_64) }
	default:
		return consumed, fmt.Errorf("Invalid strong_hash in signature header: %d", strong_hash)
	}

	weak_hash := WeakHashType(bin.Uint16(data[6:]))
	switch weak_hash {
	case Rsync:
	default:
		return consumed, fmt.Errorf("Invalid weak_hash in signature header: %d", weak_hash)
	}

	// Range check the block size in an unsigned 64 bit type: converting the
	// 32 bit field to int first would wrap to a negative number on platforms
	// where int is 32 bits wide and slip past both checks below.
	raw_block_size := uint64(bin.Uint32(data[8:]))
	consumed = header_size
	if raw_block_size == 0 {
		return consumed, fmt.Errorf("rsync signature header has zero block size")
	}
	if raw_block_size > uint64(MaxBlockSize) {
		return consumed, fmt.Errorf("rsync signature header has too large block size %d > %d", raw_block_size, MaxBlockSize)
	}

	// The header is valid: commit it.
	self.Checksum_type = csum
	install_checksummer()
	self.Strong_hash_type = strong_hash
	install_hasher()
	self.Weak_hash_type = weak_hash
	self.rsync.BlockSize = int(raw_block_size)
	self.signature = make([]BlockHash, 0, 1024)
	return consumed, nil
}
// read_signature_blocks decodes every complete serialized block hash at the
// front of data and appends it to self.signature. One record occupies
// self.rsync.HashSize() + 12 bytes. It returns the number of bytes decoded;
// a trailing partial record is not touched and not counted.
func (self *Api) read_signature_blocks(data []byte) (consumed int) {
	record_size := self.rsync.HashSize() + 12
	for consumed+record_size <= len(data) {
		var block BlockHash
		block.Unserialize(data[consumed : consumed+record_size])
		self.signature = append(self.signature, block)
		consumed += record_size
	}
	return consumed
}
// FinishSignatureData checks that the signature data supplied so far is
// complete. It fails when undecoded bytes are still buffered or when no
// hasher has been configured, which is reported as a missing header. On
// success the internal buffer is released.
func (self *Differ) FinishSignatureData() (err error) {
	if leftover := len(self.unconsumed_signature_data); leftover > 0 {
		return fmt.Errorf("There were %d leftover bytes in the signature data", leftover)
	}
	self.unconsumed_signature_data = nil
	if self.rsync.HasHasher() {
		return nil
	}
	return fmt.Errorf("No header was found in the signature data")
}
// update_delta decodes operations from the front of data one after another
// and applies each to delta_output/delta_input through the rsync engine.
// It returns how many bytes were decoded into operations.
//
// Decoding stops without an error when Unserialize fails with a negative
// count; the caller keeps the remaining bytes for the next call
// (presumably this signals an operation that is not yet complete --
// confirm against Operation.Unserialize). Any other decode error, and any
// error from applying an operation, is returned as is.
func (self *Patcher) update_delta(data []byte) (consumed int, err error) {
	var op Operation
	for len(data) > 0 {
		n, decode_err := op.Unserialize(data)
		if decode_err != nil {
			if n < 0 {
				return consumed, nil
			}
			return consumed, decode_err
		}
		consumed += n
		data = data[n:]
		if err = self.rsync.ApplyDelta(self.delta_output, self.delta_input, op); err != nil {
			return consumed, err
		}
		if op.Type == OpData {
			self.total_data_in_delta += len(op.Data)
		}
	}
	return consumed, err
}
// }}}

// StartDelta prepares the patcher for applying a serialized delta.
// delta_output receives the result and delta_input is the existing data the
// delta is applied against. Buffered delta bytes and the data counter left
// over from a previous run are discarded.
func (self *Patcher) StartDelta(delta_output io.Writer, delta_input io.ReadSeeker) {
	self.unconsumed_delta_data = nil
	self.total_data_in_delta = 0
	self.delta_output, self.delta_input = delta_output, delta_input
}
// UpdateDelta feeds one more chunk of serialized delta data to the patcher.
// The chunk is appended to the bytes left over from earlier calls, every
// complete operation is applied, and the undecoded tail is kept for the next
// call. If applying fails the buffer is left as it was after the append.
func (self *Patcher) UpdateDelta(data []byte) (err error) {
	pending := append(self.unconsumed_delta_data, data...)
	self.unconsumed_delta_data = pending
	consumed, err := self.update_delta(pending)
	if err == nil {
		self.unconsumed_delta_data = utils.ShiftLeft(pending, consumed)
	}
	return err
}
// FinishDelta completes application of a delta. It processes whatever is
// still buffered and fails if bytes remain that do not form a complete
// operation. Otherwise it drops the references to the input, the output and
// the buffer, and finally fails if the rsync engine has not seen the
// checksum that ends the delta.
func (self *Patcher) FinishDelta() (err error) {
	if err = self.UpdateDelta(nil); err != nil {
		return err
	}
	if leftover := len(self.unconsumed_delta_data); leftover > 0 {
		return fmt.Errorf("There are %d leftover bytes in the delta", leftover)
	}
	self.delta_input, self.delta_output, self.unconsumed_delta_data = nil, nil, nil
	if self.rsync.checksum_done {
		return nil
	}
	return fmt.Errorf("The checksum was not received at the end of the delta data")
}
// CreateSignatureIterator returns a function that writes the signature of
// src to output one step at a time.
//
// The first call writes the 12 byte signature header (version 0, the three
// algorithm identifiers and the block size) and then the first block hash.
// Each later call writes one more block hash. Once the source is exhausted
// the function returns io.EOF, and keeps returning it on every further call.
// Errors from reading the source or writing to output are returned as is.
func (self *Patcher) CreateSignatureIterator(src io.Reader, output io.Writer) func() error {
	var (
		next_block func() (BlockHash, error)
		done       bool
		buf        [BlockHashSize]byte
	)
	// write_header serializes the header into buf and writes it out.
	write_header := func() error {
		bin.PutUint16(buf[:], 0)
		bin.PutUint16(buf[2:], uint16(self.Checksum_type))
		bin.PutUint16(buf[4:], uint16(self.Strong_hash_type))
		bin.PutUint16(buf[6:], uint16(self.Weak_hash_type))
		bin.PutUint32(buf[8:], uint32(self.rsync.BlockSize))
		_, err := output.Write(buf[:12])
		return err
	}
	return func() error {
		if done {
			return io.EOF
		}
		if next_block == nil {
			next_block = self.rsync.CreateSignatureIterator(src)
			if err := write_header(); err != nil {
				return err
			}
		}
		block, err := next_block()
		if err == io.EOF {
			done = true
			return io.EOF
		}
		if err != nil {
			return err
		}
		block.Serialize(buf[:BlockHashSize])
		_, err = output.Write(buf[:BlockHashSize])
		return err
	}
}
// CreateDelta returns a function that writes a serialized delta to output,
// computed from src and the signature loaded earlier with AddSignatureData.
// The signature data is finalized first. If that fails, or if no signature
// has been loaded, the returned function does nothing but report the error.
func (self *Differ) CreateDelta(src io.Reader, output io.Writer) func() error {
	err := self.FinishSignatureData()
	switch {
	case err != nil:
		return func() error { return err }
	case self.signature == nil:
		return func() error {
			return fmt.Errorf("Cannot call CreateDelta() before loading a signature")
		}
	}
	return self.rsync.CreateDiff(src, self.signature, output)
}
// BlockSize returns the block size currently set on the underlying rsync
// engine. For a Differ it is assigned when a signature header is parsed.
func (self *Differ) BlockSize() int {
return self.rsync.BlockSize
}
// AddSignatureData feeds one more chunk of externally produced signature
// data to the differ. Chunks may be split at any byte boundary: data is
// buffered until the 12 byte header is available, and after that every
// complete block hash is decoded while a partial one stays buffered for the
// next call. An error is returned only for a malformed header.
func (self *Differ) AddSignatureData(data []byte) (err error) {
	self.unconsumed_signature_data = append(self.unconsumed_signature_data, data...)
	if !self.rsync.HasHasher() {
		n, header_err := self.read_signature_header(self.unconsumed_signature_data)
		switch {
		case header_err == nil:
			self.unconsumed_signature_data = utils.ShiftLeft(self.unconsumed_signature_data, n)
		case n < 0:
			// The header is still incomplete; wait for more data.
			return nil
		default:
			return header_err
		}
	}
	n := self.read_signature_blocks(self.unconsumed_signature_data)
	self.unconsumed_signature_data = utils.ShiftLeft(self.unconsumed_signature_data, n)
	return nil
}
// NewDiffer returns an empty Differ. Load a signature into it with
// AddSignatureData and then compute a delta with CreateDelta.
func NewDiffer() *Differ {
	return new(Differ)
}
// NewPatcher returns a Patcher, used to create a signature and possibly to
// apply a delta.
//
// expected_input_size is the anticipated size of the data that will be
// signed; negative values are treated as zero. For a positive size the block
// size is the rounded square root of that size, otherwise DefaultBlockSize;
// either way it is capped at MaxBlockSize. When the hasher reports a
// positive hash block size smaller than the chosen block size, the block
// size is rounded down to a multiple of it.
func NewPatcher(expected_input_size int64) (ans *Patcher) {
	sz := max(0, expected_input_size)
	block_size := DefaultBlockSize
	if sz > 0 {
		block_size = int(math.Round(math.Sqrt(float64(sz))))
	}
	ans = &Patcher{expected_input_size_for_signature_generation: sz}
	ans.rsync.BlockSize = min(block_size, MaxBlockSize)
	ans.rsync.SetHasher(new_xxh3_64)
	ans.rsync.SetChecksummer(new_xxh3_128)
	if hash_block_size := ans.rsync.HashBlockSize(); hash_block_size > 0 && hash_block_size < ans.rsync.BlockSize {
		// Drop the remainder so that the block size is a whole multiple.
		ans.rsync.BlockSize -= ans.rsync.BlockSize % hash_block_size
	}
	return ans
}
|