File: s3.py

"""
Script to sync files between local disk and S3 buckets.

# From augur/ directory

# Download flu H3N2 data into auspice directory.
python scripts/s3.py pull -b nextstrain-staging \
    --to builds/flu/auspice --prefixes flu_h3n2

# Upload flu H3N2 data to S3 dev bucket.
python scripts/s3.py push -b nextstrain-staging \
    -g "builds/flu/auspice/flu_h3n2_*"

# Sync H3N2 data from one bucket to another and create CloudFront invalidation.
python scripts/s3.py sync --from nextstrain-staging \
    --to production-data --prefixes flu_h3n2
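
# Preview any command without transferring files by adding the top-level
# --dryrun/-n flag before the subcommand name.
python scripts/s3.py --dryrun push -b nextstrain-staging \
    -g "builds/flu/auspice/flu_h3n2_*"

AWS credentials are read from the environment (AWS_ACCESS_KEY_ID and
AWS_SECRET_ACCESS_KEY) or any other standard boto3 credential source.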
"""
import argparse
import glob
import gzip
import io
import logging
import os
import shutil
import time

import boto3
import botocore.exceptions


# Map S3 buckets to their corresponding CloudFront ids.
CLOUDFRONT_ID_BY_BUCKET = {
    "nextstrain-staging": "E3L83FTHWUN0BV",
    "nextstrain-data": "E3LB0EWZKCCV"
}
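
# Buckets without an entry above are still pushed/synced; only the CloudFront
# invalidation step is skipped with a warning (see create_cloudfront_invalidation).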


def get_bucket_keys_by_prefixes(bucket, prefixes):
    """Get objects from the given bucket instance that match the given list of
    prefixes. If no prefixes are given, get all objects.

    Args:
        bucket: S3 Bucket instance
        prefixes: NoneType or list of key prefixes to filter objects in the bucket by

    Returns:
        list: sorted list of object keys matching the given prefixes
    """
    if prefixes is not None:
        object_keys = []
        for prefix in prefixes:
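            # Resource collections paginate automatically, so arbitrarily large buckets are handled.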
            keys = [obj.key for obj in bucket.objects.filter(Prefix=prefix)]
            object_keys.extend(keys)

        object_keys = sorted(set(object_keys))
    else:
        object_keys = sorted([obj.key for obj in bucket.objects.all()])

    return object_keys


def create_cloudfront_invalidation(bucket_name, path):
    """Create a cache invalidation for the given files if a CloudFront id is given.

    Args:
        bucket_name: an S3 bucket name that may or may not have a CloudFront id
        path: invalidation path, may contain *

    Returns:
        dict or NoneType: CloudFront API response or None if no CloudFront is defined for the given bucket
    """
    # Setup logging.
    logger = logging.getLogger(__name__)

    # Find CloudFront id for the given bucket name.
    cloudfront_id = CLOUDFRONT_ID_BY_BUCKET.get(bucket_name)
    if cloudfront_id is None:
        logger.warning("Could not find a CloudFront id for the S3 bucket '%s'" % bucket_name)
        return

    print("Creating invalidation for '%s' in the CloudFront distribution '%s'" % (path, cloudfront_id))

    # Connect to CloudFront.
    cloudfront = boto3.client("cloudfront")

    # Create the invalidation batch. Top-level keys require a "/" prefix for
    # proper invalidation. This is purposely a single-path invalidation
    # because of how AWS charges for invalidations; see:
    # https://docs.aws.amazon.com/AmazonCloudFront/latest/DeveloperGuide/Invalidation.html
    invalidation_batch = {
        "Paths": {
            "Quantity": 1,
            "Items": ["/%s" % path]
        },
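        # CallerReference must be unique per invalidation request; a timestamp suffices.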
        "CallerReference": str(time.time())
    }
    logger.debug("Invalidation batch: %s" % str(invalidation_batch))

    # Submit the invalidation request.
    response = cloudfront.create_invalidation(
        DistributionId=cloudfront_id,
        InvalidationBatch=invalidation_batch
    )
    logger.debug("CloudFront response: %s" % str(response))

    return response


def push(bucket_name, file_glob, dryrun=False):
    """Push the given files to the given S3 bucket and invalidate the
    corresponding CloudFront cache if one is defined for the bucket.

    Args:
        bucket_name: S3 bucket to push to
        file_glob: glob string identifying local files to push, of the form builds/zika/auspice/zika_*
        dryrun: boolean indicating whether files should be uploaded or not
    """
    # Setup logging.
    logger = logging.getLogger(__name__)

    # Construct file list from glob
    files = glob.glob(file_glob)

    # Create a distinct list of files to push.
    files = list(set(files))

    # Confirm that all given file paths are proper files.
    for file_name in files:
        assert os.path.isfile(file_name), "The requested input '%s' is not a proper file" % file_name

    # Connect to S3.
    s3 = boto3.resource("s3")
    bucket = s3.Bucket(bucket_name)

    # Upload local files, stripping directory names from the given file paths
    # for the S3 keys.
    print("Uploading %i files to bucket '%s'" % (len(files), bucket_name))
    s3_keys = []
    for file_name in files:
        s3_key = os.path.split(file_name)[-1]
        s3_keys.append(s3_key)
        logger.info("Uploading '%s' as '%s'" % (file_name, s3_key))

        if not dryrun:
            # Open uncompressed file to be uploaded.
            with open(file_name, "rb") as fh:
                # Compress the file in memory.
                compressed_fh = io.BytesIO()
                with gzip.GzipFile(fileobj=compressed_fh, mode="wb") as gz:
                    shutil.copyfileobj(fh, gz)

                compressed_fh.seek(0)

                # Upload the compressed file with associated metadata.
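                # The third positional argument is ExtraArgs; ContentEncoding: gzip
                # lets clients transparently decompress the JSON on download.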
                bucket.upload_fileobj(
                    compressed_fh,
                    s3_key,
                    {"ContentEncoding": "gzip", "ContentType": "application/json"}
                )

    # Create a CloudFront invalidation for the destination bucket.
    if not dryrun:
        invalidation_path = os.path.split(file_glob)[-1]
        create_cloudfront_invalidation(bucket_name, invalidation_path)


def pull(bucket_name, prefixes=None, local_dir=None, dryrun=False):
    """Pull files from the given S3 bucket. Optionally, only pull files that match
    the given list of filename prefixes.

    Args:
        bucket_name: S3 bucket to pull from
        prefixes: a list of key prefixes to filter objects in the bucket by
        local_dir: a local directory to download files into
        dryrun: boolean indicating whether files should be downloaded or not
    """
    # Setup logging.
    logger = logging.getLogger(__name__)

    # Confirm that the given local directory is a real directory.
    if local_dir is not None:
        assert os.path.isdir(local_dir), "The requested output directory '%s' is not a proper directory." % local_dir

    # Connect to S3.
    s3 = boto3.resource("s3")

    # Get a list of all objects in the requested bucket.
    bucket = s3.Bucket(bucket_name)

    # Get keys by prefixes.
    object_keys = get_bucket_keys_by_prefixes(bucket, prefixes)

    # Download objects.
    print("Downloading %i files from bucket '%s'" % (len(object_keys), bucket_name))
    for key in object_keys:
        if not key.endswith(".json"):
            logger.warning("Skipping unsupported file type for file '%s'" % key)
            continue

        # Download into a local directory if requested.
        if local_dir is not None:
            local_key = os.path.join(local_dir, key)
        else:
            local_key = key

        logger.info("Downloading '%s' as '%s'" % (key, local_key))
        if not dryrun:
            with open(local_key, "wb") as fh:
                # Download the compressed file into memory.
                compressed_fh = io.BytesIO()
                bucket.download_fileobj(key, compressed_fh)
                compressed_fh.seek(0)

                # Write the uncompressed data from the file to disk.
                try:
                    with gzip.GzipFile(fileobj=compressed_fh, mode="rb") as gz:
                        shutil.copyfileobj(gz, fh)
                except OSError:
                    logger.warning("File '%s' does not appear to be compressed, trying to pull as an uncompressed file" % key)
                    # The failed gzip read may have advanced the buffer, so rewind before copying.
                    compressed_fh.seek(0)
                    shutil.copyfileobj(compressed_fh, fh)


def sync(source_bucket_name, destination_bucket_name, prefixes=None, dryrun=False):
    """Sync files from a given source bucket to a given destination bucket. An
    optional list of prefixes will restrict the files being synced between
    buckets.

    Args:
        source_bucket_name: name of bucket to copy files from
        destination_bucket_name: name of bucket to copy files to
        prefixes: a list of key prefixes to filter objects in the bucket by
        dryrun: boolean indicating whether files should be copied or not
    """
    # Setup logging.
    logger = logging.getLogger(__name__)

    # Connect to S3.
    s3 = boto3.resource("s3")

    # Get source bucket.
    source_bucket = s3.Bucket(source_bucket_name)

    # Get source bucket keys by prefixes.
    object_keys = get_bucket_keys_by_prefixes(source_bucket, prefixes)
    if len(object_keys) == 0:
        raise Exception("No files in the source bucket '%s' matched the given prefixes: %s" % (source_bucket_name, str(prefixes)))

    print("Syncing %i files from '%s' to '%s'" % (len(object_keys), source_bucket_name, destination_bucket_name))

    # Copy objects from source to destination by key.
    for key in object_keys:
        logger.info("Copying '%s'" % key)

        if not dryrun:
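            # client.copy is a managed transfer that copies server-side (multipart copy for large objects).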
            copy_source = {
                'Bucket': source_bucket_name,
                'Key': key
            }
            s3.meta.client.copy(copy_source, destination_bucket_name, key)

    # Create a CloudFront invalidation for each synced prefix (or for everything).
    if not dryrun:
        for path in (["%s*" % prefix for prefix in prefixes] if prefixes else ["*"]):
            create_cloudfront_invalidation(destination_bucket_name, path)


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Upload files to a (nextstrain) S3 bucket and perform cloudfront invalidation",
        epilog="P.S. run \"s3.py <cmd> -h\" to see the help specific to that command"
    )
    parser.add_argument("--verbose", "-v", action="store_const", dest="loglevel", const=logging.INFO, help="Enable verbose logging")
    parser.add_argument("--debug", "-d", action="store_const", dest="loglevel", const=logging.DEBUG, help="Enable debugging logging")
    parser.add_argument("--dryrun", "-n", action="store_true", help="Perform a dryrun without uploading or downloading any files")

    subparsers = parser.add_subparsers(dest="command_name", required=True)

    parser_push = subparsers.add_parser("push")
    parser_push.add_argument("--bucket", "-b", type=str, help="S3 bucket to push files to")
    parser_push.add_argument("--glob", "-g", type=str, help="Glob string to identify set of local files, must have quotes")
    parser_push.set_defaults(func=push)

    parser_pull = subparsers.add_parser("pull")
    parser_pull.add_argument("--bucket", "-b", type=str, help="S3 bucket to pull files from")
    parser_pull.add_argument("--prefixes", "-p", nargs="+", action="extend", help="One or more file prefixes to match in the given bucket")
    parser_pull.add_argument("--local_dir", "--to", "-t", help="Local directory to download files into")
    parser_pull.set_defaults(func=pull)

    parser_sync = subparsers.add_parser("sync")
    parser_sync.add_argument("--source_bucket", "--from", type=str, help="Source S3 bucket")
    parser_sync.add_argument("--destination_bucket", "--to", type=str, help="Destination S3 bucket")
    parser_sync.add_argument("--prefixes", "-p", nargs="+", action="extend", help="One or more prefixes for files to sync between buckets")
    parser_sync.set_defaults(func=sync)

    args = parser.parse_args()
    logging.basicConfig(level=args.loglevel)

    try:
        if args.command_name == "push":
            args.func(args.bucket, args.glob, args.dryrun)
        elif args.command_name == "pull":
            args.func(args.bucket, args.prefixes, args.local_dir, args.dryrun)
        elif args.command_name == "sync":
            args.func(args.source_bucket, args.destination_bucket, args.prefixes, args.dryrun)
    except botocore.exceptions.NoCredentialsError:
        parser.error("Unable to locate AWS credentials. Set the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables.")
    except Exception as e:
        parser.error(str(e))