#!/usr/bin/env python3
'''
clone_fs.py - this file is part of S3QL.

Clone an S3QL file system from one backend into another, without
recompressing or reencrypting.

Copyright © 2008 Nikolaus Rath <Nikolaus@rath.org>

This work can be distributed under the terms of the GNU GPLv3.
'''
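
# A hypothetical invocation might look like this (sketch only: the storage
# URLs, bucket name and paths below are made-up examples, not defaults of
# this script):
#
#   python3 contrib/clone_fs.py --threads 8 \
#       s3://mybucket/old-fs/ local:///srv/s3ql/new-fs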

import os
import sys
import tempfile
import time
from queue import Queue, Full as QueueFull

# We are running from the S3QL source directory, make sure
# that we use modules from this directory
basedir = os.path.abspath(os.path.join(os.path.dirname(sys.argv[0]), '..'))
if (os.path.exists(os.path.join(basedir, 'setup.py')) and
        os.path.exists(os.path.join(basedir, 'src', 's3ql', '__init__.py'))):
    sys.path = [os.path.join(basedir, 'src')] + sys.path

from s3ql.logging import logging, setup_logging, QuietError
from s3ql.common import get_backend_factory, AsyncFn, handle_on_return
from s3ql.backends.common import DanglingStorageURLError
from s3ql import BUFSIZE
from s3ql.parse_args import ArgumentParser, storage_url_type

log = logging.getLogger(__name__)

def parse_args(args):
    '''Parse command line'''

    parser = ArgumentParser(description='Clone an S3QL file system.')

    parser.add_authfile()
    parser.add_quiet()
    parser.add_debug()
    parser.add_backend_options()
    parser.add_version()

    parser.add_argument("src_storage_url", metavar='<source-storage-url>',
                        type=storage_url_type,
                        help='Storage URL of the source backend that contains the file system')
    parser.add_argument("dst_storage_url", metavar='<destination-storage-url>',
                        type=storage_url_type,
                        help='Storage URL of the destination backend')
    parser.add_argument("--threads", type=int, default=3,
                        help='Number of threads to use')

    return parser.parse_args(args)
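
# For example, parse_args(['s3://bucket/fs/', 'local:///tmp/copy']) would
# return an options object with src_storage_url and dst_storage_url set to
# the (validated) URLs and threads left at its default of 3. The URLs here
# are made up for illustration.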

@handle_on_return
def copy_loop(queue, src_backend_factory, dst_backend_factory, on_return):
    '''Copy keys arriving in *queue* from *src_backend* to *dst_backend*

    Terminates when None is received.
    '''

    src_backend = on_return.enter_context(src_backend_factory())
    dst_backend = on_return.enter_context(dst_backend_factory())

    # Scratch file used to buffer each object between download and upload.
    tmpfh = on_return.enter_context(tempfile.TemporaryFile())

    while True:
        key = queue.get()
        if key is None:
            break

        log.debug('reading object %s', key)

        def do_read(fh):
            # Rewind and truncate, so that the callback can safely be
            # invoked more than once (e.g. if the read is retried).
            tmpfh.seek(0)
            tmpfh.truncate()
            while True:
                buf = fh.read(BUFSIZE)
                if not buf:
                    break
                tmpfh.write(buf)
            return fh.metadata

        metadata = src_backend.perform_read(do_read, key)

        log.debug('writing object %s', key)

        def do_write(fh):
            tmpfh.seek(0)
            while True:
                buf = tmpfh.read(BUFSIZE)
                if not buf:
                    break
                fh.write(buf)

        dst_backend.perform_write(do_write, key, metadata)
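
# copy_loop workers are driven by main() below using a simple
# producer/consumer protocol: object keys are fed into the shared queue,
# followed by one None sentinel per worker, so that every copy_loop
# instance wakes up and returns.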

def main(args=None):
    options = parse_args(args)
    setup_logging(options)

    try:
        options.storage_url = options.src_storage_url
        src_backend_factory = get_backend_factory(options.src_storage_url,
                                                  options.backend_options,
                                                  options.authfile, raw=True)

        options.storage_url = options.dst_storage_url
        dst_backend_factory = get_backend_factory(options.dst_storage_url,
                                                  options.backend_options,
                                                  options.authfile, raw=True)
    except DanglingStorageURLError as exc:
        raise QuietError(str(exc)) from None

    queue = Queue(maxsize=options.threads)
    threads = []
    for _ in range(options.threads):
        t = AsyncFn(copy_loop, queue, src_backend_factory,
                    dst_backend_factory)
        # Don't wait for worker threads: that would deadlock if the main
        # thread terminates with an exception.
        t.daemon = True
        t.start()
        threads.append(t)

    with src_backend_factory() as backend:
        stamp1 = 0
        for (i, key) in enumerate(backend):
            stamp2 = time.time()
            if stamp2 - stamp1 > 1:
                stamp1 = stamp2
                sys.stdout.write('\rCopied %d objects so far...' % i)
                sys.stdout.flush()

                # Terminate early if any thread failed with an exception
                for t in threads:
                    if not t.is_alive():
                        t.join_and_raise()

            # Avoid blocking indefinitely if all threads terminated
            while True:
                try:
                    queue.put(key, timeout=1)
                except QueueFull:
                    pass
                else:
                    break

                # Terminate early if any thread failed with an exception
                for t in threads:
                    if not t.is_alive():
                        t.join_and_raise()

        sys.stdout.write('\n')

    # Enlarge the queue, so that the None sentinels can be enqueued even if
    # some workers have already terminated.
    queue.maxsize += len(threads)
    for t in threads:
        queue.put(None)

    for t in threads:
        t.join_and_raise()


if __name__ == '__main__':
    main(sys.argv[1:])