1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223
|
#########################################################################
# MacSyFinder - Detection of macromolecular systems in protein dataset #
# using systems modelling and similarity search. #
# Authors: Sophie Abby, Bertrand Neron #
# Copyright (c) 2014-2024 Institut Pasteur (Paris) and CNRS. #
# See the COPYRIGHT file for details #
# #
# This file is part of MacSyFinder package. #
# #
# MacSyFinder is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# MacSyFinder is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details . #
# #
# You should have received a copy of the GNU General Public License #
# along with MacSyFinder (COPYING). #
# If not, see <https://www.gnu.org/licenses/>. #
#########################################################################
import sys
import os.path
import argparse
import typing
from itertools import groupby
import colorlog
from collections import OrderedDict
import macsypy
from macsypy.config import MacsyDefaults
def copy_chunk(fh_in: typing.IO, out: str, start: int, stop: int) -> None:
"""
Copy file from fh_in to out from position start to stop
:param fh_in: the source file
:type fh_in: file like object
:param out: the destination file name
:param start: the position to start the copy
:param stop: the position to end the copy
"""
chunk_size = 1024
fh_in.seek(start)
with open(out, 'w') as f_out:
for start in range(start, stop + 1, chunk_size):
read_size = min((stop - start), chunk_size)
content = fh_in.read(read_size)
f_out.write(content)
def split(seq_index: dict[str: tuple[int, int]], genome_path: str, outdir: str = '.') -> list[str]:
"""
split a file with different replicons in gembase format
in several files with one replicon per file
:seq_index: the sequences index
:type seq_index: dict [str seq_id] : (int start, int stop)
:param str genome_path: the path to the file to split
:param str outdir: the path of the directory where to write the replicons
:return: the list of created replicons files
:rtype: list of string
"""
def grp_replicon(line: str) -> str:
"""
in gembase the identifier of fasta sequence follows the following schema:
<replicon-name>_<seq-name> with eventually '_' inside the <replicon_name>
but not in the <seq-name>.
so grp_replicon allow to group sequences belonging to the same replicon.
"""
return "_".join(line.split('_')[: -1])
all_seq_files = []
with open(genome_path, 'r') as fh_in:
for rep_name, seq_ids in groupby(seq_index.keys(), key=grp_replicon):
seqs_ids = [id_ for id_ in seq_ids]
seq_file = os.path.normpath(os.path.join(outdir, f"{rep_name}.fasta"))
start = seq_index[seqs_ids[0]][0]
stop = seq_index[seqs_ids[-1]][1]
_log.info(f"Writing replicon {seq_file}")
copy_chunk(fh_in, seq_file, start, stop)
all_seq_files.append(seq_file)
return all_seq_files
def index_seq(genome_path: str) -> dict[str: tuple[int, int]]:
"""
Index the sequence in the file represented by genome_path
:param str genome_path: the path to a file containing several sequences in fasta format
:return: the sequences index
:rtype: dict [str seq_id] : (int start, int stop)
"""
index = OrderedDict()
with open(genome_path, 'r') as fh:
start = None
_id = None
line = fh.readline()
while line:
if line.startswith('>') and start is not None:
end = fh.tell() - len(line)
index[_id] = start, end
start = end
_id = line.split()[0][1:]
elif line.startswith('>'): # and start is None
# The first sequence
start = fh.tell() - len(line)
_id = line.split()[0][1:]
line = fh.readline()
# this is the end of file
# add the last sequence
end = fh.tell()
index[_id] = start, end
return index
def parse_args(args: list[str]) -> argparse.Namespace:
"""
:param args: The arguments passed on the command line (without the name of the program)
Typically sys.argv[1:]
:type args: list of string.
:return: the arguments parsed.
:rtype: a :class:`argparse.Namespace` object.
"""
parser = argparse.ArgumentParser(formatter_class=argparse.RawDescriptionHelpFormatter,
description="Split a gembase protein file in several files, one per replicon.")
parser.add_argument('genome_path',
help='Path to the genomes file (in gembase format), eg : path/to/file.fst or file.fst')
parser.add_argument('-o', '--outdir',
default='.',
help='The path to the directory where to write the chunks.')
parser.add_argument("--mute",
action='store_true',
default=False,
help="mute the log on stdout."
"(continue to log on macsy_gembase_split.out)")
verbosity_grp = parser.add_argument_group()
verbosity_grp.add_argument('-v', '--verbose',
action='count',
default=0,
help='Increase verbosity of output (can be cumulative : -vv)')
verbosity_grp.add_argument('-q', '--quiet',
action='count',
default=0,
help='Decrease verbosity of output (can be cumulative : -qq)'
)
parsed_args = parser.parse_args(args)
return parsed_args
def main(args: list[str] | None = None, log_level: int | str | None = None):
"""
main entry point to macsy_gembase_split
1. index the gembase file to identify start/end of each replicon
2. use this information to split gembase in several files one per replicon
:param args: the arguments passed on the command line
:type args: list of str
:param log_level: the output verbosity
:type log_level: a positive int or a string among 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'
"""
global _log
args = sys.argv[1:] if args is None else args
parsed_args = parse_args(args)
outdir_err = None
if not os.path.exists(parsed_args.outdir):
try:
os.mkdir(parsed_args.outdir)
except PermissionError as err:
outdir_err = f"Cannot create {parsed_args.outdir} : {err}"
elif not os.path.isdir(parsed_args.outdir):
outdir_err = f"{parsed_args.outdir} is not a directory"
elif not os.access(parsed_args.outdir, os.W_OK):
outdir_err = f"{parsed_args.outdir} is not writable"
if outdir_err:
log = colorlog.getLogger('macsypy.split')
stdout_handler = colorlog.StreamHandler(sys.stdout)
stdout_formatter = colorlog.ColoredFormatter("%(log_color)s%(message)s",
datefmt=None,
reset=True,
log_colors={'CRITICAL': 'bold_red'},
secondary_log_colors={},
style='%'
)
stdout_handler.setFormatter(stdout_formatter)
log.addHandler(stdout_handler)
log.critical(outdir_err)
sys.tracebacklimit = 0
raise IOError() from None
macsypy.init_logger(log_file=os.path.join(parsed_args.outdir, 'macsy_gembase_split.out'),
out=not parsed_args.mute)
_log = colorlog.getLogger('macsypy.split')
if not log_level:
# logs are specify from args options
config = MacsyDefaults()
log_level = max(config.log_level - (10 * parsed_args.verbose) + (10 * parsed_args.quiet), 1)
macsypy.logger_set_level(log_level)
else:
# used by unit tests to mute or unmute logs
macsypy.logger_set_level(log_level)
idx = index_seq(parsed_args.genome_path)
replicon_names = split(idx, parsed_args.genome_path, outdir=parsed_args.outdir)
print(' '.join(replicon_names))
if __name__ == '__main__':
main()
|