1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264
|
from __future__ import print_function, division, absolute_import
import ast
import json
from future import standard_library
standard_library.install_aliases()
import token
from future.utils import raise_from
import ntpath
import os
import types
from sys import version_info
from typing import TypeVar, Union, List, Any, Iterator, Tuple, Iterable
try:
from typing import Type
except ImportError:
Type = type
try:
from typing import Deque
except ImportError:
from collections import deque as Deque
try:
from functools import lru_cache
except ImportError:
from backports.functools_lru_cache import lru_cache
from littleutils import strip_required_prefix
PY2 = version_info.major == 2
PY3 = not PY2
T = TypeVar('T')
RT = TypeVar('RT')
IPYTHON_FILE_PATH = 'IPython notebook or shell'
FILE_SENTINEL_NAME = '$$__FILE__$$'
if PY2:
Text = unicode
else:
Text = str
def path_leaf(path):
# type: (str) -> str
# http://stackoverflow.com/a/8384788/2482744
head, tail = ntpath.split(path)
return tail or ntpath.basename(head)
def common_ancestor(paths):
# type: (List[str]) -> str
"""
Returns a path to a directory that contains all the given absolute paths
"""
prefix = os.path.commonprefix(paths)
# Ensure that the prefix doesn't end in part of the name of a file/directory
prefix = ntpath.split(prefix)[0]
# Ensure that it ends with a slash
first_char_after = paths[0][len(prefix)]
if first_char_after in r'\/':
prefix += first_char_after
return prefix
def short_path(path, all_paths):
# type: (str, List[str]) -> str
if path == IPYTHON_FILE_PATH:
return path
all_paths = [f for f in all_paths
if f != IPYTHON_FILE_PATH]
prefix = common_ancestor(all_paths)
if prefix in r'\/':
prefix = ''
return strip_required_prefix(path, prefix) or path_leaf(path)
def fix_abs_path(path):
if path == IPYTHON_FILE_PATH:
return path
if os.path.sep == '/' and not path.startswith('/'):
path = '/' + path
return path
if PY2:
def correct_type(obj):
"""
Returns the correct type of obj, regardless of __class__ assignment
or old-style classes:
>>> class A:
... pass
...
...
... class B(object):
... pass
...
...
... class C(object):
... __class__ = A
...
>>> correct_type(A()) is A
True
>>> correct_type(B()) is B
True
>>> correct_type(C()) is C
True
"""
t = type(obj)
# noinspection PyUnresolvedReferences
if t is types.InstanceType:
return obj.__class__
return t
else:
correct_type = type
def of_type(type_or_tuple, iterable):
# type: (Union[type, Tuple[Union[type, tuple], ...]], Iterable[Any]) -> Iterator[Any]
return (x for x in iterable if isinstance(x, type_or_tuple))
def safe_next(it):
# type: (Iterator[T]) -> T
"""
next() can raise a StopIteration which can cause strange bugs inside generators.
"""
try:
return next(it)
except StopIteration as e:
raise_from(RuntimeError, e)
raise # isn't reached
def one_or_none(expression):
"""Performs a one_or_none on a sqlalchemy expression."""
if hasattr(expression, 'one_or_none'):
return expression.one_or_none()
result = expression.all()
if len(result) == 0:
return None
elif len(result) == 1:
return result[0]
else:
raise Exception("There is more than one item returned for the supplied filter")
def flatten_list(lst):
result = []
for x in lst:
if isinstance(x, list):
result.extend(flatten_list(x))
else:
result.append(x)
return result
def is_lambda(f):
try:
code = f.__code__
except AttributeError:
return False
return code.co_name == (lambda: 0).__code__.co_name
class ProtocolEncoder(json.JSONEncoder):
def default(self, o):
try:
method = o.as_json
except AttributeError:
return super(ProtocolEncoder, self).default(o)
else:
return method()
try:
# Python 3
from tokenize import open as open_with_encoding_check
except ImportError:
# Python 2
from lib2to3.pgen2.tokenize import detect_encoding
import io
def open_with_encoding_check(filename): # type: ignore
"""Open a file in read only mode using the encoding detected by
detect_encoding().
"""
fp = io.open(filename, 'rb')
try:
encoding, lines = detect_encoding(fp.readline)
fp.seek(0)
text = io.TextIOWrapper(fp, encoding, line_buffering=True)
text.mode = 'r'
return text
except:
fp.close()
raise
def read_source_file(filename):
from lib2to3.pgen2.tokenize import cookie_re
if filename.endswith('.pyc'):
filename = filename[:-1]
with open_with_encoding_check(filename) as f:
return ''.join([
'\n' if i < 2 and cookie_re.match(line)
else line
for i, line in enumerate(f)
])
def source_without_decorators(tokens, function_node):
def_token = safe_next(t for t in tokens.get_tokens(function_node)
if t.string == 'def' and t.type == token.NAME)
startpos = def_token.startpos
source = tokens.text[startpos:function_node.last_token.endpos].rstrip()
assert source.startswith('def')
return startpos, source
def prn(*args):
for arg in args:
print(arg)
if len(args) == 1:
return args[0]
return args
def is_ipython_cell(filename):
return filename.startswith('<ipython-input-')
def is_future_import(node):
return isinstance(node, ast.ImportFrom) and node.module == "__future__"
def get_unfrozen_datetime():
try:
# if freezegun could be active, we need to use real_datetime to ensure we use the actual time instead of the
# time set by freezegun.
# we have to import this at the last possible moment because birdeye is very likely to be imported before
# freezegun is activated.
from freezegun.api import real_datetime
except ImportError:
from datetime import datetime as real_datetime
return real_datetime.now()
|