1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176
|
# -*- coding: utf-8 -*-
#
# Copyright (C) 2019 Radim Rehurek <me@radimrehurek.com>
#
# This code is distributed under the terms and conditions
# from the MIT License (MIT).
#
"""Implements file-like objects for reading and writing to/from GCS."""
import logging
import warnings
try:
import google.cloud.exceptions
import google.cloud.storage
import google.auth.transport.requests
except ImportError:
MISSING_DEPS = True
import smart_open.bytebuffer
import smart_open.utils
from smart_open import constants
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
SCHEME = "gs"
"""Supported scheme for GCS"""
# 50 * 1024**2 bytes, i.e. 50 MiB.
_DEFAULT_MIN_PART_SIZE = 50 * 1024**2
"""Default minimum part size for GCS multipart uploads"""
# Baseline keyword arguments that Writer merges into its blob.open('wb', ...) call;
# caller-supplied blob_open_kwargs override these.
# NOTE(review): presumably ignore_flush makes flush() on the write handle a no-op
# rather than an error -- confirm against the google-cloud-storage documentation.
_DEFAULT_WRITE_OPEN_KWARGS = {'ignore_flush': True}
def parse_uri(uri_as_string):
    """Break a GCS URI into its scheme, bucket and blob parts.

    The URI is split with ``smart_open.utils.safe_urlsplit``.  Its network
    location is used as the bucket id and its path, with leading slashes
    removed, as the blob id.  The scheme of the URI is asserted to equal
    ``SCHEME``.

    Parameters
    ----------
    uri_as_string: str
        The URI to parse.

    Returns
    -------
    dict
        A dict with the keys ``scheme``, ``bucket_id`` and ``blob_id``.
    """
    split_result = smart_open.utils.safe_urlsplit(uri_as_string)
    assert split_result.scheme == SCHEME
    return {
        'scheme': SCHEME,
        'bucket_id': split_result.netloc,
        'blob_id': split_result.path.lstrip('/'),
    }
def open_uri(uri, mode, transport_params):
    """Open the blob addressed by a GCS URI.

    The URI is parsed with ``parse_uri`` and the resulting bucket and blob
    ids are handed to this module's ``open`` (which shadows the builtin).

    Parameters
    ----------
    uri: str
        The URI of the blob.
    mode: str
        The mode, forwarded unchanged to ``open``.
    transport_params: dict
        Passed through ``smart_open.utils.check_kwargs`` together with
        ``open``; the result is forwarded to ``open`` as keyword arguments.
        NOTE(review): presumably this keeps only the keywords ``open``
        accepts -- confirm against ``check_kwargs``.
    """
    location = parse_uri(uri)
    open_kwargs = smart_open.utils.check_kwargs(open, transport_params)
    bucket_id, blob_id = location['bucket_id'], location['blob_id']
    return open(bucket_id, blob_id, mode, **open_kwargs)
def warn_deprecated(parameter_name):
    """Issue a ``UserWarning`` stating that *parameter_name* is deprecated.

    Parameters
    ----------
    parameter_name: str
        The name of the deprecated parameter, interpolated into the message.
    """
    warnings.warn(
        f"Parameter {parameter_name} is deprecated, this parameter no-longer has any effect",
        UserWarning,
    )
def open(
        bucket_id,
        blob_id,
        mode,
        buffer_size=None,
        min_part_size=_DEFAULT_MIN_PART_SIZE,
        client=None,  # type: google.cloud.storage.Client
        get_blob_kwargs=None,
        blob_properties=None,
        blob_open_kwargs=None,
        ):
    """Open a GCS blob for reading or writing.

    Dispatches to ``Reader`` or ``Writer`` depending on ``mode``.  Both of
    those open the underlying blob in binary mode (``'rb'`` / ``'wb'``),
    whichever of the accepted mode strings was given.

    Parameters
    ----------
    bucket_id: str
        The name of the bucket this object resides in.
    blob_id: str
        The name of the blob within the bucket.
    mode: str
        The mode for opening the object.  ``constants.READ_BINARY``,
        ``'r'`` and ``'rt'`` select reading; ``constants.WRITE_BINARY``,
        ``'w'`` and ``'wt'`` select writing.
    buffer_size:
        Deprecated.  Passing anything other than None only triggers a
        deprecation warning.
    min_part_size: int, optional
        The minimum part size for multipart uploads.  For writing only.
    client: google.cloud.storage.Client, optional
        The GCS client to use when working with google-cloud-storage.
    get_blob_kwargs: dict, optional
        Additional keyword arguments to propagate to the bucket.get_blob
        method of the google-cloud-storage library.  For reading only.
    blob_properties: dict, optional
        Properties to set on the blob before writing.  For writing only.
    blob_open_kwargs: dict, optional
        Additional keyword arguments to propagate to the blob.open method
        of the google-cloud-storage library.

    Raises
    ------
    NotImplementedError
        If ``mode`` is none of the accepted read or write modes.
    """
    blob_open_kwargs = {} if blob_open_kwargs is None else blob_open_kwargs

    # The warning is emitted before the mode is inspected, so it fires even
    # when the mode turns out to be unsupported.
    if buffer_size is not None:
        warn_deprecated('buffer_size')

    read_modes = (constants.READ_BINARY, 'r', 'rt')
    if mode in read_modes:
        return Reader(
            bucket=bucket_id,
            key=blob_id,
            client=client,
            get_blob_kwargs=get_blob_kwargs,
            blob_open_kwargs=blob_open_kwargs,
        )

    write_modes = (constants.WRITE_BINARY, 'w', 'wt')
    if mode in write_modes:
        return Writer(
            bucket=bucket_id,
            blob=blob_id,
            min_part_size=min_part_size,
            client=client,
            blob_properties=blob_properties,
            blob_open_kwargs=blob_open_kwargs,
        )

    raise NotImplementedError(f'GCS support for mode {mode} not implemented')
def Reader(bucket,
           key,
           buffer_size=None,
           line_terminator=None,
           client=None,
           get_blob_kwargs=None,
           blob_open_kwargs=None):
    """Open an existing GCS blob for binary reading.

    Parameters
    ----------
    bucket: str
        The name of the bucket, passed to ``client.bucket``.
    key: str
        The name of the blob, passed to ``get_blob`` on that bucket.
    buffer_size:
        Deprecated; a value other than None only triggers a warning.
    line_terminator:
        Deprecated; a value other than None only triggers a warning.
    client: google.cloud.storage.Client, optional
        The client to use.  A default-constructed client is created when
        omitted.
    get_blob_kwargs: dict, optional
        Extra keyword arguments for the ``get_blob`` call.
    blob_open_kwargs: dict, optional
        Extra keyword arguments for the blob's ``open`` call.

    Returns
    -------
    The object returned by the blob's ``open('rb', ...)`` call.

    Raises
    ------
    google.cloud.exceptions.NotFound
        If ``get_blob`` returns None for ``key``.
    """
    get_blob_kwargs = {} if get_blob_kwargs is None else get_blob_kwargs
    blob_open_kwargs = {} if blob_open_kwargs is None else blob_open_kwargs

    # Build the client first so that a failure here happens before any
    # deprecation warning is emitted.
    if client is None:
        client = google.cloud.storage.Client()

    deprecated_arguments = (
        ('buffer_size', buffer_size),
        ('line_terminator', line_terminator),
    )
    for argument_name, argument_value in deprecated_arguments:
        if argument_value is not None:
            warn_deprecated(argument_name)

    found = client.bucket(bucket).get_blob(key, **get_blob_kwargs)
    if found is None:
        raise google.cloud.exceptions.NotFound(f'blob {key} not found in {bucket}')
    return found.open('rb', **blob_open_kwargs)
def Writer(bucket,
           blob,
           min_part_size=None,
           client=None,
           blob_properties=None,
           blob_open_kwargs=None):
    """Open a GCS blob for binary writing.

    Parameters
    ----------
    bucket: str
        The name of the bucket, passed to ``client.bucket``.
    blob: str
        The name of the blob to create within the bucket.
    min_part_size: int, optional
        Passed as ``chunk_size`` when the blob object is created.
    client: google.cloud.storage.Client, optional
        The client to use.  A default-constructed client is created when
        omitted.
    blob_properties: dict, optional
        Attribute name/value pairs set on the blob object before it is
        opened.
    blob_open_kwargs: dict, optional
        Extra keyword arguments for the blob's ``open`` call.  They are
        layered on top of ``_DEFAULT_WRITE_OPEN_KWARGS`` and win on
        conflicting keys.

    Returns
    -------
    The object returned by the blob's ``open('wb', ...)`` call, with a no-op
    ``terminate`` attribute attached.
    """
    if client is None:
        client = google.cloud.storage.Client()

    caller_open_kwargs = blob_open_kwargs if blob_open_kwargs is not None else {}
    properties = blob_properties if blob_properties is not None else {}

    # Later entries win, so caller-supplied options override the defaults.
    merged_open_kwargs = {**_DEFAULT_WRITE_OPEN_KWARGS, **caller_open_kwargs}

    target_bucket = client.bucket(bucket)
    g_blob = target_bucket.blob(blob, chunk_size=min_part_size)
    for property_name, property_value in properties.items():
        setattr(g_blob, property_name, property_value)

    handle = g_blob.open('wb', **merged_open_kwargs)
    # Backwards compatibility: older callers may still invoke terminate(),
    # which was deprecated upstream, see
    # https://cloud.google.com/storage/docs/resumable-uploads
    handle.terminate = lambda: None
    return handle
|