File: utils.py

package info (click to toggle)
smart-open 7.5.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 980 kB
  • sloc: python: 8,054; sh: 90; makefile: 14
file content (232 lines) | stat: -rw-r--r-- 7,293 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
# -*- coding: utf-8 -*-
#
# Copyright (C) 2020 Radim Rehurek <me@radimrehurek.com>
#
# This code is distributed under the terms and conditions
# from the MIT License (MIT).
#

"""Helper functions for documentation, etc."""

import inspect
import io
import logging
import urllib.parse

import wrapt

# Module-level logger; check_kwargs uses it to warn about ignored keyword arguments.
logger = logging.getLogger(__name__)

# URL schemes for which safe_urlsplit keeps '?' as part of the path instead of
# letting urlsplit treat it as the start of a query string.
WORKAROUND_SCHEMES = ['s3', 's3n', 's3u', 's3a', 'gs']
# Stand-in text that safe_urlsplit substitutes for '?' before splitting a URL
# and swaps back afterwards.  The work-around is skipped for URLs that already
# contain this substring.
QUESTION_MARK_PLACEHOLDER = '///smart_open.utils.QUESTION_MARK_PLACEHOLDER///'


def inspect_kwargs(kallable):
    """Return the parameters of a callable that declare a default value.

    Parameters
    ----------
    kallable: callable
        The function or method to inspect.

    Returns
    -------
    dict
        Maps the name of each parameter that has a default to that default
        value.  Parameters without a default are left out.
    """
    #
    # inspect.getargspec got deprecated in Py3.4, and calling it spews
    # deprecation warnings that we'd prefer to avoid.  Unfortunately, older
    # versions of Python (<3.3) did not have inspect.signature, so we need to
    # handle them the old-fashioned getargspec way.
    #
    try:
        signature = inspect.signature(kallable)
    except AttributeError:
        try:
            args, varargs, keywords, defaults = inspect.getargspec(kallable)
        except TypeError:
            #
            # Happens under Py2.7 with mocking.
            #
            return {}

        if not defaults:
            return {}
        # Defaults always belong to the trailing positional arguments.
        supported_keywords = args[-len(defaults):]
        return dict(zip(supported_keywords, defaults))

    #
    # Parameter.empty is a sentinel, so it must be compared by identity.
    # Using != would invoke the default value's own __ne__, which may return
    # a non-boolean (e.g. array-like objects), raise, or simply answer False,
    # causing a perfectly valid default to be dropped.
    #
    return {
        name: param.default
        for name, param in signature.parameters.items()
        if param.default is not inspect.Parameter.empty
    }


def check_kwargs(kallable, kwargs):
    """Filter keyword arguments down to the ones a callable accepts.

    Parameters
    ----------
    kallable: callable
        The function or method whose signature is consulted.
    kwargs: dict
        Candidate keyword arguments.  Any name the callable does not
        declare with a default value is dropped, and a single warning
        listing the dropped names (in sorted order) is logged.

    Returns
    -------
    dict
        The subset of ``kwargs`` that the callable supports, in the
        original insertion order.
    """
    accepted = sorted(inspect_kwargs(kallable))

    rejected = []
    for name in sorted(kwargs):
        if name not in accepted:
            rejected.append(name)

    if rejected:
        logger.warning('ignoring unsupported keyword arguments: %r', rejected)

    return {name: value for name, value in kwargs.items() if name in accepted}


def clamp(value, minval=0, maxval=None):
    """Restrict a number to the closed interval ``[minval, maxval]``.

    Parameters
    ----------
    value: numeric
        The number to restrict.

    minval: numeric
        The smallest permitted result.

    maxval: numeric
        The largest permitted result.  ``None`` means there is no upper
        bound.

    Returns
    -------
    numeric
        ``value`` limited to the requested range.  The lower bound is
        applied last, so it wins if ``maxval`` is smaller than ``minval``.

    """
    capped = value if maxval is None else min(value, maxval)
    return max(capped, minval)


def make_range_string(start=None, stop=None):
    """Build an RFC-2616 byte range specifier such as ``bytes=0-99``.

    Parameters
    ----------
    start: int, optional
        First byte of the range.  If omitted, ``stop`` is interpreted as an
        offset from EOF.

    stop: int, optional
        Last byte of the range.  If omitted, the range extends to EOF.

    Returns
    -------
    str
        The byte range specifier.

    Raises
    ------
    ValueError
        If neither ``start`` nor ``stop`` is given.

    """
    #
    # https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35
    #
    if start is None and stop is None:
        raise ValueError("make_range_string requires either a stop or start value")

    # A missing bound is rendered as an empty string on its side of the dash.
    first, last = ('' if bound is None else str(bound) for bound in (start, stop))
    return 'bytes=%s-%s' % (first, last)


def parse_content_range(content_range):
    """Split a content-range header such as "bytes 0-846981/846982" into its parts.

    Expects a well-formed content-range header as returned by S3; malformed
    input is not handled gracefully.
    See werkzeug.http.parse_content_range_header for a more robust version.

    Parameters
    ----------
    content_range: str
        The content-range header value.

    Returns
    -------
    tuple (units: str, start: int, stop: int, length: int)
        The units followed by the three integers found in the header.

    """
    units, spec = content_range.split(' ', 1)
    span, total = spec.split('/', 1)
    first, last = span.split('-', 1)
    return (units, *(int(field) for field in (first, last, total)))


def safe_urlsplit(url):
    """Split a URL without treating question marks as a query-string delimiter.

    This is a hack.  A question mark (?) normally marks the start of a
    querystring, which the standard library's urlsplit separates from the
    path.  For some schemes like S3 and GS, however, a question mark can be
    a legitimate part of the URL itself.

    For those schemes, every question mark is swapped for a special
    placeholder substring before splitting and swapped back in the resulting
    path afterwards.  The work-around is skipped in the unlikely event that
    the placeholder already occurs in the URL.  If this affects you, consider
    changing the value of QUESTION_MARK_PLACEHOLDER to something more suitable.

    See Also
    --------
    https://bugs.python.org/issue43882
    https://github.com/python/cpython/blob/3.14/Lib/urllib/parse.py
    https://github.com/piskvorky/smart_open/issues/285
    https://github.com/piskvorky/smart_open/issues/458
    smart_open/utils.py:QUESTION_MARK_PLACEHOLDER
    """
    parts = urllib.parse.urlsplit(url, allow_fragments=False)

    #
    # People will _almost never_ use the placeholder substring in a URL.  If
    # they do, then they're asking for trouble, and the special handling is
    # simply not applied for them.
    #
    needs_workaround = (
        parts.scheme in WORKAROUND_SCHEMES
        and '?' in url
        and QUESTION_MARK_PLACEHOLDER not in url
    )
    if not needs_workaround:
        return parts

    masked_url = url.replace('?', QUESTION_MARK_PLACEHOLDER)
    parts = urllib.parse.urlsplit(masked_url, allow_fragments=False)
    restored_path = parts.path.replace(QUESTION_MARK_PLACEHOLDER, '?')
    return urllib.parse.SplitResult(parts.scheme, parts.netloc, restored_path, '', '')


class TextIOWrapper(io.TextIOWrapper):
    """An io.TextIOWrapper that is only closed on a clean context-manager exit."""

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the wrapper, and thereby its buffer, only if no exception occurred.

        The stock TextIOWrapper calls self.buffer.close() even while an
        exception is being handled, which is unwanted for e.g. s3 and azure.
        Those backends call self.close() only on success (self.terminate()
        otherwise) so that unfinished/failed uploads are never committed.
        """
        if exc_type is not None:
            # Leave the buffer alone; the exception keeps propagating.
            return
        self.close()


class FileLikeProxy(wrapt.ObjectProxy):
    # Proxies an "outer" file-like object while also keeping a reference to
    # an "inner" object, so that exiting or closing the proxy exits or closes
    # both of them.
    #
    # Because of Python's name mangling, the attribute below is actually
    # stored as ``_FileLikeProxy__inner``.  The Ellipsis is only a placeholder
    # that gets replaced in __init__.
    __inner = ...  # initialized before wrapt disallows __setattr__ on certain objects

    def __init__(self, outer, inner):
        """Wrap ``outer`` as the proxied object and remember ``inner``.

        Parameters
        ----------
        outer: object
            The object handed to wrapt.ObjectProxy; available afterwards as
            ``self.__wrapped__``.
        inner: object
            A second object that is exited/closed together with ``outer``.
        """
        super().__init__(outer)
        self.__inner = inner

    def __exit__(self, *args, **kwargs):
        """Exit inner after exiting outer.

        The inner object's __exit__ runs in a ``finally`` clause, so it is
        called even when exiting the outer object raises.  The value
        returned is the one produced by the outer object's __exit__.
        """
        try:
            return super().__exit__(*args, **kwargs)
        finally:
            self.__inner.__exit__(*args, **kwargs)

    def __next__(self):
        """Return the next item by delegating to the wrapped object's __next__."""
        # NOTE(review): presumably defined explicitly because the proxy base
        # class does not forward __next__ on its own -- confirm against wrapt.
        return self.__wrapped__.__next__()

    def close(self):
        """Close the wrapped object, then the inner object if it is a different one.

        The inner object is closed in a ``finally`` clause, so it is closed
        even when closing the wrapped object raises.
        """
        try:
            return self.__wrapped__.close()
        finally:
            if self.__inner != self.__wrapped__:  # Don't close again if inner and wrapped are the same
                self.__inner.close()