1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307
|
"""
This file is part of TexText, an extension for the vector
illustration program Inkscape.
Copyright (c) 2006-2025 TexText developers.
TexText is released under the 3-Clause BSD license. See
file LICENSE.txt or go to https://github.com/textext/textext
for full license details.
Provides handlers for temp-dir management, logging, settings and
system command execution
"""
import contextlib
import json
import logging.handlers
import os
import platform
import shutil
import stat
import subprocess
import tempfile
import re
from .errors import *
import sys
class ChangeDirectory(object):
def __init__(self, dir):
self.new_dir = dir
self.old_dir = os.path.abspath(os.path.curdir)
def __enter__(self):
os.chdir(self.new_dir)
def __exit__(self, exc_type, exc_val, exc_tb):
os.chdir(self.old_dir)
class TemporaryDirectory(object):
""" Mimic tempfile.TemporaryDirectory from python3 """
def __init__(self):
self.dir_name = None
def __enter__(self):
self.dir_name = tempfile.mkdtemp("textext_")
return self.dir_name
def __exit__(self, exc_type, exc_val, exc_tb):
def retry_with_chmod(func, path, exec_info):
os.chmod(path, stat.S_IWRITE)
func(path)
if self.dir_name:
shutil.rmtree(self.dir_name, onerror=retry_with_chmod)
@contextlib.contextmanager
def ChangeToTemporaryDirectory():
with TemporaryDirectory() as temp_dir:
with ChangeDirectory(temp_dir):
yield None
class MyLogger(logging.Logger):
"""
Needs to produce correct line numbers
"""
def findCaller(self, *args):
n_frames_upper = 2
f = logging.currentframe()
for _ in range(2 + n_frames_upper): # <-- correct frame
if f is not None:
f = f.f_back
rv = "(unknown file)", 0, "(unknown function)", None
while hasattr(f, "f_code"):
co = f.f_code
filename = os.path.normcase(co.co_filename)
if filename == logging._srcfile:
f = f.f_back
continue
rv = (co.co_filename, f.f_lineno, co.co_name, None)
break
if sys.version_info[0] == 2: # ToDo: Remove when Python 2 support is deprecated
rv = rv[0:3]
return rv
class NestedLoggingGuard(object):
message_offset = 0
message_indent = 2
def __init__(self, _logger, lvl=None, message=None):
self._logger = _logger
self._level = lvl
self._message = message
if lvl is not None and message is not None:
self._logger.log(self._level, " " * NestedLoggingGuard.message_offset + self._message)
def __enter__(self):
assert self._level is not None
assert self._message is not None
NestedLoggingGuard.message_offset += NestedLoggingGuard.message_indent
def __exit__(self, exc_type, exc_val, exc_tb):
assert self._level is not None
assert self._message is not None
if exc_type is None:
result = "done"
else:
result = "failed"
NestedLoggingGuard.message_offset -= NestedLoggingGuard.message_indent
def tmp1(): # this nesting needed to even number of stack frames in __enter__ and __exit__
def tmp2():
self._logger.log(self._level, " " * NestedLoggingGuard.message_offset + self._message.strip() + " " + result)
tmp2()
tmp1()
def debug(self, message):
return self.log(logging.DEBUG, message)
def info(self, message):
return self.log(logging.INFO, message)
def error(self, message):
return self.log(logging.ERROR, message)
def warning(self, message):
return self.log(logging.WARNING, message)
def critical(self, message):
return self.log(logging.CRITICAL, message)
def log(self, lvl, message):
return NestedLoggingGuard(self._logger, lvl, message)
class CycleBufferHandler(logging.handlers.BufferingHandler):
def __init__(self, capacity):
super(CycleBufferHandler, self).__init__(capacity)
def emit(self, record):
self.buffer.append(record)
if len(self.buffer) > self.capacity:
self.buffer = self.buffer[-self.capacity:]
def show_messages(self):
import sys
version_is_good = (2, 7) <= sys.version_info < (3, 0)
if version_is_good:
import inkex
"""show messages to user and empty buffer"""
inkex.errormsg("\n".join([self.format(record) for record in self.buffer]))
else:
sys.stderr.write("\n".join([self.format(record) for record in self.buffer]))
self.flush()
class Settings(object):
def __init__(self, basename="config.json", directory=None):
if directory is None:
directory = os.getcwd()
else:
if not os.path.exists(directory):
os.makedirs(directory, exist_ok=True)
self.values = {}
self.directory = directory
self.config_path = os.path.join(directory, basename)
try:
self.load()
except ValueError as e:
raise TexTextFatalError("Bad config `%s`: %s. Please fix it and re-run TexText." % (self.config_path, str(e)) )
def load(self):
if os.path.isfile(self.config_path):
with open(self.config_path) as f:
self.values = json.load(f)
def save(self):
with open(self.config_path, "w") as f:
json.dump(self.values, f, indent=2)
def get(self, key, default=None):
result = self.values.get(key, default)
if result is None:
return default
return result
def delete_file(self):
if os.path.exists(self.config_path):
try:
os.remove(self.config_path)
except OSError as err:
TexTextFatalError("Config `%s` could not be deleted. Error message: %s" % (
self.config_path, str(err)))
def __getitem__(self, key):
return self.values.get(key)
def __setitem__(self, key, value):
if value is not None:
self.values[key] = value
else:
self.values.pop(key, None)
class Cache(Settings):
def __init__(self, basename=".cache.json", directory=None):
try:
super(Cache, self).__init__(basename, directory)
except TexTextFatalError:
pass
class SuppressStream(object):
"""
"Suppress stream output" context manager
Effectively redirects output to /dev/null by switching fileno
"""
def __init__(self, stream=sys.stderr):
self.orig_stream_fileno = stream.fileno()
def __enter__(self):
self.orig_stream_dup = os.dup(self.orig_stream_fileno)
self.devnull = open(os.devnull, 'w')
os.dup2(self.devnull.fileno(), self.orig_stream_fileno)
def __exit__(self, type, value, traceback):
os.close(self.orig_stream_fileno)
os.dup2(self.orig_stream_dup, self.orig_stream_fileno)
os.close(self.orig_stream_dup)
self.devnull.close()
def exec_command(cmd, ok_return_value=0):
"""
Run given command, check return value, and return
concatenated stdout and stderr.
:param cmd: Command to execute
:param ok_return_value: The expected return value after successful completion
:raises: TexTextCommandNotFound, TexTextCommandFailed
"""
try:
# hides the command window for cli tools that are run (in Windows)
info = None
if PLATFORM == WINDOWS:
info = subprocess.STARTUPINFO()
info.dwFlags |= subprocess.STARTF_USESHOWWINDOW
info.wShowWindow = subprocess.SW_HIDE
p = subprocess.Popen(cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
stdin=subprocess.PIPE,
startupinfo=info)
out, err = p.communicate()
except OSError as err:
raise TexTextCommandNotFound("Command %s failed: %s" % (' '.join(cmd), err))
if ok_return_value is not None and p.returncode != ok_return_value:
raise TexTextCommandFailed(message="Command %s failed (code %d)" % (' '.join(cmd), p.returncode),
return_code=p.returncode,
stdout=out,
stderr=err)
return out + err
def version_greater_or_equal_than(version_str, other_version_str):
""" Checks if a version number is >= than another version number
Version numbers are passed as strings and must be of type "N.M.Rarb" where N, M, R
are non negative decimal numbers < 1000 and arb is an arbitrary string.
For example, "1.2.3" or "1.2.3dev" or "1.2.3-dev" or "1.2.3 dev" are valid version strings.
Returns:
True if the version number is equal or greater then the other version number,
otherwise false
"""
def ver_str_to_float(ver_str):
""" Parse version string and returns it as a floating point value
Returns The version string as floating point number for easy comparison
(minor version and relase number padded with zeros). E.g. "1.23.4dev" -> 1.023004.
If conversion fails returns NaN.
"""
m = re.search(r"(\d+).(\d+).(\d+)[-\w]*", ver_str)
if m is not None:
ver_maj, ver_min, ver_rel = m.groups()
return float("{}.{:0>3}{:0>3}".format(ver_maj, ver_min, ver_rel))
else:
return float("nan")
return ver_str_to_float(version_str) >= ver_str_to_float(other_version_str)
MAC = "Darwin"
WINDOWS = "Windows"
PLATFORM = platform.system()
|