1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187
|
import codecs
import encodings
import re
import sys
import traceback
from encodings import utf_8
from io import StringIO
from tokenize import untokenize
from noseOfYeti.tokeniser.tokeniser import Tokeniser
# Patterns are compiled once at import time and looked up by name.
regexes = {
    label: re.compile(pattern)
    for label, pattern in (
        # Any run of whitespace, including the empty run
        ("whitespace", r"\s*"),
        # A string consisting of nothing but whitespace
        ("only_whitespace", r"^\s*$"),
        # The "# coding: spec" marker
        ("encoding_matcher", r"#\s*coding\s*:\s*spec"),
        # Captures the whitespace that precedes the first non-whitespace character
        ("leading_whitespace", r"^(\s*)[^\s]"),
    )
}
class TokeniserCodec:
    """
    Build the ``spec`` codec around a tokeniser and register it with ``codecs``

    ``tokeniser`` must provide ``translate(readline, data, **kwargs)`` which
    appends tokens to the ``data`` list it is given.

    ``transform`` can be switched off on the instance to make the codec behave
    like the plain utf8 codec.
    """

    def __init__(self, tokeniser):
        self.tokeniser = tokeniser
        # When false, decoding is delegated untouched to the utf8 codec
        self.transform = True
        # The CodecInfo is built once and closes over this instance
        self.codec = self.get_codec()

    def translate(self, value, transform=None):
        """
        Run ``value`` through the codec and return only the decoded text

        ``value`` may be ``str`` (encoded to bytes first) or bytes.

        ``transform`` overrides ``self.transform`` for this call when it is not None.
        """
        if isinstance(value, str):
            value = value.encode()

        return self.codec.decode(
            value, return_tuple=False, transform=self.transform if transform is None else transform
        )

    def register(self):
        """Register a search function with ``codecs`` that resolves the name ``spec``"""

        def search_function(s):
            """Determine if a file is of spec encoding and return special CodecInfo if it is"""
            if s != "spec":
                return None
            return self.codec

        # Do the register
        codecs.register(search_function)

    def get_codec(self):
        """
        Build and return the ``codecs.CodecInfo`` for the spec codec

        Encoding and stream writing are taken straight from the utf8 codec;
        decoding, stream reading and incremental decoding go through ``dealwith``
        whenever transforming is switched on.
        """
        # Assume utf8 encoding
        utf8 = encodings.search_function("utf8")

        class StreamReader(utf_8.StreamReader):
            """Used by cPython to deal with a spec file"""

            # ``self`` is the TokeniserCodec from the enclosing scope, so the
            # reader instance is called ``sr``
            def __init__(sr, stream, *args, **kwargs):
                codecs.StreamReader.__init__(sr, stream, *args, **kwargs)
                if self.transform:
                    # Translate the stream up front and serve the result instead
                    data = self.dealwith(sr.stream.readline)
                    sr.stream = StringIO(data)

        def _decode(text, *args, transform=None, **kwargs):
            """Return ``(translated, len(translated))`` for the bytes in ``text``"""
            transform = self.transform if transform is None else transform
            if not transform:
                return utf8.decode(text, *args, **kwargs)

            # Objects with ``tobytes`` (e.g. memoryview) are turned into bytes first
            if hasattr(text, "tobytes"):
                text = text.tobytes().decode()
            else:
                text = text.decode()

            reader = StringIO(text)

            # Determine if we need to have imports for this string
            # It may be a fragment of the file
            has_spec = regexes["encoding_matcher"].search(reader.readline())
            no_imports = not has_spec
            reader.seek(0)

            data = self.dealwith(reader.readline, no_imports=no_imports)

            # If nothing was changed, then we want to use the original file/line
            # Also have to replace indentation of original line with indentation of new line
            # To take into account nested describes
            if text and not regexes["only_whitespace"].match(text):
                if regexes["whitespace"].sub("", text) == regexes["whitespace"].sub("", data):
                    bad_indentation = regexes["leading_whitespace"].search(text).groups()[0]
                    good_indentation = regexes["leading_whitespace"].search(data).groups()[0]
                    data = f"{good_indentation}{text[len(bad_indentation) :]}"

            # If text is empty and data isn't, then we should return text
            # (dealwith always appends a newline, hence the length of 1)
            if len(text) == 0 and len(data) == 1:
                return "", 0

            # Return translated version and its length
            return data, len(data)

        def decode(text, *args, return_tuple=True, transform=None, **kwargs):
            """Like ``_decode`` but only returns the text when ``return_tuple`` is false"""
            ret = _decode(text, *args, transform=transform, **kwargs)
            if return_tuple:
                return ret
            else:
                return ret[0]

        class incrementaldecoder(utf8.incrementaldecoder):
            """Translate a buffer in one go when it is final and starts with the spec coding line"""

            def decode(s, obj, final, **kwargs):
                if not self.transform:
                    return super().decode(obj, final, **kwargs)

                lines = obj.split("\n".encode("utf-8"))
                if re.match(r"#\s*coding:\s*spec", lines[0].decode("utf-8", "replace")) and final:
                    # ``decode`` here is the function above, not this method
                    kwargs["return_tuple"] = False
                    return decode(obj, final, **kwargs)
                else:
                    return super().decode(obj, final, **kwargs)

        return codecs.CodecInfo(
            name="spec",
            encode=utf8.encode,
            decode=decode,
            streamreader=StreamReader,
            streamwriter=utf8.streamwriter,
            incrementalencoder=utf8.incrementalencoder,
            incrementaldecoder=incrementaldecoder,
        )

    def dealwith(self, readline, **kwargs):
        """
        Replace the contents of spec file with the translated version

        readline should be a callable object,
        which provides the same interface as the readline() method of built-in file objects

        Any keyword arguments are passed on to ``self.tokeniser.translate``.

        If the tokeniser raises an ``Exception`` the returned source is a small
        program that raises an ``Exception`` containing the formatted traceback.
        Exceptions that don't inherit from ``Exception`` (``KeyboardInterrupt``,
        ``SystemExit``) are left to propagate.

        The returned string always ends with a newline.
        """
        data = []
        try:
            # We pass in the data variable as an argument so that we
            # get partial output even in the case of an exception.
            self.tokeniser.translate(readline, data, **kwargs)
        except Exception:
            lines = ['msg = r"""']
            for line in traceback.format_exception(*sys.exc_info()):
                lines.append(line.strip())
            lines.append('"""')
            lines.append(r'raise Exception(f"--- internal spec codec error --- \n{msg}")')
            data = "\n".join(lines)
        else:
            # At this point, data is a list of tokens
            data = untokenize(data)

        # python3.9 requires a newline at the end
        data += "\n"

        return data

    def output_for_debugging(self, stream, data):
        """
        It will write the translated version of the file

        The output goes next to the original as ``<stream.name>.spec.out``.
        """
        # Be explicit about utf-8 so the result doesn't depend on the platform default
        with open(f"{stream.name}.spec.out", "w", encoding="utf-8") as f:
            f.write(str(data))
########################
### CODEC REGISTER
########################
# Module-wide TokeniserCodec instance; stays None until codec() creates it on first use
_spec_codec = None
def codec():
    """Return the shared TokeniserCodec, creating it the first time it is asked for"""
    global _spec_codec
    if _spec_codec is not None:
        return _spec_codec

    # Nothing cached yet, so build one around a fresh Tokeniser and remember it
    _spec_codec = TokeniserCodec(Tokeniser())
    return _spec_codec
def register(transform=True):
    """
    Set ``transform`` on the shared codec and register it if python doesn't know ``spec`` yet

    The lookup happens before the codec is fetched, and registration only
    happens when that lookup failed.
    """
    try:
        codecs.lookup("spec")
    except LookupError:
        already_known = False
    else:
        already_known = True

    spec = codec()
    spec.transform = transform

    if not already_known:
        spec.register()
|