1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132
|
#########################################################################
# MacSyLib - Python library to detect macromolecular systems #
# in prokaryotes protein dataset using systems modelling #
# and similarity search. #
# #
# Authors: Sophie Abby, Bertrand Neron #
# Copyright (c) 2014-2025 Institut Pasteur (Paris) and CNRS. #
# See the COPYRIGHT file for details #
# #
# This file is part of MacSyLib package. #
# #
# MacSyLib is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# MacSyLib is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with MacSyLib (COPYING). #
# If not, see <https://www.gnu.org/licenses/>. #
#########################################################################
import logging
import colorlog
import os
import argparse
import unittest
import platform
import shutil
import tempfile
from macsylib.registries import ModelRegistry, scan_models_dir
from macsylib.utils import get_def_to_detect, get_replicon_names, threads_available, parse_time, list_models
from macsylib.error import MacsylibError
from tests import MacsyTest
class TestUtils(MacsyTest):
    """Unit tests for the helper functions imported from ``macsylib.utils``."""

    @unittest.skipIf(platform.system() == 'Windows' or os.getuid() == 0,
                     'Skip test on Windows or if run as root')
    def test_list_models_no_permission(self):
        """list_models must skip an unreadable models dir and log a warning.

        The test is skipped on Windows and when running as root (see the
        decorator above).
        """
        logger = colorlog.getLogger('macsylib')
        # Restore the logger level afterwards so this test does not leak its
        # configuration into the tests that run after it.
        self.addCleanup(logger.setLevel, logger.level)
        logger.setLevel(logging.WARNING)

        model_dir_name = 'fake_model_dir'
        # On GitLab CI it is not allowed to change the permissions of a
        # directory located in tests/data, so work on a copy placed in a
        # temporary directory. The context manager guarantees the copy is
        # removed even if the setup below fails.
        with tempfile.TemporaryDirectory(prefix='test_msl_Config_') as tmp_root:
            dst_model_dir = os.path.join(tmp_root, model_dir_name)
            shutil.copytree(self.find_data(model_dir_name), dst_model_dir)

            cmd_args = argparse.Namespace()
            cmd_args.models_dir = dst_model_dir
            cmd_args.list_models = True

            models_dir_perm = os.stat(cmd_args.models_dir).st_mode
            try:
                # 0o110: execute-only for owner and group, i.e. not readable.
                os.chmod(cmd_args.models_dir, 0o110)
                with self.catch_log(log_name='macsylib') as captured:
                    rcv_list_models = list_models(cmd_args)
                    log_msg = captured.get_value().strip()
                self.assertEqual(rcv_list_models, '')
                self.assertEqual(log_msg, f"{cmd_args.models_dir} is not readable: [Errno 13] Permission denied: '{cmd_args.models_dir}' : skip it.")
            finally:
                # Give the permissions back *before* the temporary directory
                # is removed, otherwise its cleanup could not list the content.
                os.chmod(cmd_args.models_dir, models_dir_perm)

    def test_get_replicon_names_gembase(self):
        """A 'gembase' file yields one name per replicon found in it."""
        replicon_names = get_replicon_names(self.find_data('base', 'gembase.fasta'), 'gembase')
        self.assertListEqual(replicon_names,
                             ['GCF_000005845', 'GCF_000006725', 'GCF_000006745', 'GCF_000006765', 'GCF_000006845',
                              'GCF_000006905', 'GCF_000006925', 'GCF_000006945'])

    def test_get_replicon_names_ordered(self):
        """For 'ordered_replicon' the single expected name matches the file stem."""
        replicon_names = get_replicon_names(self.find_data('base', 'MOBP1_once.prt'), 'ordered_replicon')
        self.assertListEqual(replicon_names,
                             ['MOBP1_once'])

    def test_get_replicon_names_unordered(self):
        """For 'unordered' the single expected name matches the file stem."""
        replicon_names = get_replicon_names(self.find_data('base', 'MOBP1_once.prt'), 'unordered')
        self.assertListEqual(replicon_names,
                             ['MOBP1_once'])

    def test_get_replicon_names_bad_type(self):
        """An unknown genome type must raise MacsylibError with a clear message."""
        with self.assertRaises(MacsylibError) as ctx:
            get_replicon_names(self.find_data('base', 'MOBP1_once.prt'), 'bad_dbtype')
        self.assertEqual(str(ctx.exception),
                         'Invalid genome type: bad_dbtype')

    def test_threads_available(self):
        """threads_available must follow sched_getaffinity, else cpu_count.

        ``os.cpu_count`` and ``os.sched_getaffinity`` are temporarily replaced
        by fakes; both are put back in their original state afterwards,
        including on platforms where ``os.sched_getaffinity`` does not exist.
        """
        had_affinity = hasattr(os, "sched_getaffinity")
        sched_getaffinity_ori = os.sched_getaffinity if had_affinity else None
        cpu_count_ori = os.cpu_count
        threads_nb = 7
        cpu_nb = 8
        os.cpu_count = lambda: cpu_nb
        try:
            # Without sched_getaffinity the expected value is os.cpu_count().
            # Only delete the attribute when it exists: it is not available on
            # every platform and an unconditional ``del`` raises AttributeError.
            if had_affinity:
                del os.sched_getaffinity
            self.assertEqual(threads_available(), cpu_nb)

            # With sched_getaffinity the expected value is the number of
            # entries it returns.
            os.sched_getaffinity = lambda x: [None] * threads_nb
            self.assertEqual(threads_available(), threads_nb)
        finally:
            os.cpu_count = cpu_count_ori
            if had_affinity:
                os.sched_getaffinity = sched_getaffinity_ori
            elif hasattr(os, "sched_getaffinity"):
                # Remove the fake only if it was actually installed, so a
                # failure above is not masked by an AttributeError here.
                del os.sched_getaffinity

    def test_parse_time(self):
        """parse_time converts numbers and unit-suffixed strings to seconds."""
        # Plain numbers are taken as seconds; floats are expected as integers.
        self.assertEqual(parse_time(10), 10)
        self.assertEqual(parse_time(10.5), 10)
        # Single unit.
        self.assertEqual(parse_time('10s'), 10)
        self.assertEqual(parse_time('10m'), 600)
        self.assertEqual(parse_time('1h'), 3600)
        self.assertEqual(parse_time('1d'), 86400)
        # Several units, in any order, with or without separating spaces.
        self.assertEqual(parse_time('10m10s1h'), 600 + 10 + 3600)
        self.assertEqual(parse_time('10m 10s 1h'), 600 + 10 + 3600)
        # Unknown unit.
        with self.assertRaises(ValueError) as ctx:
            parse_time('10W')
        self.assertEqual(str(ctx.exception),
                         'Not valid time format. Units allowed h/m/s.')
|