File: utility.py

package info (click to toggle)
inkscape-textext 1.11.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 10,836 kB
  • sloc: python: 3,471; sh: 26; makefile: 25
file content (307 lines) | stat: -rw-r--r-- 9,844 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
"""
This file is part of TexText, an extension for the vector
illustration program Inkscape.

Copyright (c) 2006-2025 TexText developers.

TexText is released under the 3-Clause BSD license. See
file LICENSE.txt or go to https://github.com/textext/textext
for full license details.

Provides handlers for temp-dir management, logging, settings and
system command execution
"""
import contextlib
import json
import logging.handlers
import os
import platform
import shutil
import stat
import subprocess
import tempfile
import re

from .errors import *
import sys


class ChangeDirectory(object):
    """
    Context manager that runs the ``with`` body in another working directory.

    The directory to return to is the working directory at the moment the
    object is constructed (not at the moment the ``with`` block is entered).
    """

    def __init__(self, dir):
        # Remember where we came from first, then where we are heading.
        self.old_dir = os.path.abspath(os.curdir)
        self.new_dir = dir

    def __enter__(self):
        # Nothing is handed to an ``as`` target; the effect is the chdir itself.
        os.chdir(self.new_dir)

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Always go back; exceptions raised in the body keep propagating.
        os.chdir(self.old_dir)


class TemporaryDirectory(object):
    """
    Mimic tempfile.TemporaryDirectory from python3

    Entering the context creates a new directory whose name starts with
    ``textext_`` and returns its path. Leaving the context removes the
    directory together with everything inside it.
    """
    def __init__(self):
        # Path of the directory; None until the context has been entered
        self.dir_name = None

    def __enter__(self):
        # The first positional parameter of mkdtemp is the *suffix*, so the
        # marker has to be passed by keyword to end up at the front of the name.
        self.dir_name = tempfile.mkdtemp(prefix="textext_")
        return self.dir_name

    def __exit__(self, exc_type, exc_val, exc_tb):

        def retry_with_chmod(func, path, exec_info):
            # Removal of `path` failed: mark the entry writable and repeat
            # the operation that rmtree attempted.
            os.chmod(path, stat.S_IWRITE)
            func(path)

        if self.dir_name:
            shutil.rmtree(self.dir_name, onerror=retry_with_chmod)


@contextlib.contextmanager
def ChangeToTemporaryDirectory():
    """
    Run the ``with`` body with a fresh temporary directory as working directory.

    On exit the previous working directory is restored first and the
    temporary directory is removed afterwards. Nothing is bound to an
    ``as`` target.
    """
    with TemporaryDirectory() as scratch_dir, ChangeDirectory(scratch_dir):
        yield


class MyLogger(logging.Logger):
    """
        Logger with a customized caller lookup.

        Needs to produce correct line numbers: findCaller() steps back a
        fixed number of additional stack frames before it picks the frame
        whose file name, line number and function name are reported.
    """
    def findCaller(self, *args):
        """
        Determine the source location to report for a log call.

        :param args: Accepted but not used by this implementation
                     (NOTE(review): presumably the stack_info/stacklevel
                     arguments handed over by the logging module - confirm)
        :return: Tuple (file name, line number, function name, None); on
                 Python 2 only the first three entries. Placeholder values
                 are returned if no suitable frame is found.
        """
        # Extra frames to step back in addition to the fixed 2 used below
        n_frames_upper = 2
        f = logging.currentframe()
        # Step back 2 + n_frames_upper frames, stopping early at the stack's end
        for _ in range(2 + n_frames_upper):  # <-- correct frame
            if f is not None:
                f = f.f_back
        # Fallback result used when the loop below finds no usable frame
        rv = "(unknown file)", 0, "(unknown function)", None
        while hasattr(f, "f_code"):
            co = f.f_code
            filename = os.path.normcase(co.co_filename)
            # Frames that belong to the logging module itself are skipped
            if filename == logging._srcfile:
                f = f.f_back
                continue
            rv = (co.co_filename, f.f_lineno, co.co_name, None)
            break
        if sys.version_info[0] == 2:  # ToDo: Remove when Python 2 support is deprecated
            rv = rv[0:3]
        return rv


class NestedLoggingGuard(object):
    """
    Logging helper that indents messages according to the ``with`` nesting.

    debug()/info()/warning()/error()/critical()/log() emit the message
    immediately and return a new guard. If that guard is used as a context
    manager, messages logged inside the ``with`` body are indented further,
    and on exit the message is logged again followed by "done" or "failed".
    """
    # Current indentation in spaces; stored on the class and therefore
    # shared by all guard instances
    message_offset = 0
    # Number of spaces added to the indentation per ``with`` level
    message_indent = 2

    def __init__(self, _logger, lvl=None, message=None):
        """
        :param _logger: Logger object used to emit the messages
        :param lvl: Logging level of the message, or None
        :param message: Message text, or None

        If both lvl and message are given, the message is logged right away,
        prefixed with the current indentation.
        """
        self._logger = _logger
        self._level = lvl
        self._message = message
        if lvl is not None and message is not None:
            self._logger.log(self._level, " " * NestedLoggingGuard.message_offset + self._message)

    def __enter__(self):
        # Only guards created with level and message may be used in ``with``
        assert self._level is not None
        assert self._message is not None
        NestedLoggingGuard.message_offset += NestedLoggingGuard.message_indent

    def __exit__(self, exc_type, exc_val, exc_tb):
        assert self._level is not None
        assert self._message is not None
        # The suffix tells whether the body finished normally or raised
        if exc_type is None:
            result = "done"
        else:
            result = "failed"
        # Back to the indentation level the opening message was logged at
        NestedLoggingGuard.message_offset -= NestedLoggingGuard.message_indent

        # NOTE(review): the two nested helpers exist only to add stack frames;
        # presumably this makes the depth of this log call match the one made
        # via debug()/log()/__init__ so that a logger skipping a fixed number
        # of frames reports the user's code - confirm against the logger used.
        def tmp1():  # this nesting is needed to even out the number of stack frames
            def tmp2():
                self._logger.log(self._level, " " * NestedLoggingGuard.message_offset + self._message.strip() + " " + result)
            tmp2()
        tmp1()

    def debug(self, message):
        """ Log message with level DEBUG, returns a guard for optional ``with`` use """
        return self.log(logging.DEBUG, message)

    def info(self, message):
        """ Log message with level INFO, returns a guard for optional ``with`` use """
        return self.log(logging.INFO, message)

    def error(self, message):
        """ Log message with level ERROR, returns a guard for optional ``with`` use """
        return self.log(logging.ERROR, message)

    def warning(self, message):
        """ Log message with level WARNING, returns a guard for optional ``with`` use """
        return self.log(logging.WARNING, message)

    def critical(self, message):
        """ Log message with level CRITICAL, returns a guard for optional ``with`` use """
        return self.log(logging.CRITICAL, message)

    def log(self, lvl, message):
        """ Log message with level lvl by creating (and returning) a new guard """
        return NestedLoggingGuard(self._logger, lvl, message)


class CycleBufferHandler(logging.handlers.BufferingHandler):
    """
    Buffering log handler which keeps only the most recent records.

    As soon as more than ``capacity`` records are stored, the oldest ones
    are discarded. The stored records are shown on request via
    show_messages().
    """

    def __init__(self, capacity):
        """
        :param capacity: Maximum number of records kept in the buffer
        """
        super(CycleBufferHandler, self).__init__(capacity)

    def emit(self, record):
        """ Store record and drop the oldest records exceeding the capacity """
        self.buffer.append(record)
        overflow = len(self.buffer) - self.capacity
        if overflow > 0:
            # Cut by count from the front. A negative-index slice
            # (buffer[-capacity:]) would keep everything for capacity == 0.
            self.buffer = self.buffer[overflow:]

    def show_messages(self):
        """ Show messages to user and empty buffer """
        text = "\n".join([self.format(record) for record in self.buffer])
        # inkex.errormsg is only used on Python 2.7; otherwise the messages
        # are written to stderr
        if (2, 7) <= sys.version_info < (3, 0):
            import inkex
            inkex.errormsg(text)
        else:
            sys.stderr.write(text)
        self.flush()


class Settings(object):
    """
    Key/value store backed by a JSON file.

    The values live in the dictionary ``self.values``. They are read from
    the file on construction and written back only when save() is called.
    """

    def __init__(self, basename="config.json", directory=None):
        """
        :param basename: Name of the JSON file
        :param directory: Directory of the JSON file, created if it does not
                          exist. Defaults to the current working directory.
        :raises: TexTextFatalError if an existing file cannot be parsed
        """
        if directory is None:
            directory = os.getcwd()
        elif not os.path.exists(directory):
            os.makedirs(directory, exist_ok=True)
        self.values = {}
        self.directory = directory
        self.config_path = os.path.join(directory, basename)
        try:
            self.load()
        except ValueError as e:
            raise TexTextFatalError("Bad config `%s`: %s. Please fix it and re-run TexText." % (self.config_path, str(e)) )

    def load(self):
        """ Read the values from the file; does nothing if the file does not exist """
        if os.path.isfile(self.config_path):
            with open(self.config_path) as f:
                self.values = json.load(f)

    def save(self):
        """ Write the current values to the file, replacing its content """
        with open(self.config_path, "w") as f:
            json.dump(self.values, f, indent=2)

    def get(self, key, default=None):
        """ Return the value of key, or default if key is missing or its value is None """
        result = self.values.get(key, default)
        if result is None:
            return default
        return result

    def delete_file(self):
        """
        Remove the file from disk if it exists

        :raises: TexTextFatalError if the file exists but cannot be removed
        """
        if os.path.exists(self.config_path):
            try:
                os.remove(self.config_path)
            except OSError as err:
                # The exception must be raised, not merely constructed,
                # otherwise the failure goes unnoticed by the caller.
                raise TexTextFatalError("Config `%s` could not be deleted. Error message: %s" % (
                                        self.config_path, str(err)))

    def __getitem__(self, key):
        """ Return the value of key, or None if key is missing (never raises KeyError) """
        return self.values.get(key)

    def __setitem__(self, key, value):
        """ Store value under key; assigning None removes the key """
        if value is not None:
            self.values[key] = value
        else:
            self.values.pop(key, None)


class Cache(Settings):
    """
    Settings stored in ``.cache.json`` by default.

    In contrast to Settings, a file that cannot be parsed does not abort
    construction: the error is ignored.
    """
    def __init__(self, basename=".cache.json", directory=None):
        # A broken cache file is not fatal, so the error is swallowed here
        with contextlib.suppress(TexTextFatalError):
            super(Cache, self).__init__(basename, directory)


class SuppressStream(object):
    """
    Context manager silencing a stream on file descriptor level

    Inside the ``with`` body the descriptor of the stream refers to the
    null device; on exit it refers to its original target again.
    """

    def __init__(self, stream=sys.stderr):
        self.orig_stream_fileno = stream.fileno()

    def __enter__(self):
        target_fd = self.orig_stream_fileno
        # Keep a duplicate of the original target so it can be restored
        self.orig_stream_dup = os.dup(target_fd)
        self.devnull = open(os.devnull, 'w')
        # From now on everything written to target_fd ends up in the null device
        os.dup2(self.devnull.fileno(), target_fd)

    def __exit__(self, type, value, traceback):
        target_fd, saved_fd = self.orig_stream_fileno, self.orig_stream_dup
        # Detach from the null device and re-attach the saved original target
        os.close(target_fd)
        os.dup2(saved_fd, target_fd)
        # Release the helper descriptor and the null device handle
        os.close(saved_fd)
        self.devnull.close()


def exec_command(cmd, ok_return_value=0):
    """
    Execute an external command and hand back everything it printed.

    :param cmd: Command to execute
    :param ok_return_value: The return value expected after successful
                            completion; None disables the check
    :return: Output of the command on stdout followed by its output on stderr
    :raises: TexTextCommandNotFound if an OSError occurs while starting or
             communicating with the process, TexTextCommandFailed if the
             return value differs from ok_return_value
    """
    try:
        startup_info = None
        if PLATFORM == WINDOWS:
            # hides the command window for cli tools that are run (in Windows)
            startup_info = subprocess.STARTUPINFO()
            startup_info.dwFlags |= subprocess.STARTF_USESHOWWINDOW
            startup_info.wShowWindow = subprocess.SW_HIDE

        process = subprocess.Popen(cmd,
                                   stdin=subprocess.PIPE,
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.PIPE,
                                   startupinfo=startup_info)
        captured_out, captured_err = process.communicate()
    except OSError as os_error:
        raise TexTextCommandNotFound("Command %s failed: %s" % (' '.join(cmd), os_error))

    exit_code = process.returncode
    exit_code_accepted = ok_return_value is None or exit_code == ok_return_value
    if not exit_code_accepted:
        raise TexTextCommandFailed(message="Command %s failed (code %d)" % (' '.join(cmd), exit_code),
                                   return_code=exit_code,
                                   stdout=captured_out,
                                   stderr=captured_err)
    return captured_out + captured_err


def version_greater_or_equal_than(version_str, other_version_str):
    """ Checks if a version number is >= another version number

    Version numbers are passed as strings and must be of type "N.M.Rarb" where N, M, R
    are non-negative decimal numbers and arb is an arbitrary string.
    For example, "1.2.3" or "1.2.3dev" or "1.2.3-dev" or "1.2.3 dev" are valid version strings.
    The first "N.M.R" occurrence found in each string is evaluated.

    Returns:
        True if the version number is equal to or greater than the other version number,
        otherwise False. False is also returned if one of the strings does not
        contain a version number of the form "N.M.R".

    """
    def parse_version(ver_str):
        """ Parse version string and return it as a tuple of integers

        Returns (N, M, R) for easy component-wise comparison,
        e.g. "1.23.4dev" -> (1, 23, 4). If parsing fails returns None.

        """
        # The dots are escaped: a bare "." would accept any character as
        # separator and e.g. read "12345" as version 1.3.5.
        m = re.search(r"(\d+)\.(\d+)\.(\d+)", ver_str)
        if m is None:
            return None
        return tuple(int(part) for part in m.groups())

    version = parse_version(version_str)
    other_version = parse_version(other_version_str)
    if version is None or other_version is None:
        return False
    # Tuples of ints compare component by component, without any limit on
    # the size of the individual numbers
    return version >= other_version


# Names reported by platform.system() on macOS and Windows, intended for
# comparison with PLATFORM
MAC = "Darwin"
WINDOWS = "Windows"
# Name of the operating system the module is running on, determined once at import
PLATFORM = platform.system()