File: s4cmd.py

package: s4cmd 2.1.0+ds-2
  • area: main
  • suites: bookworm, bullseye
  • size: 336 kB
  • sloc: python: 1,620; sh: 670; makefile: 20
#!/usr/bin/env python

#
# Copyright 2012-2018 BloomReach, Inc.
# Portions Copyright 2014 Databricks
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
Super S3 command line tool.
"""

import sys, os, re, optparse, multiprocessing, fnmatch, time, hashlib, errno, pytz
import logging, traceback, types, threading, random, socket, shlex, datetime, json
import urllib3

IS_PYTHON2 = sys.version_info[0] == 2

if IS_PYTHON2:
  from cStringIO import StringIO
  import Queue
  import ConfigParser
else:
  from io import BytesIO as StringIO
  import queue as Queue
  import configparser as ConfigParser

  def cmp(a, b):
    return (a > b) - (a < b)

if sys.version_info < (2, 7):
  # Python < 2.7 doesn't have the cmp_to_key function.
  from utils import cmp_to_key
else:
  from functools import cmp_to_key

##
## Global constants
##

S4CMD_VERSION = "2.1.0"

PATH_SEP = '/'
DATETIME_FORMAT = '%Y-%m-%d %H:%M:%S UTC'
TIMESTAMP_FORMAT = '%04d-%02d-%02d %02d:%02d'
SOCKET_TIMEOUT = 5 * 60 # in sec(s) (timeout if we don't receive any recv() callback)
socket.setdefaulttimeout(SOCKET_TIMEOUT)

# Global list for temp files.
TEMP_FILES = set()

# Environment variable names for S3 credentials.
S3_ACCESS_KEY_NAME = "S3_ACCESS_KEY"
S3_SECRET_KEY_NAME = "S3_SECRET_KEY"
S4CMD_ENV_KEY = "S4CMD_OPTS"


##
## Utility classes
##

class Failure(RuntimeError):
  '''Exception for runtime failures'''
  pass

class InvalidArgument(RuntimeError):
  '''Exception for invalid input parameters'''
  pass

class RetryFailure(Exception):
  '''Runtime failure that can be retried'''
  pass


class S4cmdLoggingClass:
  def __init__(self):
    self.log = logging.Logger("s4cmd")
    self.log.stream = sys.stderr
    self.log_handler = logging.StreamHandler(self.log.stream)
    self.log.addHandler(self.log_handler)


  def configure(self, opt):
    '''Configure the logger based on command-line arguments'''

    self.log_handler.setFormatter(logging.Formatter('%(message)s', DATETIME_FORMAT))
    if opt.debug:
      self.log.verbosity = 3
      self.log_handler.setFormatter(logging.Formatter(
          '  (%(levelname).1s)%(filename)s:%(lineno)-4d %(message)s',
          DATETIME_FORMAT))
      self.log.setLevel(logging.DEBUG)
    elif opt.verbose:
      self.log.verbosity = 2
      self.log.setLevel(logging.INFO)
    else:
      self.log.verbosity = 1
      self.log.setLevel(logging.ERROR)


  def get_loggers(self):
    '''Return a list of the logger methods: (debug, info, warn, error)'''

    return self.log.debug, self.log.info, self.log.warn, self.log.error

s4cmd_logging = S4cmdLoggingClass()
debug, info, warn, error = s4cmd_logging.get_loggers()


def get_default_thread_count():
  return int(os.getenv('S4CMD_NUM_THREADS', multiprocessing.cpu_count() * 4))


def log_calls(func):
  '''Decorator to log function calls.'''
  def wrapper(*args, **kargs):
    callStr = "%s(%s)" % (func.__name__, ", ".join([repr(p) for p in args] + ["%s=%s" % (k, repr(v)) for (k, v) in list(kargs.items())]))
    debug(">> %s", callStr)
    ret = func(*args, **kargs)
    debug("<< %s: %s", callStr, repr(ret))
    return ret
  return wrapper
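
# Illustration (not executed by the tool): a function decorated with @log_calls
# emits paired debug lines around each invocation, e.g.
#
#   @log_calls
#   def add(a, b):        # hypothetical helper, for illustration only
#     return a + b
#
#   add(1, 2)             # logs ">> add(1, 2)" then "<< add(1, 2): 3" in debug mode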

##
## Utility functions
##

def synchronized(func):
  '''Decorator to synchronize a function with a per-function lock.'''
  func.__lock__ = threading.Lock()
  def synced_func(*args, **kargs):
    with func.__lock__:
      return func(*args, **kargs)
  return synced_func

def clear_progress():
  '''Clear previous progress message, if any.'''
  progress('')

@synchronized
def progress(msg, *args):
  '''Show the current progress message on stderr.
     This function remembers the previous message so that, next time, it can
     clear the previous message before showing the next one.
  '''
  # Don't show any progress if the output is directed to a file.
  if not (sys.stdout.isatty() and sys.stderr.isatty()):
    return

  text = (msg % args)
  if progress.prev_message:
    sys.stderr.write(' ' * len(progress.prev_message) + '\r')
  sys.stderr.write(text + '\r')
  progress.prev_message = text

progress.prev_message = None

@synchronized
def message(msg, *args):
  '''Program message output.'''
  clear_progress()
  text = (msg % args)
  sys.stdout.write(text + '\n')

def fail(message, exc_info=None, status=1, stacktrace=False):
  '''Utility function to handle runtime failures gracefully.
     Show concise information if possible, then terminate program.
  '''
  text = message
  if exc_info:
    text += str(exc_info)
  error(text)
  if stacktrace:
    error(traceback.format_exc())
  clean_tempfiles()
  if __name__ == '__main__':
    sys.exit(status)
  else:
    raise RuntimeError(status)

@synchronized
def tempfile_get(target):
  '''Get a temp filename for atomic download.'''
  fn = '%s-%s.tmp' % (target, ''.join(random.Random().sample("0123456789abcdefghijklmnopqrstuvwxyz", 15)))
  TEMP_FILES.add(fn)
  return fn

@synchronized
def tempfile_set(tempfile, target):
  '''Atomically rename and clean tempfile'''
  if target:
    os.rename(tempfile, target)
  else:
    os.unlink(tempfile)

  if tempfile in TEMP_FILES:
    TEMP_FILES.remove(tempfile)

def clean_tempfiles():
  '''Clean up temp files'''
  for fn in TEMP_FILES:
    if os.path.exists(fn):
      os.unlink(fn)
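
# Sketch of the atomic-download pattern these helpers support (illustrative;
# the actual call sites are in the download workers further below):
#
#   tmp = tempfile_get('/data/out.bin')    # e.g. '/data/out.bin-<random>.tmp'
#   # ... write the downloaded content to tmp ...
#   tempfile_set(tmp, '/data/out.bin')     # atomically rename into place
#   tempfile_set(tmp, None)                # or: discard a partial download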

class S3URL:
  '''Simple wrapper for S3 URL.
     This class parses an S3 URL and provides accessors to each component.
  '''
  S3URL_PATTERN = re.compile(r'(s3[n]?)://([^/]+)[/]?(.*)')

  def __init__(self, uri):
    '''Initialization, parse S3 URL'''
    try:
      self.proto, self.bucket, self.path = S3URL.S3URL_PATTERN.match(uri).groups()
      self.proto = 's3' # normalize s3n => s3
    except:
      raise InvalidArgument('Invalid S3 URI: %s' % uri)

  def __str__(self):
    '''Return the original S3 URL'''
    return S3URL.combine(self.proto, self.bucket, self.path)

  def get_fixed_path(self):
    '''Get the fixed part of the path without wildcard'''
    pi = self.path.split(PATH_SEP)
    fi = []
    for p in pi:
      if '*' in p or '?' in p:
        break
      fi.append(p)
    return PATH_SEP.join(fi)

  @staticmethod
  def combine(proto, bucket, path):
    '''Combine the components and generate an S3 URL string. No path
       normalization is done here; the path should not start with a slash.
    '''
    return '%s://%s/%s' % (proto, bucket, path)

  @staticmethod
  def is_valid(uri):
    '''Check if given uri is a valid S3 URL'''
    return S3URL.S3URL_PATTERN.match(uri) != None
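
# Usage sketch for S3URL (illustrative; bucket and key names are made up):
#
#   u = S3URL('s3://mybucket/logs/2018/app.log')
#   (u.proto, u.bucket, u.path)   # ('s3', 'mybucket', 'logs/2018/app.log')
#   str(u)                        # 's3://mybucket/logs/2018/app.log'
#   S3URL('s3://mybucket/logs/*/app.log').get_fixed_path()   # 'logs'
#   S3URL.is_valid('not-an-s3-url')                          # False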

class BotoClient(object):
  '''Bridge between s4cmd and the boto3 library. All S3 API calls should go through this class.
     The whitelist ALLOWED_CLIENT_METHODS enumerates the methods that are allowed, and EXTRA_CLIENT_PARAMS
     is the list of S3 parameters that we can take from command-line arguments and pass through to the API.
  '''

  # Encapsulate the boto3 interface and intercept all API calls.
  boto3 = __import__('boto3') # version >= 1.3.1
  botocore = __import__('botocore')

  # Exported exceptions.
  BotoError = boto3.exceptions.Boto3Error
  ClientError = botocore.exceptions.ClientError
  NoCredentialsError = botocore.exceptions.NoCredentialsError

  # Exceptions that retries may work. May change in the future.
  S3RetryableErrors = (
    socket.timeout,
    socket.error if IS_PYTHON2 else ConnectionError,
    urllib3.exceptions.ReadTimeoutError,
    botocore.exceptions.IncompleteReadError
  )

  # List of API functions we use in s4cmd.
  ALLOWED_CLIENT_METHODS = [
    'list_buckets',
    'get_paginator',
    'head_object',
    'put_object',
    'create_bucket',
    'create_multipart_upload',
    'upload_part',
    'complete_multipart_upload',
    'abort_multipart_upload',
    'get_object',
    'copy_object',
    'delete_object',
    'delete_objects',
    'upload_part_copy'
  ]

  # List of parameters grabbed from http://boto3.readthedocs.io/en/latest/reference/services/s3.html
  # Pass those parameters directly to boto3 low level API. Most of the parameters are not tested.
  EXTRA_CLIENT_PARAMS = [
      ("ACL", "string",
       "The canned ACL to apply to the object."),
      ("CacheControl", "string",
       "Specifies caching behavior along the request/reply chain."),
      ("ContentDisposition", "string",
       "Specifies presentational information for the object."),
      ("ContentEncoding", "string",
       "Specifies what content encodings have been applied to the object and thus what decoding mechanisms must be applied to obtain the media-type referenced by the Content-Type header field."),
      ("ContentLanguage", "string",
       "The language the content is in."),
      ("ContentMD5", "string",
       "The base64-encoded 128-bit MD5 digest of the part data."),
      ("ContentType", "string",
       "A standard MIME type describing the format of the object data."),
      ("CopySourceIfMatch", "string",
       "Copies the object if its entity tag (ETag) matches the specified tag."),
      ("CopySourceIfModifiedSince", "datetime",
       "Copies the object if it has been modified since the specified time."),
      ("CopySourceIfNoneMatch", "string",
       "Copies the object if its entity tag (ETag) is different than the specified ETag."),
      ("CopySourceIfUnmodifiedSince", "datetime",
       "Copies the object if it hasn't been modified since the specified time."),
      ("CopySourceRange", "string",
       "The range of bytes to copy from the source object. The range value must use the form bytes=first-last, where the first and last are the zero-based byte offsets to copy. For example, bytes=0-9 indicates that you want to copy the first ten bytes of the source. You can copy a range only if the source object is greater than 5 GB."),
      ("CopySourceSSECustomerAlgorithm", "string",
       "Specifies the algorithm to use when decrypting the source object (e.g., AES256)."),
      ("CopySourceSSECustomerKeyMD5", "string",
       "Specifies the 128-bit MD5 digest of the encryption key according to RFC 1321. Amazon S3 uses this header for a message integrity check to ensure the encryption key was transmitted without error. Please note that this parameter is automatically populated if it is not provided. Including this parameter is not required"),
      ("CopySourceSSECustomerKey", "string",
       "Specifies the customer-provided encryption key for Amazon S3 to use to decrypt the source object. The encryption key provided in this header must be one that was used when the source object was created."),
      ("ETag", "string",
       "Entity tag returned when the part was uploaded."),
      ("Expires", "datetime",
       "The date and time at which the object is no longer cacheable."),
      ("GrantFullControl", "string",
       "Gives the grantee READ, READ_ACP, and WRITE_ACP permissions on the object."),
      ("GrantReadACP", "string",
       "Allows grantee to read the object ACL."),
      ("GrantRead", "string",
       "Allows grantee to read the object data and its metadata."),
      ("GrantWriteACP", "string",
       "Allows grantee to write the ACL for the applicable object."),
      ("IfMatch", "string",
       "Return the object only if its entity tag (ETag) is the same as the one specified, otherwise return a 412 (precondition failed)."),
      ("IfModifiedSince", "datetime",
       "Return the object only if it has been modified since the specified time, otherwise return a 304 (not modified)."),
      ("IfNoneMatch", "string",
       "Return the object only if its entity tag (ETag) is different from the one specified, otherwise return a 304 (not modified)."),
      ("IfUnmodifiedSince", "datetime",
       "Return the object only if it has not been modified since the specified time, otherwise return a 412 (precondition failed)."),
      ("Metadata", "dict",
       "A map (in json string) of metadata to store with the object in S3"),
      ("MetadataDirective", "string",
       "Specifies whether the metadata is copied from the source object or replaced with metadata provided in the request."),
      ("MFA", "string",
       "The concatenation of the authentication device's serial number, a space, and the value that is displayed on your authentication device."),
      ("RequestPayer", "string",
       "Confirms that the requester knows that she or he will be charged for the request. Bucket owners need not specify this parameter in their requests. Documentation on downloading objects from requester pays buckets can be found at http://docs.aws.amazon.com/AmazonS3/latest/dev/ObjectsinRequesterPaysBuckets.html"),
      ("ServerSideEncryption", "string",
       "The Server-side encryption algorithm used when storing this object in S3 (e.g., AES256, aws:kms)."),
      ("SSECustomerAlgorithm", "string",
       "Specifies the algorithm to use to when encrypting the object (e.g., AES256)."),
      ("SSECustomerKeyMD5", "string",
       "Specifies the 128-bit MD5 digest of the encryption key according to RFC 1321. Amazon S3 uses this header for a message integrity check to ensure the encryption key was transmitted without error. Please note that this parameter is automatically populated if it is not provided. Including this parameter is not required"),
      ("SSECustomerKey", "string",
       "Specifies the customer-provided encryption key for Amazon S3 to use in encrypting data. This value is used to store the object and then it is discarded; Amazon does not store the encryption key. The key must be appropriate for use with the algorithm specified in the x-amz-server-side-encryption-customer-algorithm header."),
      ("SSEKMSKeyId", "string",
       "Specifies the AWS KMS key ID to use for object encryption. All GET and PUT requests for an object protected by AWS KMS will fail if not made via SSL or using SigV4. Documentation on configuring any of the officially supported AWS SDKs and CLI can be found at http://docs.aws.amazon.com/AmazonS3/latest/dev/UsingAWSSDK.html#specify-signature-version"),
      ("StorageClass", "string",
       "The type of storage to use for the object. Defaults to 'STANDARD'."),
      ("VersionId", "string",
       "VersionId used to reference a specific version of the object."),
      ("WebsiteRedirectLocation", "string",
       "If the bucket is configured as a website, redirects requests for this object to another object in the same bucket or to an external URL. Amazon S3 stores the value of this header in the object metadata."),
  ]

  def __init__(self, opt, aws_access_key_id=None, aws_secret_access_key=None):
    '''Initialize boto3 API bridge class. Calculate and cache all legal parameters
       for each method we are going to call.
    '''
    self.opt = opt
    if (aws_access_key_id is not None) and (aws_secret_access_key is not None):
      self.client = self.boto3.client('s3',
                                      aws_access_key_id=aws_access_key_id,
                                      aws_secret_access_key=aws_secret_access_key,
                                      endpoint_url=opt.endpoint_url)
    else:
      self.client = self.boto3.client('s3', endpoint_url=opt.endpoint_url)

    # Cache the result so we don't have to recalculate.
    self.legal_params = {}
    for method in BotoClient.ALLOWED_CLIENT_METHODS:
      self.legal_params[method] = self.get_legal_params(method)

  def __getattribute__(self, method):
    '''Intercept boto3 API call to inject our extra options.'''

    if method in BotoClient.ALLOWED_CLIENT_METHODS:

      def wrapped_method(*args, **kargs):
        merged_kargs = self.merge_opt_params(method, kargs)
        callStr = "%s(%s)" % ("S3APICALL " + method, ", ".join([repr(p) for p in args] + ["%s=%s" % (k, repr(v)) for (k, v) in list(kargs.items())]))
        debug(">> %s", callStr)
        ret = getattr(self.client, method)(*args, **merged_kargs)
        debug("<< %s: %s", callStr, repr(ret))
        return ret

      return wrapped_method

    return super(BotoClient, self).__getattribute__(method)

  def get_legal_params(self, method):
    '''Given an API name, list all legal parameters using the boto3 service model.'''
    if method not in self.client.meta.method_to_api_mapping:
      # Injected methods. Ignore.
      return []
    api = self.client.meta.method_to_api_mapping[method]
    shape = self.client.meta.service_model.operation_model(api).input_shape
    if shape is None:
      # No params needed for this API.
      return []
    return shape.members.keys()

  def merge_opt_params(self, method, kargs):
    '''Combine existing parameters with extra options supplied via command-line
       options, carefully merging dict-typed parameters when needed.
    '''
    for key in self.legal_params[method]:
      if not hasattr(self.opt, key) or getattr(self.opt, key) is None:
        continue
      if key in kargs and type(kargs[key]) == dict:
        assert(type(getattr(self.opt, key)) == dict)
        # Merge two dictionaries.
        for k, v in getattr(self.opt, key).items():
          kargs[key][k] = v
      else:
        # Overwrite values.
        kargs[key] = getattr(self.opt, key)

    return kargs

  @staticmethod
  def add_options(parser):
    '''Add the whole list of API parameters into optparse.'''
    for param, param_type, param_doc in BotoClient.EXTRA_CLIENT_PARAMS:
      parser.add_option('--API-' + param, help=param_doc, type=param_type, dest=param)

  def close(self):
    '''Close this client.'''
    self.client = None
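
# How the --API-* pass-through works (illustrative sketch): add_options()
# registers one optparse flag per entry in EXTRA_CLIENT_PARAMS, e.g.
# --API-ServerSideEncryption, and merge_opt_params() later injects any supplied
# value into each intercepted boto3 call whose input shape accepts it. So a
# command line such as (bucket and key names made up)
#
#   s4cmd put local.txt s3://mybucket/key --API-ServerSideEncryption=AES256
#
# would add ServerSideEncryption='AES256' to the underlying put_object and
# create_multipart_upload requests.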

class TaskQueue(Queue.Queue):
  '''Wrapper class around Queue.
     The default join() blocks the main thread in a way that cannot be woken
     up by a Ctrl-C interrupt, so we override join() to wait with a timeout
     and handle keyboard interrupts.
  '''
  def __init__(self):
    Queue.Queue.__init__(self)
    self.exc_info = None

  def join(self):
    '''Override original join() with a timeout and handle keyboard interrupt.'''
    self.all_tasks_done.acquire()
    try:
      while self.unfinished_tasks:
        self.all_tasks_done.wait(1000)

        # Child thread has exceptions, fail main thread too.
        if self.exc_info:
          fail('[Thread Failure] ', exc_info=self.exc_info)
    except KeyboardInterrupt:
      raise Failure('Interrupted by user')
    finally:
      self.all_tasks_done.release()

  def terminate(self, exc_info=None):
    '''Terminate all workers by draining the task queue and recording the
       exception so that the main thread can fail fast.
    '''
    if exc_info:
      self.exc_info = exc_info
    try:
      while self.get_nowait():
        self.task_done()
    except Queue.Empty:
      pass

class ThreadPool(object):
  '''Utility class for thread pool.
     This class needs to work with a utility class, which is derived from Worker.
  '''

  class Worker(threading.Thread):
    '''Utility thread worker class.
       This class takes items from the task queue and executes them. It also
       handles runtime errors gracefully and provides automatic retries.
    '''
    def __init__(self, pool):
      '''Thread worker initialization.
         Set up values and start the thread right away.
      '''
      threading.Thread.__init__(self)
      self.pool = pool
      self.opt = pool.opt
      self.daemon = True
      self.start()

    def run(self):
      '''Main thread worker execution.
         This function extracts items from the task queue and executes them.
         When a retryable exception is encountered, the task is retried by
         putting the same item back into the work queue.
      '''
      while True:
        item = self.pool.tasks.get()
        if not item:
          break

        try:
          func_name, retry, args, kargs = item
          self.__class__.__dict__[func_name](self, *args, **kargs)
        except InvalidArgument as e:
          self.pool.tasks.terminate(e)
          fail('[Invalid Argument] ', exc_info=e)
        except Failure as e:
          self.pool.tasks.terminate(e)
          fail('[Runtime Failure] ', exc_info=e)
        except OSError as e:
          self.pool.tasks.terminate(e)
          fail('[OSError] %d: %s' % (e.errno, e.strerror))
        except BotoClient.S3RetryableErrors as e:
          if retry >= self.opt.retry:
            self.pool.tasks.terminate(e)
            fail('[Runtime Exception] ', exc_info=e, stacktrace=True)
          else:
            # Show content of exceptions.
            error(e)

          time.sleep(self.opt.retry_delay)
          self.pool.tasks.put((func_name, retry + 1, args, kargs))
        except Exception as e:
          self.pool.tasks.terminate(e)
          fail('[Exception] ', exc_info=e)
        finally:
          self.pool.processed()
          self.pool.tasks.task_done()

  def __init__(self, thread_class, opt):
    '''Constructor of ThreadPool.
       Create the workers; the pool automatically inherits all methods from
       thread_class by redirecting calls through __getattribute__().
    '''
    self.opt = opt
    self.tasks = TaskQueue()
    self.processed_tasks = 0
    self.thread_class = thread_class
    self.workers = []
    for i in range(opt.num_threads):
      self.workers.append(thread_class(self))

  def __enter__(self):
    '''Utility function for with statement'''
    return self

  def __exit__(self, exc_type, exc_value, traceback):
    '''Utility function for with statement, wait for completion'''
    self.join()
    return isinstance(exc_value, TypeError)

  def __getattribute__(self, name):
    '''Special attribute accessor that adds tasks into the task queue.
       If an attribute is not found on ThreadPool itself, check whether the
       utility class provides a function with that name. If so, the function
       call is added to the task queue instead of being executed directly.
    '''
    try:
      attr = super(ThreadPool, self).__getattribute__(name)
    except AttributeError as e:
      if name in self.thread_class.__dict__:
        # Here we masquerade the original function with add_task(). So the
        # function call will be put into task queue.
        def deferred_task(*args, **kargs):
          self.add_task(name, *args, **kargs)
        attr = deferred_task
      else:
        raise AttributeError('Unable to resolve %s' % name)
    return attr

  def add_task(self, func_name, *args, **kargs):
    '''Utility function to add a single task into the task queue'''
    self.tasks.put((func_name, 0, args, kargs))

  def join(self):
    '''Utility function to wait for all tasks to complete'''
    self.tasks.join()

    # Force each thread to break loop.
    for worker in self.workers:
      self.tasks.put(None)

    # Wait for all threads to terminate.
    for worker in self.workers:
      worker.join()
      worker.s3 = None

  @synchronized
  def processed(self):
    '''Increase the processed task counter and show progress message'''
    self.processed_tasks += 1
    qsize = self.tasks.qsize()
    if qsize > 0:
      progress('[%d task(s) completed, %d remaining, %d thread(s)]', self.processed_tasks, qsize, len(self.workers))
    else:
      progress('[%d task(s) completed, %d thread(s)]', self.processed_tasks, len(self.workers))
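
# Deferred-call pattern provided by ThreadPool.__getattribute__ (illustrative
# sketch; ThreadUtil and its upload() worker are defined further below):
#
#   pool = ThreadPool(ThreadUtil, opt)
#   pool.upload('local.txt', 's3://mybucket/key')   # queued, not run inline
#   pool.join()                                     # block until workers finish
#
# Any attribute missing on ThreadPool but defined on the worker class becomes
# an add_task() call, so the actual method runs on a worker thread.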

class S3Handler(object):
  '''Core S3 class.
     This class provides the functions for all operations. It starts a thread
     pool to execute the tasks generated by each operation. See ThreadUtil for
     more details about the tasks.
  '''

  S3_KEYS = None

  @staticmethod
  def s3_keys_from_env():
    '''Retrieve S3 access keys from the environment, or None if not present.'''
    env = os.environ
    if S3_ACCESS_KEY_NAME in env and S3_SECRET_KEY_NAME in env:
      keys = (env[S3_ACCESS_KEY_NAME], env[S3_SECRET_KEY_NAME])
      debug("read S3 keys from environment")
      return keys
    else:
      return None

  @staticmethod
  def s3_keys_from_cmdline(opt):
    '''Retrieve S3 access keys from the command line, or None if not present.'''
    if opt.access_key != None and opt.secret_key != None:
      keys = (opt.access_key, opt.secret_key)
      debug("read S3 keys from commandline")
      return keys
    else:
      return None

  @staticmethod
  def s3_keys_from_s3cfg(opt):
    '''Retrieve S3 access key settings from s3cmd's config file, if present; otherwise return None.'''
    try:
      if opt.s3cfg != None:
        s3cfg_path = "%s" % opt.s3cfg
      else:
        s3cfg_path = "%s/.s3cfg" % os.environ["HOME"]
      if not os.path.exists(s3cfg_path):
        return None
      config = ConfigParser.ConfigParser()
      config.read(s3cfg_path)
      keys = config.get("default", "access_key"), config.get("default", "secret_key")
      debug("read S3 keys from $HOME/.s3cfg file")
      return keys
    except Exception as e:
      info("could not read S3 keys from %s file; skipping (%s)", s3cfg_path, e)
      return None

  @staticmethod
  def init_s3_keys(opt):
    '''Initialize S3 access keys from the command line, environment variables, or the s3cfg config file.'''
    S3Handler.S3_KEYS = S3Handler.s3_keys_from_cmdline(opt) or S3Handler.s3_keys_from_env() \
                        or S3Handler.s3_keys_from_s3cfg(opt)

  def __init__(self, opt):
    '''Constructor, connect to S3 store'''
    self.s3 = None
    self.opt = opt
    self.connect()

  def __del__(self):
    '''Destructor, stop s3 connection'''
    self.s3 = None

  def connect(self):
    '''Connect to S3 storage'''
    try:
      if S3Handler.S3_KEYS:
        self.s3 = BotoClient(self.opt, S3Handler.S3_KEYS[0], S3Handler.S3_KEYS[1])
      else:
        self.s3 = BotoClient(self.opt)
    except Exception as e:
      raise RetryFailure('Unable to connect to s3: %s' % e)

  @log_calls
  def list_buckets(self):
    '''List all buckets'''
    result = []
    for bucket in self.s3.list_buckets().get('Buckets') or []:
      result.append({
          'name': S3URL.combine('s3', bucket['Name'], ''),
          'is_dir': True,
          'size': 0,
          'last_modified': bucket['CreationDate']
        })
    return result

  @log_calls
  def s3walk(self, basedir, show_dir=None):
    '''Walk through an S3 directory. This function initiates a walk from a basedir.
       It also supports multiple wildcards.
    '''
    # Provide the default value from command line if no override.
    if not show_dir:
      show_dir = self.opt.show_dir

    # Trailing slash normalization: we want ls 's3://foo/bar/' to give the same
    # result as 's3://foo/bar'. Since we call partial_match() to check wildcards,
    # we need to ensure the number of slashes stays the same when we do this.
    if basedir[-1] == PATH_SEP:
      basedir = basedir[0:-1]

    s3url = S3URL(basedir)
    result = []

    pool = ThreadPool(ThreadUtil, self.opt)
    pool.s3walk(s3url, s3url.get_fixed_path(), s3url.path, result)
    pool.join()

    # automatic directory detection
    if not show_dir and len(result) == 1 and result[0]['is_dir']:
      path = result[0]['name']
      s3url = S3URL(path)
      result = []
      pool = ThreadPool(ThreadUtil, self.opt)
      pool.s3walk(s3url, s3url.get_fixed_path(), s3url.path, result)
      pool.join()

    def compare(x, y):
      '''Comparator for ls output'''
      result = -cmp(x['is_dir'], y['is_dir'])
      if result != 0:
        return result
      return cmp(x['name'], y['name'])
    return sorted(result, key=cmp_to_key(compare))

  @log_calls
  def local_walk(self, basedir):
    '''Walk through local directories from root basedir'''
    result = []

    for root, dirs, files in os.walk(basedir):
      for f in files:
        result.append(os.path.join(root, f))
    return result

  @log_calls
  def get_basename(self, path):
    '''Unix-style basename.
       This function returns 'bar' for '/foo/bar/' instead of an empty string.
       It is used to normalize a trailing slash in the input.
    '''
    if path[-1] == PATH_SEP:
      path = path[0:-1]
    return os.path.basename(path)

  def source_expand(self, source):
    '''Expand the wildcards for an S3 path. This emulates the shell expansion
       that wildcards would get if the input were a local path.
    '''
    result = []

    if not isinstance(source, list):
      source = [source]

    for src in source:
      # XXX Hacky: We need to disable recursive when we expand the input
      #            parameters, need to pass this as an override parameter if
      #            provided.
      tmp = self.opt.recursive
      self.opt.recursive = False
      result += [f['name'] for f in self.s3walk(src, True)]
      self.opt.recursive = tmp

    if (len(result) == 0) and (not self.opt.ignore_empty_source):
      fail("[Runtime Failure] Source doesn't exist.")

    return result

  @log_calls
  def put_single_file(self, pool, source, target):
    '''Upload a single file or a directory by adding a task into the queue'''
    if os.path.isdir(source):
      if self.opt.recursive:
        for f in (f for f in self.local_walk(source) if not os.path.isdir(f)):
          target_url = S3URL(target)
          # deal with ./ or ../ here by normalizing the path.
          joined_path = os.path.normpath(os.path.join(target_url.path, os.path.relpath(f, source)))
          pool.upload(f, S3URL.combine('s3', target_url.bucket, joined_path))
      else:
        message('omitting directory "%s".' % source)
    else:
      pool.upload(source, target)

  @log_calls
  def put_files(self, source, target):
    '''Upload files to S3.
       This function can handle multiple uploads if source is a list. It also
       works in recursive mode, which copies all files and keeps the directory
       structure under the given source directory.
    '''
    pool = ThreadPool(ThreadUtil, self.opt)
    if not isinstance(source, list):
      source = [source]

    if target[-1] == PATH_SEP:
      for src in source:
        self.put_single_file(pool, src, os.path.join(target, self.get_basename(src)))
    else:
      if len(source) == 1:
        self.put_single_file(pool, source[0], target)
      else:
        raise Failure('Target "%s" is not a directory (with a trailing slash).' % target)

    pool.join()

  @log_calls
  def create_bucket(self, source):
    '''Use the create_bucket API to create a new bucket'''
    s3url = S3URL(source)

    message('Creating %s', source)
    if not self.opt.dry_run:
      resp = self.s3.create_bucket(Bucket=s3url.bucket)
      if resp['ResponseMetadata']["HTTPStatusCode"] == 200:
        message('Done.')
      else:
        raise Failure('Unable to create bucket %s' % source)


  @log_calls
  def update_privilege(self, obj, target):
    '''Get privileges from metadata of the source in s3, and apply them to target'''
    if 'privilege' in obj['Metadata']:
      os.chmod(target, int(obj['Metadata']['privilege'], 8))

  @log_calls
  def print_files(self, source):
    '''Print out a series of files'''
    sources = self.source_expand(source)

    for source in sources:
      s3url = S3URL(source)
      response = self.s3.get_object(Bucket=s3url.bucket, Key=s3url.path)
      message('%s', response['Body'].read())

  @log_calls
  def get_single_file(self, pool, source, target):
    '''Download a single file or a directory by adding a task into the queue'''
    if source[-1] == PATH_SEP:
      if self.opt.recursive:
        basepath = S3URL(source).path
        for f in (f for f in self.s3walk(source) if not f['is_dir']):
          pool.download(f['name'], os.path.join(target, os.path.relpath(S3URL(f['name']).path, basepath)))
      else:
        message('omitting directory "%s".' % source)
    else:
      pool.download(source, target)

  @log_calls
  def get_files(self, source, target):
    '''Download files.
       This function can handle multiple files if the source S3 URL has
       wildcard characters. It also handles recursive mode by downloading all
       files and keeping the directory structure.
    '''
    pool = ThreadPool(ThreadUtil, self.opt)
    source = self.source_expand(source)

    if os.path.isdir(target):
      for src in source:
        self.get_single_file(pool, src, os.path.join(target, self.get_basename(S3URL(src).path)))
    else:
      if len(source) > 1:
        raise Failure('Target "%s" is not a directory.' % target)
        # Get file if it exists on s3 otherwise do nothing
      elif len(source) == 1:
        self.get_single_file(pool, source[0], target)
      else:
        #Source expand may return empty list only if ignore-empty-source is set to true
        pass

    pool.join()

  @log_calls
  def delete_removed_files(self, source, target):
    '''Remove remote files that are not present in the local source.
       (Obsolete) Only used by the old sync command now.
    '''
    message("Deleting files found in %s and not in %s", source, target)
    if os.path.isdir(source):
      unecessary = []
      basepath = S3URL(target).path
      for f in [f for f in self.s3walk(target) if not f['is_dir']]:
        local_name = os.path.join(source, os.path.relpath(S3URL(f['name']).path, basepath))
        if not os.path.isfile(local_name):
          message("%s not found locally, adding to delete queue", local_name)
          unecessary.append(f['name'])
      if len(unecessary) > 0:
        pool = ThreadPool(ThreadUtil, self.opt)
        for del_file in unecessary:
          pool.delete(del_file)
        pool.join()
    else:
      raise Failure('Source "%s" is not a directory.' % source)

  @log_calls
  def cp_single_file(self, pool, source, target, delete_source):
    '''Copy a single file or a directory by adding a task into the queue'''
    if source[-1] == PATH_SEP:
      if self.opt.recursive:
        basepath = S3URL(source).path
        for f in (f for f in self.s3walk(source) if not f['is_dir']):
          pool.copy(f['name'], os.path.join(target, os.path.relpath(S3URL(f['name']).path, basepath)), delete_source=delete_source)
      else:
        message('omitting directory "%s".' % source)
    else:
      pool.copy(source, target, delete_source=delete_source)

  @log_calls
  def cp_files(self, source, target, delete_source=False):
    '''Copy files.
       This function can handle multiple files if the source S3 URL has
       wildcard characters. It also handles recursive mode by copying all
       files and keeping the directory structure.
    '''
    pool = ThreadPool(ThreadUtil, self.opt)
    source = self.source_expand(source)

    if target[-1] == PATH_SEP:
      for src in source:
        self.cp_single_file(pool, src, os.path.join(target, self.get_basename(S3URL(src).path)), delete_source)
    else:
      if len(source) > 1:
        raise Failure('Target "%s" is not a directory (with a trailing slash).' % target)
        # Copy file if it exists otherwise do nothing
      elif len(source) == 1:
        self.cp_single_file(pool, source[0], target, delete_source)
      else:
        # Source expand may return empty list only if ignore-empty-source is set to true
        pass

    pool.join()

  @log_calls
  def del_files(self, source):
    '''Delete files on S3'''
    src_files = []
    for obj in self.s3walk(source):
      if not obj['is_dir']: # ignore directories
        src_files.append(obj['name'])

    pool = ThreadPool(ThreadUtil, self.opt)
    pool.batch_delete(src_files)
    pool.join()

  @log_calls
  def relative_dir_walk(self, dir):
    '''Generic version of directory walk. Return file list without base path
       for comparison.
    '''
    result = []

    if S3URL.is_valid(dir):
      basepath = S3URL(dir).path
      for f in (f for f in self.s3walk(dir) if not f['is_dir']):
        result.append(os.path.relpath(S3URL(f['name']).path, basepath))
    else:
      for f in (f for f in self.local_walk(dir) if not os.path.isdir(f)):
        result.append(os.path.relpath(f, dir))

    return result

  @log_calls
  def dsync_files(self, source, target):
    '''Sync directory to directory.'''
    src_s3_url = S3URL.is_valid(source)
    dst_s3_url = S3URL.is_valid(target)

    source_list = self.relative_dir_walk(source)
    if len(source_list) == 0 or '.' in source_list:
      raise Failure('Sync command needs to sync directory to directory.')

    sync_list = [(os.path.join(source, f), os.path.join(target, f)) for f in source_list]

    pool = ThreadPool(ThreadUtil, self.opt)
    if src_s3_url and not dst_s3_url:
      for src, dest in sync_list:
        pool.download(src, dest)
    elif not src_s3_url and dst_s3_url:
      for src, dest in sync_list:
        pool.upload(src, dest)
    elif src_s3_url and dst_s3_url:
      for src, dest in sync_list:
        pool.copy(src, dest)
    else:
      raise InvalidArgument('Cannot sync two local directories.')
    pool.join()

    if self.opt.delete_removed:
      target_list = self.relative_dir_walk(target)
      remove_list = [os.path.join(target, f) for f in (set(target_list) - set(source_list))]

      if S3URL.is_valid(target):
        pool = ThreadPool(ThreadUtil, self.opt)
        pool.batch_delete(remove_list)
        pool.join()
      else:
        for f in remove_list:
          try:
            os.unlink(f)
            message('Delete %s', f)
          except:
            pass

  @log_calls
  def sync_files(self, source, target):
    '''Sync files to or from S3. Deletions are only performed when syncing TO S3.
       Currently identical to get/put -r -f --sync-check, with the exception of
       deletions.
    '''
    src_s3_url = S3URL.is_valid(source)
    dst_s3_url = S3URL.is_valid(target)

    if src_s3_url and not dst_s3_url:
      self.get_files(source, target)
    elif not src_s3_url and dst_s3_url:
      self.put_files(source, target)
      if self.opt.delete_removed:
        self.delete_removed_files(source, target)
    elif src_s3_url and dst_s3_url:
      self.cp_files(source, target)
    else:
      raise InvalidArgument('No S3 URI provided')

  @log_calls
  def size(self, source):
    '''Get the size component of the given s3url. If it is a
       directory, combine the sizes of all the files under
       that directory. Subdirectories will not be counted unless
       --recursive option is set.
    '''
    result = []
    for src in self.source_expand(source):
      size = 0
      for f in self.s3walk(src):
        size += f['size']
      result.append((src, size))

    return result

class LocalMD5Cache(object):
  '''Stub class to provide lazily evaluated MD5 for a local file.'''

  def __init__(self, filename):
    '''Initialize md5 cache object.'''
    self.filename = filename
    self.md5 = None

  def file_hash(self, filename, block_size=2**20):
    '''Calculate MD5 hash code for a local file'''
    m = hashlib.md5()
    with open(filename, 'rb') as f:
      while True:
        data = f.read(block_size)
        if not data:
          break
        m.update(data)
    return m.hexdigest()

  def get_md5(self):
    '''Get or calculate MD5 value of the local file.'''
    if self.md5 is None:
      self.md5 = self.file_hash(self.filename)
    return self.md5
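
# Sketch of how the sync check uses this cache (illustrative; file name made up):
#
#   cache = LocalMD5Cache('/data/report.csv')
#   cache.get_md5()   # MD5 is computed on first use and then memoized
#
# ThreadUtil.sync_check() below compares this value against the remote object's
# ETag or its 'md5' metadata entry to decide whether a transfer can be skipped.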

class ThreadUtil(S3Handler, ThreadPool.Worker):
  '''Thread workers for S3 operations.
     This class contains all thread workers for S3 operations.

     1) Expand source into a [source] list if it contains wildcard characters '*' or '?'.
        For local paths this is done by the shell, but we need to do it ourselves for S3 paths.
        Basically we treat [source] as the first-class source list.

     2) Run the following algorithm:
        if target is directory? (S3 path uses trailing slash to determine this)
          for src in source:
            copy src to target/src.basename
        else
          if source has only one element?
            copy src to target
          else
            error "target should be a directory"!

     3) Copy operations should work for both single file and directory:
        def copy(src, target)
          if src is a directory?
            copy the whole directory recursively to target
          else
            copy the file src to target
  '''

  def __init__(self, pool):
    '''Constructor'''
    S3Handler.__init__(self, pool.opt)
    ThreadPool.Worker.__init__(self, pool)

  @log_calls
  def mkdirs(self, target):
    '''Ensure all directories are created for a given target file.'''
    path = os.path.dirname(target)
    if path and path != PATH_SEP and not os.path.isdir(path):
      # Multi-threading means there may be interleaved execution
      # between the check and the creation of the directory.
      try:
        os.makedirs(path)
      except OSError as ose:
        if ose.errno != errno.EEXIST:
          raise Failure('Unable to create directory (%s)' % (path,))

  @log_calls
  def sync_check(self, md5cache, remoteKey):
    '''Check MD5 for a local file and a remote file.
       Return True if they have the same md5 hash, otherwise False.
    '''
    if not remoteKey:
      return False
    if not os.path.exists(md5cache.filename):
      return False
    localmd5 = md5cache.get_md5()

    # check multiple md5 locations
    return ('ETag' in remoteKey and remoteKey['ETag'] == '"%s"' % localmd5) or \
           ('md5' in remoteKey and remoteKey['md5'] == localmd5) or \
           ('md5' in remoteKey['Metadata'] and remoteKey['Metadata']['md5'] == localmd5)

  @log_calls
  def partial_match(self, path, filter_path):
    '''Partially match a path against a filter_path with wildcards.
       This function returns True if the path partially matches the filter path.
       It is used for walking through directories with multi-level wildcards.
    '''
    if not path or not filter_path:
      return True

    # trailing slash normalization
    if path[-1] == PATH_SEP:
      path = path[0:-1]
    if filter_path[-1] == PATH_SEP:
      filter_path += '*'

    pi = path.split(PATH_SEP)
    fi = filter_path.split(PATH_SEP)

    # Here, if we are in recursive mode, we allow pi to be longer than fi.
    # Otherwise, the length of pi should be equal to or less than the length of fi.
    min_len = min(len(pi), len(fi))
    matched = fnmatch.fnmatch(PATH_SEP.join(pi[0:min_len]), PATH_SEP.join(fi[0:min_len]))
    return matched and (self.opt.recursive or len(pi) <= len(fi))
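
  # Examples of partial matching in non-recursive mode (illustrative values):
  #
  #   partial_match('logs/2018/', 'logs/*/app.log')           -> True  (walk deeper)
  #   partial_match('logs/2018/app.log', 'logs/*/app.log')    -> True  (full match)
  #   partial_match('logs/2018/x/app.log', 'logs/*/app.log')  -> False (too deep)
  #   partial_match('tmp/2018/', 'logs/*/app.log')            -> False (no match)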

  @log_calls
  def s3walk(self, s3url, s3dir, filter_path, result):
    '''Thread worker for s3walk.
       Recursively walk into all subdirectories if they still match the filter
       path partially.
    '''

    paginator = self.s3.get_paginator('list_objects')
    filter_path_level = filter_path.count(PATH_SEP)

    for page in paginator.paginate(Bucket=s3url.bucket, Prefix=s3dir, Delimiter=PATH_SEP, PaginationConfig={'PageSize': 1000}):
      # Get subdirectories first.
      for obj in page.get('CommonPrefixes') or []:
        obj_name = obj['Prefix']

        if not self.partial_match(obj_name, filter_path):
          continue

        if self.opt.recursive or (obj_name.count(PATH_SEP) != filter_path_level + 1):
          self.pool.s3walk(s3url, obj_name, filter_path, result)
        else:
          self.conditional(result, {
            'name': S3URL.combine(s3url.proto, s3url.bucket, obj_name),
            'is_dir': True,
            'size': 0,
            'last_modified': None
          })

      # Then get all items in this folder.
      for obj in page.get('Contents') or []:
        obj_name = obj['Key']
        if not self.partial_match(obj_name, filter_path):
          continue

        if self.opt.recursive or obj_name.count(PATH_SEP) == filter_path_level:
          self.conditional(result, {
            'name': S3URL.combine(s3url.proto, s3url.bucket, obj_name),
            'is_dir': False,
            'size': obj['Size'],
            'last_modified': obj['LastModified']
          })

  def conditional(self, result, obj):
    '''Check a file item against the given conditions before adding it to the result.'''
    fileonly = (self.opt.last_modified_before is not None) or (self.opt.last_modified_after is not None)

    if obj['is_dir']:
      if not fileonly:
        result.append(obj)
      return

    if (self.opt.last_modified_before is not None) and obj['last_modified'] >= self.opt.last_modified_before:
      return

    if (self.opt.last_modified_after is not None) and obj['last_modified'] <= self.opt.last_modified_after:
      return

    result.append(obj)

  class MultipartItem:
    '''Utility class for multiple part upload/download.
       This class is used to keep track of a single upload/download file, so
       that we can initialize/finalize a file when needed.
    '''
    def __init__(self, id):
      '''Constructor.
         id is a unique identifier for a single download/upload file:
           - Download: the temporary file name.
           - Upload: the multipart upload id provided by S3.
      '''
      self.id = id
      self.parts = []
      self.total = -1

    @synchronized
    def complete(self, part):
      '''Add a part to the parts list and check whether the file has been
         completely uploaded or downloaded.
      '''
      self.parts.append(part)
      return (len(self.parts) == self.total)

    @synchronized
    def sorted_parts(self):
      '''Obtain a sorted part list'''
      # Sort part list based on AWS requirement when completed.
      # See InvalidPartOrder in http://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadComplete.html
      def compare(x, y):
        '''Comparator for part list'''
        return cmp(x['PartNumber'], y['PartNumber'])

      return sorted(self.parts, key=cmp_to_key(compare))

  @log_calls
  def get_file_splits(self, id, source, target, fsize, splitsize):
    '''Get file splits for upload/download/copy operation.'''
    pos = 0
    part = 1 # S3 part id starts from 1
    mpi = ThreadUtil.MultipartItem(id)
    splits = []

    while pos < fsize:
      chunk = min(splitsize, fsize - pos)
      assert(chunk > 0)
      splits.append((source, target, mpi, pos, chunk, part))
      part += 1
      pos += chunk
    mpi.total = len(splits)

    return splits
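
  # Worked example (illustrative): a 25 MB file with a 10 MB split size yields
  # three (source, target, mpi, pos, chunk, part) tuples:
  #   pos=0 MB,  chunk=10 MB, part=1
  #   pos=10 MB, chunk=10 MB, part=2
  #   pos=20 MB, chunk=5 MB,  part=3
  # and mpi.total is set to 3, so the worker finishing the last part finalizes
  # the whole transfer.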

  @log_calls
  def get_file_privilege(self, source):
    '''Get privileges of a local file'''
    try:
      return str(oct(os.stat(source).st_mode)[-3:])
    except Exception as e:
      raise Failure('Could not get stat for %s, error_message = %s' % (source, e))

  @log_calls
  def lookup(self, s3url):
    '''Get the S3 object at the given S3 URL. Return None if it does not exist.'''
    try:
      return self.s3.head_object(Bucket=s3url.bucket, Key=s3url.path)
    except BotoClient.ClientError as e:
      if e.response['ResponseMetadata']['HTTPStatusCode'] == 404:
        return None
      else:
        raise e

  @log_calls
  def read_file_chunk(self, source, pos, chunk):
    '''Read local file chunk'''
    if chunk == 0:
      return StringIO()
    data = None
    with open(source, 'rb') as f:
      f.seek(pos)
      data = f.read(chunk)
    if not data:
      raise Failure('Unable to read data from source: %s' % source)
    return StringIO(data)

  @log_calls
  def upload(self, source, target, mpi=None, pos=0, chunk=0, part=0):
    '''Thread worker for upload operation.'''
    s3url = S3URL(target)
    obj = self.lookup(s3url)

    # Initialization: Set up multithreaded uploads.
    if not mpi:
      fsize = os.path.getsize(source)
      md5cache = LocalMD5Cache(source)

      # optional checks
      if self.opt.dry_run:
        message('%s => %s', source, target)
        return
      elif self.opt.sync_check and self.sync_check(md5cache, obj):
        message('%s => %s (synced)', source, target)
        return
      elif not self.opt.force and obj:
        raise Failure('File already exists: %s' % target)

      if fsize < self.opt.max_singlepart_upload_size:
        data = self.read_file_chunk(source, 0, fsize)
        self.s3.put_object(Bucket=s3url.bucket,
                           Key=s3url.path,
                           Body=data,
                           Metadata={'md5': md5cache.get_md5(),
                                     'privilege': self.get_file_privilege(source)})
        message('%s => %s', source, target)
        return

      # We store our own md5 value here because the ETag of a multipart upload
      # is not the MD5 checksum of the whole file.
      response = self.s3.create_multipart_upload(Bucket=s3url.bucket,
                                                 Key=s3url.path,
                                                 Metadata={'md5': md5cache.get_md5(),
                                                           'privilege': self.get_file_privilege(source)})
      upload_id = response['UploadId']

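      # Queue one upload task per split; each task re-enters upload() through
      # the worker pool with mpi set and uploads a single part under this UploadId.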
      for args in self.get_file_splits(upload_id, source, target, fsize, self.opt.multipart_split_size):
        self.pool.upload(*args)
      return

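    # Multipart branch: upload this single part and record its ETag for the
    # final complete_multipart_upload call.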
    data = self.read_file_chunk(source, pos, chunk)
    response = self.s3.upload_part(Bucket=s3url.bucket, Key=s3url.path, UploadId=mpi.id, Body=data, PartNumber=part)

    # Finalize
    if mpi.complete({'ETag': response['ETag'], 'PartNumber': part}):
      try:
        self.s3.complete_multipart_upload(Bucket=s3url.bucket, Key=s3url.path, UploadId=mpi.id, MultipartUpload={'Parts': mpi.sorted_parts()})
        message('%s => %s', source, target)
      except Exception as e:
        message('Unable to complete upload: %s', str(e))
        self.s3.abort_multipart_upload(Bucket=s3url.bucket, Key=s3url.path, UploadId=mpi.id)
        raise RetryFailure('Upload failed: Unable to complete upload %s.' % source)

  @log_calls
  def _verify_file_size(self, obj, downloaded_file):
    '''Verify the file size of the downloaded file.'''
    file_size = os.path.getsize(downloaded_file)
    if int(obj['ContentLength']) != file_size:
      raise RetryFailure('Downloaded file size inconsistent: %s' % (repr(obj)))

  @log_calls
  def write_file_chunk(self, target, pos, chunk, body):
    '''Write local file chunk'''
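    # os.open/os.lseek are used so that concurrent workers can each write
    # their own (disjoint) byte range of the same temporary file.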
    fd = os.open(target, os.O_CREAT | os.O_WRONLY)
    try:
      os.lseek(fd, pos, os.SEEK_SET)
      data = body.read(chunk)
      num_bytes_written = os.write(fd, data)
      if num_bytes_written != len(data):
        raise RetryFailure('Number of bytes written inconsistent: %s != %s' % (num_bytes_written, len(data)))
    finally:
      os.close(fd)

  @log_calls
  def download(self, source, target, mpi=None, pos=0, chunk=0, part=0):
    '''Thread worker for download operation.'''
    s3url = S3URL(source)
    obj = self.lookup(s3url)
    if obj is None:
      raise Failure('The object "%s" does not exist.' % (s3url.path,))

    # Initialization: Set up multithreaded downloads.
    if not mpi:
      # optional checks
      if self.opt.dry_run:
        message('%s => %s', source, target)
        return
      elif self.opt.sync_check and self.sync_check(LocalMD5Cache(target), obj):
        message('%s => %s (synced)', source, target)
        return
      elif not self.opt.force and os.path.exists(target):
        raise Failure('File already exists: %s' % target)

      fsize = int(obj['ContentLength'])

      # Small file optimization.
      if fsize < self.opt.max_singlepart_download_size:
        # Create a single part to chain back main download operation.
        mpi = ThreadUtil.MultipartItem(tempfile_get(target))
        mpi.total = 1
        pos = 0
        chunk = fsize
        # Continue as one part download.
      else:
        # Here we use temp filename as the id of mpi.
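        # Each split re-enters download() through the worker pool with mpi set
        # and writes its own byte range into the shared temporary file.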
        for args in self.get_file_splits(tempfile_get(target), source, target, fsize, self.opt.multipart_split_size):
          self.pool.download(*args)
        return

    tempfile = mpi.id
    if self.opt.recursive:
      self.mkdirs(tempfile)

    # Download part of the file, range is inclusive.
    response = self.s3.get_object(Bucket=s3url.bucket, Key=s3url.path, Range='bytes=%d-%d' % (pos, pos + chunk - 1))
    self.write_file_chunk(tempfile, pos, chunk, response['Body'])

    # Finalize
    if mpi.complete({'PartNumber': part}):
      try:
        self.update_privilege(obj, tempfile)
        self._verify_file_size(obj, tempfile)
        tempfile_set(tempfile, target)
        message('%s => %s', source, target)
      except Exception as e:
        # Note that we don't retry in this case, because we are going to
        # remove the temp file; if we retried here with the original
        # parameters (wrapped in the task item), it would fail anyway.
        tempfile_set(tempfile, None)
        raise Failure('Download Failure: %s, Source: %s.' % (e, source))

  @log_calls
  def copy(self, source, target, mpi=None, pos=0, chunk=0, part=0, delete_source=False):
    '''Copy a single S3 object from source to target using the boto3 S3 client.'''

    if self.opt.dry_run:
      message('%s => %s' % (source, target))
      return

    source_url = S3URL(source)
    target_url = S3URL(target)

    if not mpi:
      obj = self.lookup(source_url)
      fsize = int(obj['ContentLength'])

      if fsize < self.opt.max_singlepart_copy_size:
        self.s3.copy_object(Bucket=target_url.bucket, Key=target_url.path,
                            CopySource={'Bucket': source_url.bucket, 'Key': source_url.path})

        message('%s => %s' % (source, target))
        if delete_source:
          self.delete(source)

        return

      response = self.s3.create_multipart_upload(Bucket=target_url.bucket,
                                                 Key=target_url.path,
                                                 Metadata=obj['Metadata'])
      upload_id = response['UploadId']

      for args in self.get_file_splits(upload_id, source, target, fsize, self.opt.multipart_split_size):
        self.pool.copy(*args, delete_source=delete_source)
      return

    response = self.s3.upload_part_copy(Bucket=target_url.bucket,
                                        Key=target_url.path,
                                        CopySource={'Bucket': source_url.bucket, 'Key': source_url.path},
                                        CopySourceRange='bytes=%d-%d' % (pos, pos + chunk - 1),
                                        UploadId=mpi.id,
                                        PartNumber=part)

    if mpi.complete({'ETag': response['CopyPartResult']['ETag'], 'PartNumber': part}):
      try:
        # Finalize copy operation.
        self.s3.complete_multipart_upload(Bucket=target_url.bucket, Key=target_url.path, UploadId=mpi.id, MultipartUpload={'Parts': mpi.sorted_parts()})

        if delete_source:
          self.delete(source)

        message('%s => %s' % (source, target))
      except Exception as e:
        message('Unable to complete upload: %s', str(e))
        self.s3.abort_multipart_upload(Bucket=target_url.bucket, Key=target_url.path, UploadId=mpi.id)
        raise RetryFailure('Copy failed: Unable to complete copy %s.' % source)

  @log_calls
  def delete(self, source):
    '''Thread worker for delete operation.'''
    s3url = S3URL(source)

    message('Delete %s', source)
    if not self.opt.dry_run:
      self.s3.delete_object(Bucket=s3url.bucket, Key=s3url.path)

  @log_calls
  def batch_delete(self, sources):
    '''Delete a list of files in batches of batch_delete_size (default=1000).'''
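    # Illustrative example (assumed sizes): 2500 keys with the default batch
    # size of 1000 are re-queued through the worker pool as batches of 1000,
    # 1000 and 500.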
    assert(type(sources) == list)

    if len(sources) == 0:
      return
    elif len(sources) == 1:
      self.delete(sources[0])
    elif len(sources) > self.opt.batch_delete_size:
      for i in range(0, len(sources), self.opt.batch_delete_size):
        self.pool.batch_delete(sources[i:i+self.opt.batch_delete_size])
    else:
      bucket = S3URL(sources[0]).bucket
      deletes = []
      for source in sources:
        s3url = S3URL(source)
        if s3url.bucket != bucket:
          raise Failure('Unable to delete keys in different bucket %s and %s.' % (s3url.bucket, bucket))
        deletes.append({'Key': s3url.path})

      response = self.s3.delete_objects(Bucket=bucket, Delete={'Objects': deletes})

      # Output result of deletion.
      for res in response.get('Deleted') or []:
        message('Delete %s', S3URL.combine('s3', bucket, res['Key']))

      for err in response.get('Errors') or []:
        message('Error deleting %s, code(%s) %s', S3URL.combine('s3', bucket, err['Key']), err['Code'], err['Message'])

      if response.get('Errors') is not None:
        raise RetryFailure('Unable to complete deleting %d files.' % len(response.get('Errors')))

class CommandHandler(object):
  '''Main class to handle commands.
     This class is responsible for validating parameters and calling the
     corresponding operations.
  '''

  def __init__(self, opt):
    '''Constructor'''
    self.opt = opt

  def run(self, args):
    '''Main entry to handle commands. Dispatch to individual command handler.'''
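    # Dispatch by naming convention, e.g. 'ls' maps to ls_handler and 'put' to put_handler.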
    if len(args) == 0:
      raise InvalidArgument('No command provided')
    cmd = args[0]
    if cmd + '_handler' in CommandHandler.__dict__:
      CommandHandler.__dict__[cmd + '_handler'](self, args)
    else:
      raise InvalidArgument('Unknown command %s' % cmd)

  def s3handler(self):
    '''Create an S3Handler instance for multithreaded operations.'''
    return S3Handler(self.opt)

  @log_calls
  def validate(self, format, args):
    '''Validate input parameters with given format.
       This function also checks for wildcards for recursive mode.
    '''
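    # Format grammar: '|' separates positional parameters and ',' separates
    # accepted alternatives. For example, 'cmd|s3,local|s3,local' requires a
    # command followed by two arguments, each either an s3:// URL or a local path.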
    fmtMap = {
      'cmd': 'Command',
      's3': 's3 path',
      'local': 'local path'
    }
    fmts = format.split('|')
    if len(fmts) != len(args):
      raise InvalidArgument('Invalid number of parameters')

    for i, fmt in enumerate(fmts):
      valid = False
      for f in fmt.split(','):
        if f == 'cmd' and args[i] + '_handler' in CommandHandler.__dict__:
          valid = True
        if f == 's3' and S3URL.is_valid(args[i]):
          valid = True
        if f == 'local' and not S3URL.is_valid(args[i]):
          valid = True
      if not valid:
        raise InvalidArgument('Invalid parameter: %s, %s expected' % (args[i], fmtMap[fmt.split(',')[0]]))

  @log_calls
  def pretty_print(self, objlist):
    '''Pretty print the result of s3walk. Here we calculate the maximum width
       of each column and align them.
    '''

    def normalize_time(timestamp):
      '''Normalize the timestamp format for pretty print.'''
      if timestamp is None:
        return ' ' * 16

      return TIMESTAMP_FORMAT % (timestamp.year, timestamp.month, timestamp.day, timestamp.hour, timestamp.minute)

    cwidth = [0, 0, 0]
    format = '%%%ds %%%ds %%-%ds'
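    # The template expands with the column widths computed below; e.g.
    # (illustrative) cwidth=[16, 9, 30] yields '%16s %9s %-30s', with the
    # name column left-aligned.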

    # Calculate maximum width for each column.
    result = []
    for obj in objlist:
      last_modified = normalize_time(obj['last_modified'])
      size = str(obj['size']) if not obj['is_dir'] else 'DIR'
      name = obj['name']
      item = (last_modified, size, name)
      for i, value in enumerate(item):
        if cwidth[i] < len(value):
          cwidth[i] = len(value)
      result.append(item)

    # Format output.
    for item in result:
      text = (format % tuple(cwidth)) % item
      message('%s', text.rstrip())

  @log_calls
  def ls_handler(self, args):
    '''Handler for ls command'''
    if len(args) == 1:
      self.pretty_print(self.s3handler().list_buckets())
      return

    self.validate('cmd|s3', args)
    self.pretty_print(self.s3handler().s3walk(args[1]))

  @log_calls
  def mb_handler(self, args):
    '''Handler for mb command'''
    if len(args) == 1:
      raise InvalidArgument('No s3 bucketname provided')

    self.validate('cmd|s3', args)
    self.s3handler().create_bucket(args[1])

  @log_calls
  def put_handler(self, args):
    '''Handler for put command'''

    # Special check for shell expansion
    if len(args) < 3:
      raise InvalidArgument('Invalid number of parameters')
    self.validate('|'.join(['cmd'] + ['local'] * (len(args) - 2) + ['s3']), args)

    source = args[1:-1] # shell expansion
    target = args[-1]

    self.s3handler().put_files(source, target)

  @log_calls
  def get_handler(self, args):
    '''Handler for get command'''

    # Special case when we don't have target directory.
    if len(args) == 2:
      args += ['.']

    self.validate('cmd|s3|local', args)
    source = args[1]
    target = args[2]
    self.s3handler().get_files(source, target)

  @log_calls
  def cat_handler(self, args):
    '''Handler for cat command'''

    self.validate('cmd|s3', args)
    source = args[1]

    self.s3handler().print_files(source)

  @log_calls
  def dsync_handler(self, args):
    '''Handler for dsync command.'''
    self.opt.recursive = True
    self.opt.sync_check = True
    self.opt.force = True

    self.validate('cmd|s3,local|s3,local', args)
    source = args[1]
    target = args[2]

    self.s3handler().dsync_files(source, target)

  @log_calls
  def sync_handler(self, args):
    '''Handler for sync command.
       XXX Here we emulate the sync command with get/put -r -f --sync-check,
           so it does not provide a delete operation.
    '''
    self.opt.recursive = True
    self.opt.sync_check = True
    self.opt.force = True

    self.validate('cmd|s3,local|s3,local', args)
    source = args[1]
    target = args[2]

    self.s3handler().sync_files(source, target)

  @log_calls
  def cp_handler(self, args):
    '''Handler for cp command'''

    self.validate('cmd|s3|s3', args)
    source = args[1]
    target = args[2]
    self.s3handler().cp_files(source, target)

  @log_calls
  def mv_handler(self, args):
    '''Handler for mv command'''

    self.validate('cmd|s3|s3', args)
    source = args[1]
    target = args[2]
    self.s3handler().cp_files(source, target, delete_source=True)

  @log_calls
  def del_handler(self, args):
    '''Handler for del command'''
    self.validate('cmd|s3', args)
    source = args[1]
    self.s3handler().del_files(source)

  @log_calls
  def du_handler(self, args):
    '''Handler for du command'''
    for src, size in self.s3handler().size(args[1:]):
      message('%s\t%s' % (size, src))

  @log_calls
  def _totalsize_handler(self, args):
    '''Handler for the _totalsize command'''
    total_size = 0
    for src, size in self.s3handler().size(args[1:]):
      total_size += size
    message(str(total_size))

class ExtendedOptParser(optparse.Option):
  '''Specialized optparse Option subclass to handle new types such as datetime and dict'''

  REGEX_DATE = re.compile(r'(\d{4})[/-](\d{1,2})[/-](\d{1,2})')
  REGEX_TIME = re.compile(r'(\d{1,2})\:(\d{2})')
  REGEX_DELTA = re.compile(r'(\d{1,3})\s+(minute|hour|day|week)s?\s+(ago|before|after)')

  def match_date(self, value):
    '''Search for date information in the string'''
    m = self.REGEX_DATE.search(value)
    date = datetime.datetime.utcnow().date()
    if m:
      date = datetime.date(int(m.group(1)), int(m.group(2)), int(m.group(3)))
      value = self.REGEX_DATE.sub('', value)
    return (date, value)

  def match_time(self, value):
    '''Search for time information in the string'''
    m = self.REGEX_TIME.search(value)
    time = datetime.datetime.utcnow().time()
    if m:
      time = datetime.time(int(m.group(1)), int(m.group(2)))
      value = self.REGEX_TIME.sub('', value)
    return (time, value)

  def match_delta(self, value):
    '''Search for timedelta information in the string'''
    m = self.REGEX_DELTA.search(value)
    delta = datetime.timedelta(days=0)
    if m:
      d = int(m.group(1))
      if m.group(3) == 'ago' or m.group(3) == 'before':
        d = -d

      if m.group(2) == 'minute':
        delta = datetime.timedelta(minutes=d)
      elif m.group(2) == 'hour':
        delta = datetime.timedelta(hours=d)
      elif m.group(2) == 'day':
        delta = datetime.timedelta(days=d)
      elif m.group(2) == 'week':
        delta = datetime.timedelta(weeks=d)
      value = self.REGEX_DELTA.sub('', value)
    return (delta, value)

  def check_datetime(self, opt, value):
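    '''Parse a datetime option value into a timezone-aware UTC datetime.
       Illustrative inputs (assumed): '2016/5/04 13:30' gives that time in UTC;
       '1 hour ago' gives the current UTC time minus one hour.
    '''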
    (current_date, value) = self.match_date(value.lower())
    (current_time, value) = self.match_time(value)
    (delta, value) = self.match_delta(value)

    # All recognizable date/time/delta tokens should have been consumed from the value string by now.
    value = value.strip()
    if value != '':
      raise optparse.OptionValueError("Option %s: invalid datetime value: %r" % (opt, value))

    # Make sure all datetime are timezone-aware. Use UTC for all datetime instances.
    return pytz.utc.localize(datetime.datetime.combine(current_date, current_time) + delta)

  def check_dict(self, opt, value):
    '''Parse a JSON string as a dict parameter'''
    try:
      return json.loads(value)
    except:
      raise optparse.OptionValueError("Option %s: invalid dict value: %r" % (opt, value))

  # Registration functions for option parser.
  TYPES = optparse.Option.TYPES + ('datetime', 'dict')
  TYPE_CHECKER = optparse.Option.TYPE_CHECKER.copy()
  TYPE_CHECKER['datetime'] = check_datetime
  TYPE_CHECKER['dict'] = check_dict
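  # Options declared with type='datetime' accept values like '2016-05-04 13:30'
  # or '2 days ago'; options with type='dict' accept a JSON object string,
  # e.g. (illustrative) '{"key": "value"}'.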

def main():
  try:
      if not sys.argv[0]: sys.argv[0] = ''  # Workaround for running with optparse from egg

      # Parser for command line options.
      parser = optparse.OptionParser(
        option_class=ExtendedOptParser,
        description='Super S3 command line tool. Version %s' % S4CMD_VERSION)

      parser.add_option(
          '--version', help='print out version of s4cmd', dest='version',
          action='store_true', default=False)
      parser.add_option(
          '-p', '--config', help='path to s3cfg config file', dest='s3cfg',
          type='string', default=None)
      parser.add_option(
          '--access-key', help = 'use access_key for connection to S3', dest = 'access_key',
          type = 'string', default = None)
      parser.add_option(
          '--secret-key', help = 'use secret key for connection to S3', dest = 'secret_key',
          type = 'string', default = None)
      parser.add_option(
          '-f', '--force', help='force overwrite files when download or upload',
          dest='force', action='store_true', default=False)
      parser.add_option(
          '-r', '--recursive', help='recursively checking subdirectories',
          dest='recursive', action='store_true', default=False)
      parser.add_option(
          '-s', '--sync-check', help='check file md5 before download or upload',
          dest='sync_check', action='store_true', default=False)
      parser.add_option(
          '-n', '--dry-run', help='trial run without actual download or upload',
          dest='dry_run', action='store_true', default=False)
      parser.add_option(
          '-t', '--retry', help='number of retries before giving up',
          dest='retry', type=int, default=3)
      parser.add_option(
          '--retry-delay', help='seconds to sleep between retries',
          type=int, default=10)
      parser.add_option(
          '-c', '--num-threads', help='number of concurrent threads',
          type=int, default=get_default_thread_count())
      parser.add_option(
          '-d', '--show-directory', help='show directory instead of its content',
          dest='show_dir', action='store_true', default=False)
      parser.add_option(
          '--ignore-empty-source', help='ignore empty source from s3',
          dest='ignore_empty_source', action='store_true', default=False)
      parser.add_option(
          '--endpoint-url', help='configure boto3 to use a different s3 endpoint',
          dest='endpoint_url', type='string', default=None)
      parser.add_option(
          '--use-ssl', help='(obsolete) use SSL connection to S3', dest='use_ssl',
          action='store_true', default=False)
      parser.add_option(
          '--verbose', help='verbose output', dest='verbose',
          action='store_true', default=False)
      parser.add_option(
          '--debug', help='debug output', dest='debug',
          action='store_true', default=False)
      parser.add_option(
          '--validate', help='(obsolete) validate lookup operation', dest='validate',
          action='store_true', default=False)
      parser.add_option(
          '-D', '--delete-removed',
          help='delete remote files that do not exist in source after sync',
          dest='delete_removed', action='store_true', default=False)
      parser.add_option(
          '--multipart-split-size',
          help='size in bytes to split multipart transfers', type=int,
          default=50 * 1024 * 1024)
      parser.add_option(
          '--max-singlepart-download-size',
          help='files with size (in bytes) greater than this will be downloaded in '
          'multipart transfers', type=int, default=50 * 1024 * 1024)
      parser.add_option(
          '--max-singlepart-upload-size',
          help='files with size (in bytes) greater than this will be uploaded in '
          'multipart transfers', type=int, default=4500 * 1024 * 1024)
      parser.add_option(
          '--max-singlepart-copy-size',
          help='files with size (in bytes) greater than this will be copied in '
          'multipart transfers', type=int, default=100 * 1024 * 1024)
      parser.add_option(
          '--batch-delete-size',
          help='number of files (up to 1000) to be combined in a batch delete',
          type=int, default=1000)
      parser.add_option(
          '--last-modified-before',
          help='only process files whose last modified date is before the given datetime',
          type='datetime', default=None)
      parser.add_option(
          '--last-modified-after',
          help='only process files whose last modified date is after the given datetime',
          type='datetime', default=None)

      # Extra S3 API arguments
      BotoClient.add_options(parser)

      # Combine parameters from environment variable. This is useful for global settings.
      env_opts = (shlex.split(os.environ[S4CMD_ENV_KEY]) if S4CMD_ENV_KEY in os.environ else [])
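      # Illustrative example (assumed value): setting the environment variable
      # named by S4CMD_ENV_KEY to '--recursive --num-threads=8' appends those
      # flags to every invocation.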
      (opt, args) = parser.parse_args(sys.argv[1:] + env_opts)
      s4cmd_logging.configure(opt)

      if opt.version:
        message('s4cmd version %s' % S4CMD_VERSION)
      else:
        # Initialize keys for S3.
        S3Handler.init_s3_keys(opt)
        try:
          CommandHandler(opt).run(args)
        except InvalidArgument as e:
          fail('[Invalid Argument] ', exc_info=e)
        except Failure as e:
          fail('[Runtime Failure] ', exc_info=e)
        except BotoClient.NoCredentialsError as e:
          fail('[Invalid Argument] ', exc_info=e)
        except BotoClient.BotoError as e:
          fail('[Boto3Error] %s: %s' % (e.error_code, e.error_message))
        except Exception as e:
          fail('[Runtime Exception] ', exc_info=e, stacktrace=True)

      clean_tempfiles()
      progress('') # Clear progress message before exit.
  except Exception:
      # Suppress the stack trace unless verbose output was requested.
      if not opt.verbose:
        sys.exit(1)
      raise

if __name__ == '__main__':
  main()