File: _consensus_assignment.py

package info (click to toggle)
q2-feature-classifier 2024.2.0-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 1,504 kB
  • sloc: python: 3,730; makefile: 38; sh: 16
file content (228 lines) | stat: -rw-r--r-- 9,819 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
# ----------------------------------------------------------------------------
# Copyright (c) 2016-2023, QIIME 2 development team.
#
# Distributed under the terms of the Modified BSD License.
#
# The full license is in the file LICENSE, distributed with this software.
# ----------------------------------------------------------------------------

from collections import Counter
from math import ceil

import pandas as pd

from qiime2.plugin import Str, Float, Range
from .plugin_setup import plugin
from q2_types.feature_data import FeatureData, Taxonomy, BLAST6


min_consensus_param = {'min_consensus': Float % Range(
    0.5, 1.0, inclusive_end=True, inclusive_start=False)}

min_consensus_param_description = {
    'min_consensus': 'Minimum fraction of assignments must match top '
                     'hit to be accepted as consensus assignment.'}

DEFAULTUNASSIGNABLELABEL = "Unassigned"


def find_consensus_annotation(search_results: pd.DataFrame,
                              reference_taxonomy: pd.Series,
                              min_consensus: int = 0.51,
                              unassignable_label: str =
                              DEFAULTUNASSIGNABLELABEL
                              ) -> pd.DataFrame:
    """Find consensus taxonomy from BLAST6Format alignment summary.

    search_results: pd.dataframe
        BLAST6Format search results with canonical headers attached.
    reference_taxonomy: pd.Series
        Annotations of reference database used for original search.
    min_consensus : float
        The minimum fraction of the annotations that a specific annotation
        must be present in for that annotation to be accepted. Current
        lower boundary is 0.51.
    unassignable_label : str
        The label to apply if no acceptable annotations are identified.
    """
    # load and convert blast6format results to dict of taxa hits
    obs_taxa = _blast6format_df_to_series_of_lists(
        search_results, reference_taxonomy,
        unassignable_label=unassignable_label)
    # TODO: is it worth allowing early stopping if maxaccepts==1?
    # compute consensus annotations
    result = _compute_consensus_annotations(
        obs_taxa, min_consensus=min_consensus,
        unassignable_label=unassignable_label)
    result.index.name = 'Feature ID'
    return result


plugin.methods.register_function(
    function=find_consensus_annotation,
    inputs={'search_results': FeatureData[BLAST6],
            'reference_taxonomy': FeatureData[Taxonomy]},
    parameters={
        **min_consensus_param,
        'unassignable_label': Str},
    outputs=[('consensus_taxonomy', FeatureData[Taxonomy])],
    input_descriptions={
        'search_results': 'Search results in BLAST6 output format',
        'reference_taxonomy': 'reference taxonomy labels.'},
    parameter_descriptions={
        **min_consensus_param_description,
        'unassignable_label': 'Annotation given when no consensus is found.'
    },
    output_descriptions={
        'consensus_taxonomy': 'Consensus taxonomy and scores.'},
    name='Find consensus among multiple annotations.',
    description=('Find consensus annotation for each query searched against '
                 'a reference database, by finding the least common ancestor '
                 'among one or more semicolon-delimited hierarchical '
                 'annotations. Note that the annotation hierarchy is assumed '
                 'to have an even number of ranks.'),
)


def _blast6format_df_to_series_of_lists(
        assignments: pd.DataFrame,
        ref_taxa: pd.Series,
        unassignable_label: str = DEFAULTUNASSIGNABLELABEL
) -> pd.Series:
    """import observed assignments in blast6 format to series of lists.

    assignments: pd.DataFrame
        Taxonomy observation map in blast format 6. Each line consists of
        taxonomy assignments of a query sequence in tab-delimited format:
            <query_id>    <subject-seq-id>   <...other columns are ignored>

    ref_taxa: pd.Series
        Reference taxonomies in tab-delimited format:
            <accession ID>  Annotation
        The accession IDs in this taxonomy should match the subject-seq-ids in
        the "assignment" input.
    """
    # validate that assignments are present in reference taxonomy
    # (i.e., that the correct reference taxonomy was used).
    # Note that we drop unassigned labels from this set.
    missing_ids = \
        set(assignments['sseqid'].values) - set(ref_taxa.index) - {'*', ''}
    if len(missing_ids) > 0:
        raise KeyError('Reference taxonomy and search results do not match. '
                       'The following identifiers were reported in the search '
                       'results but are not present in the reference taxonomy:'
                       ' {0}'.format(', '.join(str(i) for i in missing_ids)))

    # if vsearch fails to find assignment, it reports '*' as the
    # accession ID, so we will add this mapping to the reference taxonomy.
    ref_taxa['*'] = unassignable_label
    assignments_copy = assignments.copy(deep=True)
    for index, value in assignments_copy.iterrows():
        sseqid = assignments_copy.iloc[index]['sseqid']
        assignments_copy.at[index, 'sseqid'] = ref_taxa.at[sseqid]
    # convert to dict of {accession_id: [annotations]}
    taxa_hits: pd.Series = assignments_copy.set_index('qseqid')['sseqid']
    taxa_hits = taxa_hits.groupby(taxa_hits.index).apply(list)

    return taxa_hits


def _compute_consensus_annotations(
        query_annotations, min_consensus,
        unassignable_label=DEFAULTUNASSIGNABLELABEL):
    """
        Parameters
        ----------
        query_annotations : pd.Series of lists
            Indices are query identifiers, and values are lists of all
            taxonomic annotations associated with that identifier.
        Returns
        -------
        pd.DataFrame
            Indices are query identifiers, and values are the consensus of the
            input taxonomic annotations, and the consensus score.
    """
    # define function to apply to each list of taxa hits
    # Note: I am setting this up to open the possibility to define other
    # functions later (e.g., not simple threshold consensus)
    def _apply_consensus_function(taxa, min_consensus=min_consensus,
                                  unassignable_label=unassignable_label,
                                  _consensus_function=_lca_consensus):
        # if there is no consensus, skip consensus calculation
        if len(taxa) == 1:
            taxa, score = taxa.pop(), 1.
        else:
            taxa = _taxa_to_cumulative_ranks(taxa)
            # apply and score consensus
            taxa, score = _consensus_function(
                taxa, min_consensus, unassignable_label)
        # return as a series so that the outer apply returns a dataframe
        # (i.e., consensus scores get inserted as an additional column)
        return pd.Series([taxa, score], index=['Taxon', 'Consensus'])

    # If func returns a Series object the result will be a DataFrame.
    return query_annotations.apply(_apply_consensus_function)


# first split semicolon-delimited taxonomies by rank
# and iteratively join ranks, so that: ['a;b;c', 'a;b;d', 'a;g;g'] -->
# [['a', 'a;b', 'a;b;c'], ['a', 'a;b', 'a;b;d'], ['a', 'a;g', 'a;g;g']]
# this is to avoid issues where e.g., the same species name may occur
# in different taxonomic lineages.
def _taxa_to_cumulative_ranks(taxa):
    """
        Parameters
        ----------
        taxa : list or str
            List of semicolon-delimited taxonomic labels.
            e.g., ['a;b;c', 'a;b;d']
        Returns
        -------
        list of lists of str
            Lists of cumulative taxonomic ranks for each input str
            e.g., [['a', 'a;b', 'a;b;c'], ['a', 'a;b', 'a;b;d']]
    """
    return [[';'.join(t.split(';')[:n + 1])
             for n in range(t.count(';') + 1)]
            for t in taxa]


# Find the LCA by consensus threshold. Return label and the consensus score.
def _lca_consensus(annotations, min_consensus, unassignable_label):
    """ Compute the consensus of a collection of annotations
        Parameters
        ----------
        annotations : list of lists
            Taxonomic annotations to form consensus.
        min_consensus : float
            The minimum fraction of the annotations that a specific annotation
            must be present in for that annotation to be accepted. Current
            lower boundary is 0.51.
        unassignable_label : str
            The label to apply if no acceptable annotations are identified.
        Result
        ------
        consensus_annotation: str
            The consensus assignment
        consensus_fraction: float
            Fraction of input annotations that agreed at the deepest
            level of assignment
    """
    # count total number of labels to get consensus threshold
    n_annotations = len(annotations)
    threshold = ceil(n_annotations * min_consensus)
    # zip together ranks and count frequency of each unique label.
    # This assumes that a hierarchical taxonomy with even numbers of
    # ranks was used.
    taxa_comparison = [Counter(rank) for rank in zip(*annotations)]
    # iterate rank comparisons in reverse
    # to find rank with consensus count > threshold
    for rank in taxa_comparison[::-1]:
        # grab most common label and its count
        label, count = rank.most_common(1)[0]
        # TODO: this assumes that min_consensus >= 0.51 (current lower bound)
        # but could fail to find ties if we allow lower min_consensus scores
        if count >= threshold:
            return label, round(count / n_annotations, 3)
    # if we reach this point, no consensus was ever found at any rank
    return unassignable_label, 0.0