1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248
|
"""Upload the contents of your Downloads folder to Dropbox.
This is an example app for API v2.
"""
from __future__ import print_function
import argparse
import contextlib
import datetime
import os
import six
import sys
import time
import unicodedata
if sys.version.startswith('2'):
input = raw_input # noqa: E501,F821; pylint: disable=redefined-builtin,undefined-variable,useless-suppression
import dropbox
# OAuth2 access token used when --token is not given.  TODO: login etc.
TOKEN = ''

# Command-line interface.  Both positionals are optional; the three
# flags pre-answer the interactive yes/no questions asked while syncing.
parser = argparse.ArgumentParser(description='Sync ~/Downloads to Dropbox')
parser.add_argument(
    'folder',
    nargs='?',
    default='Downloads',
    help='Folder name in your Dropbox',
)
parser.add_argument(
    'rootdir',
    nargs='?',
    default='~/Downloads',
    help='Local directory to upload',
)
parser.add_argument(
    '--token',
    default=TOKEN,
    help='Access token (see https://www.dropbox.com/developers/apps)',
)
parser.add_argument(
    '--yes', '-y',
    action='store_true',
    help='Answer yes to all questions',
)
parser.add_argument(
    '--no', '-n',
    action='store_true',
    help='Answer no to all questions',
)
parser.add_argument(
    '--default', '-d',
    action='store_true',
    help='Take default answer on all questions',
)
def main():
    """Main program.

    Parse command line, then iterate over files and directories under
    rootdir and upload all files.  Skips some temporary files and
    directories, and avoids duplicate uploads by comparing size and
    mtime with the server.

    Exits with status 2 on invalid command-line usage (conflicting
    --yes/--no/--default, or no token) and with status 1 when rootdir
    does not exist or is not a directory.
    """
    args = parser.parse_args()
    if sum(bool(b) for b in (args.yes, args.no, args.default)) > 1:
        print('At most one of --yes, --no, --default is allowed')
        sys.exit(2)
    if not args.token:
        print('--token is mandatory')
        sys.exit(2)

    folder = args.folder
    rootdir = os.path.expanduser(args.rootdir)
    print('Dropbox folder name:', folder)
    print('Local directory:', rootdir)
    if not os.path.exists(rootdir):
        print(rootdir, 'does not exist on your filesystem')
        sys.exit(1)
    elif not os.path.isdir(rootdir):
        print(rootdir, 'is not a folder on your filesystem')
        sys.exit(1)

    dbx = dropbox.Dropbox(args.token)
    # try/finally so the client is closed even when the user quits from
    # a yes/no prompt (SystemExit) or an unexpected error escapes.
    try:
        for dn, dirs, files in os.walk(rootdir):
            # Path of the current directory relative to rootdir.
            subfolder = dn[len(rootdir):].strip(os.path.sep)
            listing = list_folder(dbx, folder, subfolder)
            print('Descending into', subfolder, '...')

            # First do all the files.
            for name in files:
                fullname = os.path.join(dn, name)
                if not isinstance(name, six.text_type):
                    name = name.decode('utf-8')
                # The listing is looked up by the NFC-normalized name.
                nname = unicodedata.normalize('NFC', name)
                if name.startswith('.'):
                    print('Skipping dot file:', name)
                elif name.startswith('@') or name.endswith('~'):
                    print('Skipping temporary file:', name)
                elif name.endswith(('.pyc', '.pyo')):
                    print('Skipping generated file:', name)
                elif nname in listing:
                    md = listing[nname]
                    mtime = os.path.getmtime(fullname)
                    mtime_dt = datetime.datetime(*time.gmtime(mtime)[:6])
                    size = os.path.getsize(fullname)
                    if (isinstance(md, dropbox.files.FileMetadata) and
                            mtime_dt == md.client_modified and
                            size == md.size):
                        print(name, 'is already synced [stats match]')
                    else:
                        print(name, 'exists with different stats, downloading')
                        res = download(dbx, folder, subfolder, name)
                        # Read as bytes: download() returns bytes, and a
                        # text-mode read would never compare equal on
                        # Python 3 (and may fail to decode binary files).
                        with open(fullname, 'rb') as f:
                            data = f.read()
                        if res == data:
                            print(name, 'is already synced [content match]')
                        else:
                            print(name, 'has changed since last sync')
                            if yesno('Refresh %s' % name, False, args):
                                upload(dbx, fullname, folder, subfolder, name,
                                       overwrite=True)
                elif yesno('Upload %s' % name, True, args):
                    upload(dbx, fullname, folder, subfolder, name)

            # Then choose which subdirectories to traverse.
            keep = []
            for name in dirs:
                if name.startswith('.'):
                    print('Skipping dot directory:', name)
                elif name.startswith('@') or name.endswith('~'):
                    print('Skipping temporary directory:', name)
                elif name == '__pycache__':
                    print('Skipping generated directory:', name)
                elif yesno('Descend into %s' % name, True, args):
                    print('Keeping directory:', name)
                    keep.append(name)
                else:
                    print('OK, skipping directory:', name)
            # Assign in place so os.walk only descends into kept dirs.
            dirs[:] = keep
    finally:
        dbx.close()
def list_folder(dbx, folder, subfolder):
    """List a folder.

    Return a dict mapping unicode filenames to
    FileMetadata|FolderMetadata entries.

    All result pages are fetched, so folders larger than a single
    listing response are returned in full.  If the API reports an
    error, a message is printed and the folder is treated as empty.
    """
    path = '/%s/%s' % (folder, subfolder.replace(os.path.sep, '/'))
    # Collapse duplicate slashes produced by empty components.
    while '//' in path:
        path = path.replace('//', '/')
    path = path.rstrip('/')
    rv = {}
    try:
        with stopwatch('list_folder'):
            res = dbx.files_list_folder(path)
            while True:
                for entry in res.entries:
                    rv[entry.name] = entry
                if not res.has_more:
                    break
                # A single response may be truncated; continue from the
                # cursor until the server reports no more entries.
                res = dbx.files_list_folder_continue(res.cursor)
    except dropbox.exceptions.ApiError as err:
        print('Folder listing failed for', path, '-- assumed empty:', err)
        return {}
    return rv
def download(dbx, folder, subfolder, name):
    """Download a file.

    Return the bytes of the file, or None if it doesn't exist or the
    request fails (the error is printed).
    """
    path = '/%s/%s/%s' % (folder, subfolder.replace(os.path.sep, '/'), name)
    # Collapse duplicate slashes produced by empty components.
    while '//' in path:
        path = path.replace('//', '/')
    with stopwatch('download'):
        try:
            md, res = dbx.files_download(path)
        except dropbox.exceptions.HttpError as err:
            print('*** HTTP error', err)
            return None
        except dropbox.exceptions.ApiError as err:
            # Raised e.g. when the path is missing or is not a file;
            # report it the same way upload() does instead of crashing.
            print('*** API error', err)
            return None
    data = res.content
    print(len(data), 'bytes; md:', md)
    return data
def upload(dbx, fullname, folder, subfolder, name, overwrite=False):
    """Upload a file.

    The local file `fullname` is sent to /folder/subfolder/name, using
    overwrite mode when `overwrite` is true and add mode otherwise.  The
    local mtime is recorded as the client-modified time.

    Return the request response, or None in case of error.
    """
    remote_dir = subfolder.replace(os.path.sep, '/')
    target = '/%s/%s/%s' % (folder, remote_dir, name)
    # Collapse duplicate slashes produced by empty components.
    while '//' in target:
        target = target.replace('//', '/')

    if overwrite:
        write_mode = dropbox.files.WriteMode.overwrite
    else:
        write_mode = dropbox.files.WriteMode.add

    stamp = os.path.getmtime(fullname)
    with open(fullname, 'rb') as src:
        payload = src.read()

    with stopwatch('upload %d bytes' % len(payload)):
        try:
            response = dbx.files_upload(
                payload,
                target,
                write_mode,
                client_modified=datetime.datetime(*time.gmtime(stamp)[:6]),
                mute=True)
        except dropbox.exceptions.ApiError as err:
            print('*** API error', err)
            return None
    print('uploaded as', response.name.encode('utf8'))
    return response
def yesno(message, default, args):
    """Ask a yes/no question and return the answer as a bool.

    The command line flags short-circuit the prompt: --default returns
    `default`, --yes returns True and --no returns False (checked in
    that order), each echoing the automatic answer.

    Otherwise the user is prompted until a recognized answer is given:
    - a blank line returns `default`
    - y/yes returns True, n/no returns False
    - q or quit exits the program
    - p or pdb invokes the debugger, then asks again
    """
    if args.default:
        shown = 'Y' if default else 'N'
        print(message + '? [auto]', shown)
        return default
    if args.yes:
        print(message + '? [auto] YES')
        return True
    if args.no:
        print(message + '? [auto] NO')
        return False

    # The capital letter in the hint marks what a blank line selects.
    prompt = message + ('? [Y/n] ' if default else '? [N/y] ')
    verdicts = {'y': True, 'yes': True, 'n': False, 'no': False}
    while True:
        reply = input(prompt).strip().lower()
        if reply == '':
            return default
        if reply in verdicts:
            return verdicts[reply]
        if reply in ('q', 'quit'):
            print('Exit')
            raise SystemExit(0)
        if reply in ('p', 'pdb'):
            import pdb
            pdb.set_trace()
        print('Please answer YES or NO.')
@contextlib.contextmanager
def stopwatch(message):
    """Context manager that reports the wall-clock time of its block.

    On exit, whether or not the block raised, prints one line naming
    `message` and the elapsed seconds with millisecond precision.
    """
    started = time.time()
    try:
        yield
    finally:
        elapsed = time.time() - started
        print('Total elapsed time for %s: %.3f' % (message, elapsed))
if __name__ == '__main__':
    # Only start syncing when run as a script, not when imported.
    main()
|