File: test_utils.py

package info (click to toggle)
macsyfinder 2.1.4-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 134,860 kB
  • sloc: python: 20,583; xml: 953; sh: 37; makefile: 16
file content (137 lines) | stat: -rw-r--r-- 6,095 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
#########################################################################
# MacSyFinder - Detection of macromolecular systems in protein dataset  #
#               using systems modelling and similarity search.          #
# Authors: Sophie Abby, Bertrand Neron                                  #
# Copyright (c) 2014-2024  Institut Pasteur (Paris) and CNRS.           #
# See the COPYRIGHT file for details                                    #
#                                                                       #
# This file is part of MacSyFinder package.                             #
#                                                                       #
# MacSyFinder is free software: you can redistribute it and/or modify   #
# it under the terms of the GNU General Public License as published by  #
# the Free Software Foundation, either version 3 of the License, or     #
# (at your option) any later version.                                   #
#                                                                       #
# MacSyFinder is distributed in the hope that it will be useful,        #
# but WITHOUT ANY WARRANTY; without even the implied warranty of        #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the          #
# GNU General Public License for more details.                          #
#                                                                       #
# You should have received a copy of the GNU General Public License     #
# along with MacSyFinder (COPYING).                                     #
# If not, see <https://www.gnu.org/licenses/>.                          #
#########################################################################

import os
import shutil
import tempfile
import argparse

from macsypy.registries import ModelRegistry, scan_models_dir
from macsypy.utils import get_def_to_detect, get_replicon_names, threads_available, parse_time
from macsypy.error import MacsypyError

from tests import MacsyTest


class TestUtils(MacsyTest):
    """Unit tests for the helper functions imported from ``macsypy.utils``.

    Covers ``get_def_to_detect``, ``get_replicon_names``,
    ``threads_available`` and ``parse_time``.
    """

    def setUp(self):
        """Create a fresh scratch directory for each test."""
        self.tmp_dir = tempfile.mkdtemp()

    def tearDown(self):
        """Remove the scratch directory created by :meth:`setUp`.

        Removal is best-effort: a failure to clean up must not turn a
        passing test into an error.
        """
        shutil.rmtree(self.tmp_dir, ignore_errors=True)

    def test_get_def_to_detect(self):
        """Check model selection by explicit names, by 'all', and with unknown names."""
        cmd_args = argparse.Namespace()
        # NOTE(review): _data_dir is provided by MacsyTest (not visible here).
        cmd_args.models_dir = os.path.join(self._data_dir, 'fake_model_dir')
        cmd_args.models = ('set_1', 'def_1_1', 'def_1_2', 'def_1_3')
        registry = ModelRegistry()
        models_location = scan_models_dir(cmd_args.models_dir)
        for ml in models_location:
            registry.add(ml)

        # case where models are specified on command line
        res, model_family, model_vers = get_def_to_detect(('set_1', ['def_1_1', 'def_1_2', 'def_1_3']), registry)
        model_loc = registry['set_1']
        self.assertEqual(model_family, 'set_1')
        self.assertEqual(model_vers, '0.0b2')
        exp = [model_loc.get_definition(name) for name in ('set_1/def_1_1', 'set_1/def_1_2', 'set_1/def_1_3')]
        self.assertListEqual(res, exp)

        # case where we search all models
        res, model_family, model_vers = get_def_to_detect(('set_1', ['all']), registry)
        self.assertEqual(model_family, 'set_1')
        self.assertEqual(model_vers, '0.0b2')
        exp = model_loc.get_all_definitions()
        self.assertListEqual(res, exp)

        # case where the required models do not exist
        with self.assertRaises(ValueError):
            get_def_to_detect(('set_1', ['FOO', 'BAR']), registry)

    def test_get_replicon_names_gembase(self):
        """A 'gembase' file yields one name per replicon, in the expected order."""
        replicon_names = get_replicon_names(self.find_data('base', 'gembase.fasta'), 'gembase')
        self.assertListEqual(replicon_names,
                             ['GCF_000005845', 'GCF_000006725', 'GCF_000006745', 'GCF_000006765', 'GCF_000006845',
                              'GCF_000006905', 'GCF_000006925', 'GCF_000006945'])

    def test_get_replicon_names_ordered(self):
        """An 'ordered_replicon' file yields a single name matching the file stem."""
        replicon_names = get_replicon_names(self.find_data('base', 'MOBP1_once.prt'), 'ordered_replicon')
        self.assertListEqual(replicon_names,
                             ['MOBP1_once'])

    def test_get_replicon_names_unordered(self):
        """An 'unordered' file yields a single name matching the file stem."""
        replicon_names = get_replicon_names(self.find_data('base', 'MOBP1_once.prt'), 'unordered')
        self.assertListEqual(replicon_names,
                             ['MOBP1_once'])

    def test_get_replicon_names_bad_type(self):
        """An unknown genome type raises MacsypyError with an explicit message."""
        with self.assertRaises(MacsypyError) as ctx:
            get_replicon_names(self.find_data('base', 'MOBP1_once.prt'), 'bad_dbtype')
        self.assertEqual(str(ctx.exception),
                         'Invalid genome type: bad_dbtype')

    def test_threads_available(self):
        """Check threads_available() with and without ``os.sched_getaffinity``.

        ``os.cpu_count`` and ``os.sched_getaffinity`` are temporarily replaced
        and always restored, whatever the outcome of the assertions.
        ``os.sched_getaffinity`` does not exist on every platform, so every
        deletion of it is guarded.
        """
        # None means the platform does not provide sched_getaffinity at all.
        sched_getaffinity_ori = getattr(os, "sched_getaffinity", None)
        cpu_count_ori = os.cpu_count

        threads_nb = 7
        cpu_nb = 8

        os.cpu_count = lambda: cpu_nb

        try:
            # Without sched_getaffinity the expected result is the cpu count.
            if hasattr(os, "sched_getaffinity"):
                del os.sched_getaffinity
            self.assertEqual(threads_available(), cpu_nb)
            # With sched_getaffinity the expected result is the size of the
            # collection it returns.
            os.sched_getaffinity = lambda x: [None] * threads_nb
            self.assertEqual(threads_available(), threads_nb)
        finally:
            os.cpu_count = cpu_count_ori
            if sched_getaffinity_ori is not None:
                os.sched_getaffinity = sched_getaffinity_ori
            elif hasattr(os, "sched_getaffinity"):
                # Drop the stub only if it was actually installed; the try
                # body may have failed before reaching that point.
                del os.sched_getaffinity

    def test_parse_time(self):
        """Check parse_time() on numbers, single units, combined units and bad input."""
        self.assertEqual(parse_time(10), 10)
        self.assertEqual(parse_time('10s'), 10)
        self.assertEqual(parse_time('10m'), 600)
        self.assertEqual(parse_time('1h'), 3600)
        self.assertEqual(parse_time('1d'), 86400)
        self.assertEqual(parse_time(10.5), 10)
        self.assertEqual(parse_time('10m10s1h'), 600 + 10 + 3600)
        self.assertEqual(parse_time('10m 10s 1h'), 600 + 10 + 3600)
        with self.assertRaises(ValueError) as ctx:
            parse_time('10W')
        self.assertEqual(str(ctx.exception),
                         'Not valid time format. Units allowed h/m/s.')