File: utils.py

package info (click to toggle)
python-executing 2.2.0-0.3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 11,860 kB
  • sloc: python: 10,235; sh: 48; makefile: 10
file content (264 lines) | stat: -rw-r--r-- 6,484 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
from __future__ import print_function, division, absolute_import

import ast
import json

from future import standard_library

standard_library.install_aliases()
import token

from future.utils import raise_from
import ntpath
import os
import types
from sys import version_info
from typing import TypeVar, Union, List, Any, Iterator, Tuple, Iterable

try:
    from typing import Type
except ImportError:
    Type = type

try:
    from typing import Deque
except ImportError:
    from collections import deque as Deque

try:
    from functools import lru_cache
except ImportError:
    from backports.functools_lru_cache import lru_cache

from littleutils import strip_required_prefix

PY2 = version_info.major == 2
PY3 = not PY2
T = TypeVar('T')
RT = TypeVar('RT')
IPYTHON_FILE_PATH = 'IPython notebook or shell'
FILE_SENTINEL_NAME = '$$__FILE__$$'

if PY2:
    Text = unicode
else:
    Text = str


def path_leaf(path):
    # type: (str) -> str
    # http://stackoverflow.com/a/8384788/2482744
    head, tail = ntpath.split(path)
    return tail or ntpath.basename(head)


def common_ancestor(paths):
    # type: (List[str]) -> str
    """
    Returns a path to a directory that contains all the given absolute paths
    """
    prefix = os.path.commonprefix(paths)

    # Ensure that the prefix doesn't end in part of the name of a file/directory
    prefix = ntpath.split(prefix)[0]

    # Ensure that it ends with a slash
    first_char_after = paths[0][len(prefix)]
    if first_char_after in r'\/':
        prefix += first_char_after

    return prefix


def short_path(path, all_paths):
    # type: (str, List[str]) -> str
    if path == IPYTHON_FILE_PATH:
        return path

    all_paths = [f for f in all_paths
                 if f != IPYTHON_FILE_PATH]
    prefix = common_ancestor(all_paths)
    if prefix in r'\/':
        prefix = ''
    return strip_required_prefix(path, prefix) or path_leaf(path)


def fix_abs_path(path):
    if path == IPYTHON_FILE_PATH:
        return path
    if os.path.sep == '/' and not path.startswith('/'):
        path = '/' + path
    return path


if PY2:
    def correct_type(obj):
        """
        Returns the correct type of obj, regardless of __class__ assignment
        or old-style classes:

        >>> class A:
        ...     pass
        ...
        ...
        ... class B(object):
        ...     pass
        ...
        ...
        ... class C(object):
        ...     __class__ = A
        ...
        >>> correct_type(A()) is A
        True
        >>> correct_type(B()) is B
        True
        >>> correct_type(C()) is C
        True
        """
        t = type(obj)
        # noinspection PyUnresolvedReferences
        if t is types.InstanceType:
            return obj.__class__
        return t
else:
    correct_type = type


def of_type(type_or_tuple, iterable):
    # type: (Union[type, Tuple[Union[type, tuple], ...]], Iterable[Any]) -> Iterator[Any]
    return (x for x in iterable if isinstance(x, type_or_tuple))


def safe_next(it):
    # type: (Iterator[T]) -> T
    """
    next() can raise a StopIteration which can cause strange bugs inside generators.
    """
    try:
        return next(it)
    except StopIteration as e:
        raise_from(RuntimeError, e)
        raise  # isn't reached


def one_or_none(expression):
    """Performs a one_or_none on a sqlalchemy expression."""
    if hasattr(expression, 'one_or_none'):
        return expression.one_or_none()
    result = expression.all()
    if len(result) == 0:
        return None
    elif len(result) == 1:
        return result[0]
    else:
        raise Exception("There is more than one item returned for the supplied filter")


def flatten_list(lst):
    result = []
    for x in lst:
        if isinstance(x, list):
            result.extend(flatten_list(x))
        else:
            result.append(x)
    return result


def is_lambda(f):
    try:
        code = f.__code__
    except AttributeError:
        return False
    return code.co_name == (lambda: 0).__code__.co_name


class ProtocolEncoder(json.JSONEncoder):
    def default(self, o):
        try:
            method = o.as_json
        except AttributeError:
            return super(ProtocolEncoder, self).default(o)
        else:
            return method()


try:

    # Python 3
    from tokenize import open as open_with_encoding_check

except ImportError:

    # Python 2
    from lib2to3.pgen2.tokenize import detect_encoding
    import io


    def open_with_encoding_check(filename):  # type: ignore
        """Open a file in read only mode using the encoding detected by
        detect_encoding().
        """
        fp = io.open(filename, 'rb')
        try:
            encoding, lines = detect_encoding(fp.readline)
            fp.seek(0)
            text = io.TextIOWrapper(fp, encoding, line_buffering=True)
            text.mode = 'r'
            return text
        except:
            fp.close()
            raise


def read_source_file(filename):
    from lib2to3.pgen2.tokenize import cookie_re

    if filename.endswith('.pyc'):
        filename = filename[:-1]

    with open_with_encoding_check(filename) as f:
        return ''.join([
            '\n' if i < 2 and cookie_re.match(line)
            else line
            for i, line in enumerate(f)
        ])


def source_without_decorators(tokens, function_node):
    def_token = safe_next(t for t in tokens.get_tokens(function_node)
                          if t.string == 'def' and t.type == token.NAME)

    startpos = def_token.startpos
    source = tokens.text[startpos:function_node.last_token.endpos].rstrip()
    assert source.startswith('def')

    return startpos, source


def prn(*args):
    for arg in args:
        print(arg)
    if len(args) == 1:
        return args[0]
    return args


def is_ipython_cell(filename):
    return filename.startswith('<ipython-input-')


def is_future_import(node):
    return isinstance(node, ast.ImportFrom) and node.module == "__future__"


def get_unfrozen_datetime():
    try:
        # if freezegun could be active, we need to use real_datetime to ensure we use the actual time instead of the
        # time set by freezegun.
        # we have to import this at the last possible moment because birdeye is very likely to be imported before
        # freezegun is activated.
        from freezegun.api import real_datetime
    except ImportError:
        from datetime import datetime as real_datetime

    return real_datetime.now()