File: __init__.py

package info (click to toggle)
macsylib 1.0.4%2Bdfsg-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 30,120 kB
  • sloc: python: 10,279; xml: 92; sh: 22; makefile: 12
file content (330 lines) | stat: -rw-r--r-- 12,738 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
#########################################################################
# MacSyLib - Python library to detect macromolecular systems            #
#            in prokaryotes protein dataset using systems modelling     #
#            and similarity search.                                     #
#                                                                       #
# Authors: Sophie Abby, Bertrand Neron                                  #
# Copyright (c) 2014-2025  Institut Pasteur (Paris) and CNRS.           #
# See the COPYRIGHT file for details                                    #
#                                                                       #
# This file is part of MacSyLib package.                                #
#                                                                       #
# MacSyLib is free software: you can redistribute it and/or modify      #
# it under the terms of the GNU General Public License as published by  #
# the Free Software Foundation, either version 3 of the License, or     #
# (at your option) any later version.                                   #
#                                                                       #
# MacSyLib is distributed in the hope that it will be useful,           #
# but WITHOUT ANY WARRANTY; without even the implied warranty of        #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the          #
# GNU General Public License for more details .                         #
#                                                                       #
# You should have received a copy of the GNU General Public License     #
# along with MacSyLib (COPYING).                                        #
# If not, see <https://www.gnu.org/licenses/>.                          #
#########################################################################

import os
import sys
import shutil
import unittest
from io import StringIO
from contextlib import contextmanager
import hashlib
from functools import partial
import tempfile
import uuid
import colorlog
import json
import re

import macsylib
import macsylib.config


def path_to_modulename(p):
    """
    Return the python module name corresponding to a file path.

    Example: foo/bar.py becomes bar

    :param p: the path to a python file
    :type p: str
    :return: the bare module name (basename without extension)
    :rtype: str
    """
    stem, _ext = os.path.splitext(os.path.basename(p))
    return stem


class MacsyTest(unittest.TestCase):

    _tests_dir = os.path.normpath(os.path.dirname(__file__))
    _data_dir = os.path.join(_tests_dir, "data")

    def __init__(self, *args, **kwargs):
        macsylib.__MACSY_DATA__ = self._tests_dir
        macsylib.config.__MACSY_DATA__ = self._tests_dir
        super().__init__(*args, **kwargs)

    @staticmethod
    def setsid():
        platform = sys.platform
        if platform.startswith('linux'):
            setsid = 'setsid'
        elif platform.startswith('darwin'):
            setsid = os.path.normpath(os.path.join(os.path.dirname(__file__), '..', 'utils', 'setsid'))
        else:
            setsid = ''
        return setsid

    @classmethod
    def find_data(cls, *args):
        data_path = os.path.join(cls._data_dir, *args)
        if os.path.exists(data_path):
            return data_path
        else:
            raise IOError("data '{}' does not exists".format(data_path))


    @contextmanager
    def catch_io(self, out=False, err=False):
        """
        Catch stderr and stdout of the code running within this block.
        """
        old_out = sys.stdout
        new_out = old_out
        old_err = sys.stderr
        new_err = old_err
        if out:
            new_out = StringIO()
        if err:
            new_err = StringIO()
        try:
            sys.stdout, sys.stderr = new_out, new_err
            yield sys.stdout, sys.stderr
        finally:
            sys.stdout, sys.stderr = old_out, old_err
            if out:
                new_out.close()
            if err:
                new_err.close()


    @staticmethod
    def fake_exit(*args, **kwargs):
        returncode = args[0]
        raise TypeError(returncode)

    @staticmethod
    def mute_call(call_ori):
        """
        hmmsearch or prodigal write lot of things on stderr or stdout
        which noise the unit test output
        So I replace the `call` function in module integron_finder
        by a wrapper which call the original function but add redirect stderr and stdout
        in dev_null
        :return: wrapper around call function
        :rtype: function
        """
        def wrapper(*args, **kwargs):
            with open(os.devnull, 'w') as f:
                kwargs['stderr'] = f
                kwargs['stdout'] = f
                res = call_ori(*args, **kwargs)
            return res
        return wrapper

    @staticmethod
    def remove_red_ansi_color(colored_msg):
        red_pattern = r"^\x1b\[0?1;31m(.*)\x1b\[0m$"
        msg = re.match(red_pattern, colored_msg).groups()[0]
        return msg


    def assertFileEqual(self, f1, f2, comment=None, skip_line=None, msg=None):
        self.maxDiff = None
        # the StringIO does not support context in python2.7
        # so we can use the following statement only in python3
        from itertools import zip_longest
        with open(f1) if isinstance(f1, str) else f1 as fh1, open(f2) if isinstance(f2, str) else f2 as fh2:
            for l1, l2 in zip_longest(fh1, fh2):
                if l1 and l2:
                    if comment and l1.startswith(comment) and l2.startswith(comment):
                        continue
                    elif skip_line:
                        if re.search(skip_line, l1) and re.search(skip_line, l2):
                            continue
                        try:
                            self.assertEqual(l1, l2, msg)
                        except AssertionError as err:
                            raise AssertionError(f"{fh1.name} and {fh2.name} differ:\n {err}")
                    try:
                        self.assertEqual(l1, l2, msg)
                    except AssertionError as err:
                        raise AssertionError(f"{fh1.name} and {fh2.name} differ:\n {err}")
                elif l1:  # and not l2
                    raise self.failureException(f"{fh1.name} is longer than {fh2.name}")
                elif l2:  # and not l1
                    raise self.failureException(f"{fh2.name} is longer than {fh1.name}")


    def assertTsvEqual(self, f1, f2, tsv_type='best_solution.tsv', comment="#", msg=None):
        # the StringIO does not support context in python2.7
        # so we can use the following statement only in python3
        from itertools import zip_longest
        if isinstance(f1, StringIO):
            f1.name = f"{f1.__class__} f1"
        if isinstance(f2, StringIO):
            f2.name = f"{f2.__class__} f2"
        with open(f1) if isinstance(f1, str) else f1 as fh1, open(f2) if isinstance(f2, str) else f2 as fh2:
            header = None
            for i, grp in enumerate(zip_longest(fh1, fh2), 1):

                l1, l2 = grp
                if l1.startswith(comment) and l2.startswith(comment):
                    continue
                fields_1 = l1.split()
                fields_2 = l2.split()
                if not fields_1 and not fields_2:
                    # skip empty line
                    continue

                # the system_id may change from one run to another
                # So I have to remove them before to compare each row
                if header is None:
                    header = fields_1[:]

                    if tsv_type in ('all_systems.tsv', 'best_solution.tsv', 'all_best_solutions.tsv'):
                        sys_id_idx = header.index('sys_id')
                        header.pop(sys_id_idx)
                    elif tsv_type in ('best_solution_loners.tsv', 'best_solution_multisystems.tsv', 'best_solution_summary.tsv'):
                        pass
                    elif tsv_type == 'rejected_candidates.tsv':
                        candidate_id_idx = header.index('candidate_id')
                        header.pop(candidate_id_idx)
                        cluster_id_idx = header.index('cluster_id')
                        header.pop(cluster_id_idx)
                    else:
                        raise RuntimeError(f"unknown '{tsv_type}' tsv type file in assertTsvEqual")
                else:
                    if tsv_type in ('all_systems.tsv', 'best_solution.tsv', 'all_best_solutions.tsv'):
                        fields_1.pop(sys_id_idx)
                        fields_2.pop(sys_id_idx)
                        if len(fields_1) == len(header):
                            # remove used_in field if present
                            fields_1.pop(-1)
                            fields_2.pop(-1)
                    elif tsv_type == 'rejected_candidates.tsv':
                        fields_1.pop(candidate_id_idx)
                        fields_1.pop(cluster_id_idx)
                        fields_2.pop(candidate_id_idx)
                        fields_2.pop(cluster_id_idx)

                # counterpart order does not matter
                fields_1[-1] = set(fields_1[-1].split(','))
                fields_2[-1] = set(fields_2[-1].split(','))
                self.assertEqual(fields_1, fields_2, f"{fh1.name} differ from {fh2.name} at line {i}:\n{l1}{l2}")


    def assertSeqRecordEqual(self, s1, s2):
        for attr in ('id', 'name', 'seq'):
            s1_attr = getattr(s1, attr)
            s2_attr = getattr(s2, attr)
            self.assertEqual(s1_attr, s2_attr, msg="{} are different: {} != {}".format(attr, s1_attr, s2_attr))

        # there is a bug in some biopython version
        self.assertEqual(s1.description.rstrip('.'), s2.description.rstrip('.'))
        for s1_feat, s2_feat in zip(s1.features, s2.features):
            # location cannot be directly compared
            self.assertEqual(str(s1_feat.location), str(s2_feat.location))

            for attr in ('qualifiers', 'strand', 'type'):
                f1_attr = getattr(s1_feat, attr)
                f2_attr = getattr(s2_feat, attr)
                self.assertEqual(f1_attr, f2_attr, msg="{} are different: {} != {}".format(attr, f1_attr, f2_attr))

    def assertHmmEqual(self, hmm1, hmm2):
        with open(hmm1) as hmm1_file, open(hmm2) as hmm2_file:
            for hmm1_line, hmm2_line in zip(hmm1_file, hmm2_file):
                if hmm1_line.startswith('#') and hmm2_line.startswith('#'):
                    continue
                hmm1_fields = hmm1_line.split('#')[:-1]
                hmm2_fields = hmm2_line.split('#')[:-1]
                self.assertListEqual(hmm1_fields, hmm2_fields)


    def assertJsonEqual(self, json_file_1, json_file_2, max_diff=640):
        with open(json_file_1) as f1:
            j1 = json.load(f1)
        with open(json_file_2) as f2:
            j2 = json.load(f2)

        self.maxDiff = max_diff
        self.assertListEqual(j1, j2)


    @staticmethod
    def get_tmp_dir_name():
        return os.path.join(tempfile.gettempdir(), "macsylib_test_run")

    @staticmethod
    def get_uniq_tmp_dir_name():
        return os.path.join(tempfile.gettempdir(), "macsylib-{}".format(uuid.uuid4()))

    @staticmethod
    def rmtree(path):
        """
        Remove directory tree.

        :param path: the path to remove
        :type path: str
        """
        try:
            shutil.rmtree(path)
        except Exception:
            pass

    @staticmethod
    def md5sum(file_=None, str_=None):
        """Compute md5 checksum.

        :param file_: the name of the file to compute the checksum for
        :type file_: str
        :param str_: the string to compute the checksum for
        :type str_: str
        """
        assert not (file_ and str_)

        d = hashlib.md5()

        if file_:
            with open(file_, mode='rb') as f:
                for buf in iter(partial(f.read, 128), b''):
                    d.update(buf)
        elif str_:
            assert isinstance(str_, str)
            d.update(str_)
        else:
            assert False
        return d.hexdigest()


    @contextmanager
    def catch_log(self, log_name='macsylib'):
        logger = colorlog.getLogger(log_name)
        handlers_ori = logger.handlers
        fake_handler = colorlog.StreamHandler(StringIO())
        try:
            logger.handlers = [fake_handler]
            yield LoggerWrapper(logger)
        finally:
            fake_handler.close()
            logger.handlers = handlers_ori



class LoggerWrapper(object):
    """
    Thin proxy around a logger that also exposes the content of the
    stream attached to its first handler.

    Any attribute not defined here is delegated to the wrapped logger,
    so the wrapper can be used wherever the logger itself is expected.
    """

    def __init__(self, logger):
        self.logger = logger

    def __getattr__(self, item):
        # fall back to the wrapped logger for every unknown attribute
        return getattr(self.logger, item)

    def get_value(self):
        """Return the text accumulated in the first handler's stream."""
        stream = self.logger.handlers[0].stream
        return stream.getvalue()