1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251
|
#########################################################################
# MacSyFinder - Detection of macromolecular systems in protein dataset #
# using systems modelling and similarity search. #
# Authors: Sophie Abby, Bertrand Neron #
# Copyright (c) 2014-2024 Institut Pasteur (Paris) and CNRS. #
# See the COPYRIGHT file for details #
# #
# This file is part of MacSyFinder package. #
# #
# MacSyFinder is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# MacSyFinder is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details . #
# #
# You should have received a copy of the GNU General Public License #
# along with MacSyFinder (COPYING). #
# If not, see <https://www.gnu.org/licenses/>. #
#########################################################################
import itertools
import networkx as nx
from typing import Generator
from .system import System, RejectedCandidate
from .cluster import Cluster
from .hit import Loner, LonerMultiSystem
class Solution:
"""
Handle Solution, a solution is a set of compatible Systems
when compare solutions we check the following criteria
1. The number of hits
2. The number of systems
3. The average of wholeness
4. The hits position (is used ti give predictable output for unit tests)
"""
def __init__(self, systems: list[System]) -> None:
"""
:param systems: The list of system that composed this solution
"""
self._systems = self._sorted_systems(systems)
self._score = sum([syst.score for syst in self.systems])
self._average_woleness = sum([sys.wholeness for sys in self.systems]) / len(self.systems)
self._hits_number = sum([len(syst.hits) for syst in self.systems])
self._hits_positions = [h.position for syst in self.systems for h in syst.hits]
def _sorted_systems(self, systems: list[System]) -> list[System]:
"""
sort the systems following the positions of th hits that composed the systems
:param systems: the systems to sort
:type systems: list of :class:`mcsypy.system.System` objects
:return: a sorted copy of the *systems*
:rtype: list of :class:`mcsypy.system.System` objects
"""
sorted_sys = sorted(systems, key=lambda syst: ([h.position for h in syst.hits],
syst.model.fqn,
- syst.score)
)
return sorted_sys
@property
def systems(self) -> list[System]:
""""a sorted list of the *systems* that composed the solution"""
return self._systems[:]
@property
def score(self) -> float:
"""The score of this solution"""
return self._score
@property
def average_wholeness(self) -> float:
"""The average of the systems wholeness"""
return self._average_woleness
@property
def hits_number(self) -> int:
"""The sum of the hits of each system in this solution"""
return self._hits_number
@property
def hits_positions(self) -> list[int]:
"""The list of position of all hits of the solution"""
return self._hits_positions
def __len__(self) -> int:
return len(self.systems)
def __gt__(self, other) -> bool:
return (self.hits_number, len(self), self.average_wholeness, self.hits_positions) > \
(other.hits_number, len(other), other.average_wholeness, other.hits_positions)
def __lt__(self, other) -> bool:
return (self.hits_number, len(self), self.average_wholeness, self.hits_positions) < \
(other.hits_number, len(other), other.average_wholeness, other.hits_positions)
def __eq__(self, other) -> bool:
return self.hits_number == other.hits_number and \
len(self) == len(other) and \
self.average_wholeness == other.average_wholeness and \
self.hits_positions == other.hits_positions
def __iter__(self) -> Generator:
"""
Solution allow to iterate over the systems
:return: generator
"""
return (s for s in self.systems)
def find_best_solutions(systems: list[System]) -> tuple[list[Solution], float]:
"""
Among the systems choose the combination of systems which does not share :class:`macsypy.hit.CoreHit`
and maximize the sum of systems scores
:param systems: the systems to analyse
:return: the list of :class:`macsypy.solution.Solution` which represent one best solution then it's score.
:rtype: tuple of 2 elements the best solutions and it's score
"""
G = nx.Graph()
# add nodes (vertices)
G.add_nodes_from(systems)
# let's create an edges between compatible nodes
for sys_i, sys_j in itertools.combinations(systems, 2):
if sys_i.is_compatible(sys_j):
G.add_edge(sys_i, sys_j)
# find_cliques return a generator so the call to find cliques does not take time
# but each time I ask for next item (in the loop below for instance)
# nx compute the next clique so it can take time.
# the number of maximal clique grow exponentiamly with the number of node
# for instance for geneome SYDY001.0321.00001.C001 ther is
# 261 nodes, 22566 edges
# 124 015 680 cliques
cliques = nx.algorithms.clique.find_cliques(G)
max_score = None
for c in cliques:
# it is important to sum the score of clusters
# and creat a solution object only for solution I want to keep
# because there could be lot of cliques
# but only few will be kept (less than 5)
# so the wide majority of these cliques will be thrown
# if I create a Solution object for each clique I spend lot if time and memory to
# instanciate new object to thrown them few line later :-(
current_score = sum([s.score for s in c])
if max_score is None or (current_score > max_score):
max_score = current_score
best_solutions = [Solution(c)] # this solution is better start a new best_solutions
elif current_score == max_score:
best_solutions.append(Solution(c))
# sort the solutions (cliques)
solutions = sorted(best_solutions, reverse=True)
return solutions, max_score
def combine_clusters(clusters: list[Cluster],
true_loners: dict[str: Loner | LonerMultiSystem],
multi_loci: bool = False) -> list[tuple[Cluster]]:
"""
generate the combinations of clusters, with loners and multi systems
:param clusters: the clusters to combines
:param true_loners: the multi-systems hits
:type true_loners: dict the name of the function code by hit gene_ref.alternate_of as key
and 1 :class:`macsypy.cluster.Cluster` with the best a
:class:`macsypy.hit.Loner` or
:class:`macsypy.hit.LonerMultiSystem` hit as value
:param multi_loci: True if the model is multi_loci false otherwise
:return: all available combination of clusters
:rtype: List of combination. a combination is a tuple of :class:`macsypy.cluster.Cluster` objects
"""
if not clusters:
cluster_combinations = []
elif multi_loci:
cluster_combinations = [itertools.combinations(clusters, i) for i in range(1, len(clusters) + 1)]
cluster_combinations = list(itertools.chain(*cluster_combinations))
else:
cluster_combinations = [(clst,) for clst in clusters]
# add loners
loners_combinations = true_loners.items()
loners_combinations = [itertools.combinations(loners_combinations, i) for i in
range(1, len(loners_combinations) + 1)]
loners_combinations = itertools.chain(*loners_combinations)
combination_w_loners = []
for loner_comb in loners_combinations:
loner_functions = [item[0] for item in loner_comb]
loners = [item[1] for item in loner_comb]
if cluster_combinations:
for one_combination in cluster_combinations:
to_add = True
for clstr in one_combination:
if clstr.fulfilled_function(*loner_functions):
to_add = False
break
if to_add:
combination_w_loners.append(tuple(list(one_combination) + loners))
# we always add the loner
# in case definition may contain only loners
# or min_gene_required = 1 with one Loner
combination_w_loners.append(tuple(loners))
cluster_combinations += combination_w_loners
return cluster_combinations
def combine_multisystems(rejected_candidates: list[RejectedCandidate], multi_systems: list[Cluster]):
"""
:param rejected_candidates:
:param multi_systems: sequence of :class:`macsypy.cluster.Cluster`
each cluster must be composed of only one :class:`macsypy.hit.MultiSystem` object
:return: list of cluster combination with the multisystem
:rtype: [(:class:`macsypy.cluster.Cluster` cluster1, cluster2, ...),
(:class:`macsypy.cluster.Cluster` cluster3, cluster4, ...)]
"""
# instead as clusters with loners
# we have not to take in account rejected_cluster in new_comb
# as they already have been challenged and rejected
if isinstance(rejected_candidates, RejectedCandidate):
rejected_candidates = [rejected_candidates]
new_comb = []
ms_combinations = [itertools.combinations(multi_systems, i) for i in range(1, len(multi_systems) + 1)]
ms_combinations = list(itertools.chain(*ms_combinations)) # tuple of MultiSystem
for rej_cand in rejected_candidates:
for one_ms_combination in ms_combinations:
combination_hits = {h for clst in one_ms_combination for h in clst.hits}
functions = [h.gene_ref.alternate_of().name for h in combination_hits]
if not rej_cand.fulfilled_function(*functions):
new_comb.append(tuple(rej_cand.clusters + list(one_ms_combination)))
return new_comb
|