#!/usr/bin/env python3
# use librsync module to do simple actions on files:
# - create a signature from a base file
# - create a delta between the signature and a new file
# - re-create the new file by patching the base file with the delta
# this can surely be used to analyse corrupt backups and similar things
import sys
from rdiff_backup import librsync
# Number of bytes requested per read() call in copy_and_close (64 KiB).
blocksize = 64 * 1024
# Block size handed to librsync.SigFile by write_sig. While this is None (or
# any other false value) SigFile is called without a block size argument, so
# the defaults of the _librsync module apply.
librsync_blocksize = None
def usage():
    """Show the supported command lines on stdout and exit with status 1."""
    # %(cmd)s is filled in with the name the script was invoked as.
    template = """
Usage: %(cmd)s "signature" basis_file signature_file
%(cmd)s "delta" signature_file new_file delta_file
%(cmd)s "patch" basis_file delta_file new_file
"""
    program = sys.argv[0]
    print(template % {"cmd": program})
    # Same effect as sys.exit(1): terminates via SystemExit with code 1.
    raise SystemExit(1)
def copy_and_close(infp, outfp):
    """Copy infp to outfp in chunks of ``blocksize`` bytes, then close both.

    Args:
        infp: source stream providing ``read(size)`` and ``close()``.
        outfp: destination stream providing ``write(data)`` and ``close()``.

    Raises:
        AssertionError: if either ``close()`` call returns a true value.

    Any exception raised while reading, writing or closing propagates to
    the caller; both streams are still closed (or attempted) first.
    """
    try:
        while True:
            buf = infp.read(blocksize)
            if not buf:
                # An empty read marks the end of the input.
                break
            outfp.write(buf)
    finally:
        # The close() calls must not live inside an ``assert`` statement:
        # ``python -O`` removes asserts, which would skip closing entirely.
        # The nested try/finally makes sure outfp is closed even when
        # closing infp raises.
        try:
            in_status = infp.close()
        finally:
            out_status = outfp.close()
    # Raised explicitly (instead of via ``assert``) so the check survives
    # ``-O`` while callers still see the same exception type as before.
    if in_status or out_status:
        raise AssertionError(
            "close() reported a failure (input: %r, output: %r)"
            % (in_status, out_status)
        )
def write_sig(input_path, output_path):
    """Write the librsync signature of the file at input_path to output_path.

    Args:
        input_path: path of the basis file, opened for binary reading.
        output_path: path of the signature file, opened for binary writing
            (an existing file is truncated).

    When the module-level ``librsync_blocksize`` is set to a true value it
    is passed to ``librsync.SigFile``; otherwise SigFile is called with the
    input file only.
    """
    # The context managers guarantee both files are closed even if building
    # the SigFile or opening the output fails. Closing again after
    # copy_and_close has already closed a regular file object is a no-op.
    with open(input_path, "rb") as infp:
        if librsync_blocksize:
            sigfp = librsync.SigFile(infp, librsync_blocksize)
        else:
            sigfp = librsync.SigFile(infp)
        with open(output_path, "wb") as outfp:
            copy_and_close(sigfp, outfp)
def write_delta(sig_path, new_path, output_path):
    """Read a signature and a new file, write the delta to output_path.

    Args:
        sig_path: path of the signature file, opened for binary reading.
        new_path: path of the new file, opened for binary reading.
        output_path: path of the delta file, opened for binary writing
            (an existing file is truncated).
    """
    # Keep both inputs under context managers so they are closed on every
    # path, including a failure in DeltaFile or in opening the output.
    # Whether librsync.DeltaFile closes them itself is not relied upon;
    # closing an already-closed regular file object is a no-op.
    with open(sig_path, "rb") as sigfp, open(new_path, "rb") as newfp:
        deltafp = librsync.DeltaFile(sigfp, newfp)
        with open(output_path, "wb") as outfp:
            copy_and_close(deltafp, outfp)
def write_patch(basis_path, delta_path, out_path):
    """Patch the file at basis_path with the delta at delta_path.

    Args:
        basis_path: path of the basis file, opened for binary reading.
        delta_path: path of the delta file, opened for binary reading.
        out_path: path of the patched result, opened for binary writing
            (an existing file is truncated).
    """
    # Keep both inputs under context managers so they are closed on every
    # path, including a failure in PatchedFile or in opening the output.
    # Whether librsync.PatchedFile closes them itself is not relied upon;
    # closing an already-closed regular file object is a no-op.
    with open(basis_path, "rb") as basisfp, open(delta_path, "rb") as deltafp:
        patchfp = librsync.PatchedFile(basisfp, deltafp)
        with open(out_path, "wb") as outfp:
            copy_and_close(patchfp, outfp)
def check_different(filelist):
    """Make sure no filename occurs more than once in filelist.

    Names are compared as given; no path normalisation is performed.

    Args:
        filelist: sequence of hashable filenames.

    Raises:
        AssertionError: if at least two entries of filelist are equal.
    """
    # Raised explicitly rather than via ``assert`` so that the validation
    # is not stripped when Python runs with -O; the exception type and the
    # message stay the same as before.
    if len(set(filelist)) != len(filelist):
        raise AssertionError("Error, must use all different filenames")
def Main():
    """Parse sys.argv and run the requested librsync action.

    sys.argv[1] selects the mode ("signature", "delta" or "patch") and the
    remaining arguments are the file paths for that mode. Too few
    arguments, an unknown mode, or the wrong number of paths for the mode
    all end in usage(), which exits. Duplicate paths are rejected by
    check_different before the mode is examined.
    """
    if len(sys.argv) < 4:
        usage()
    mode, file_args = sys.argv[1], sys.argv[2:]
    check_different(file_args)

    # mode -> (handler, number of file arguments the handler takes)
    actions = {
        "signature": (write_sig, 2),
        "delta": (write_delta, 3),
        "patch": (write_patch, 3),
    }
    if mode in actions:
        handler, expected_count = actions[mode]
        if len(file_args) != expected_count:
            usage()
        handler(*file_args)
    else:
        usage()
# Run the command-line entry point only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == "__main__":
    Main()
|