File: alignment.py

package info (click to toggle)
ont-fast5-api 4.1.1%2Bdfsg-2
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 3,548 kB
  • sloc: python: 3,799; makefile: 153; sh: 13
file content (207 lines) | stat: -rw-r--r-- 9,678 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
""" Helper class for working with alignment type analyses.
"""
import numpy as np

from ont_fast5_api.analysis_tools.base_tool import BaseTool
from ont_fast5_api.fast5_file import Fast5File
from ont_fast5_api.analysis_tools.segmentation import SegmentationTools
from ont_fast5_api.fast5_read import Fast5Read


class AlignmentTools(BaseTool):
    """ Provides helper methods specific to alignment analyses.
    """
    
    def __init__(self, source, mode='r', group_name=None, meta=None, config=None):
        """ Create a new alignment tools object.
        
        :param source: Either an open Fast5File object, or a filename
            of a fast5 file.
        :param mode: The open mode (r or r+). Only if a filename is used
            for the source argument.
        :param group_name: The specific alignment analysis instance
            you are interested in.
        :param meta: Metadata for a new alignment analysis.
        :param config: Configuration data for a new alignment analysis.
        
        To create a new alignment analysis, provide a group name that
        does not already exist, and an optional dictionary with the metadata.
        The following fields are recommended, as a minimum:
            
            * name - The name of the basecall software used.
            * time_stamp - The time at which the analysis was performed.
        
        If the group name already exists, the "meta" parameter is ignored. If
        the specified group has a "component" attribute, and its value is not
        "alignment", an exception will be thrown.
        """
        if isinstance(source, Fast5Read):
            self.handle = source
            self.close_handle_when_done = False
        elif isinstance(source, str):
            self.handle = Fast5File(source, mode)
            self.close_handle_when_done = True
        else:
            raise Exception('Unrecognized type for argument "source".')
        if group_name is None:
            group_name = self.handle.get_latest_analysis('Alignment')
            if group_name is None:
                raise Exception('No Alignment analysis group found in file.')
        self.group_name = group_name
        attrs = self.handle.get_analysis_attributes(group_name)
        if attrs is None:
            if meta is None:
                meta = {}
            self.handle.add_analysis('alignment', group_name, meta, config)
            attrs = self.handle.get_analysis_attributes(group_name)
        if ('component' in attrs
                and attrs['component'] not in ['alignment',
                                               'calibration_strand']):
            self.close()
            raise Exception('Analysis does not appear to be an alignment component.')

    def get_results(self):
        """ Get details about the alignments that have been performed.

        :return: A dict of dicts.

        The keys of the top level are 'template', 'complement' and '2d'.
        Each of these dicts contains the following fields:

            * status: Can be 'no data', 'no match found', or 'match found'.
            * direction: Can be 'forward', 'reverse'.
            * ref_name: Name of reference.
            * ref_span: Section of reference aligned to, as a tuple (start, end).
            * seq_span: Section of the called sequence that aligned, as a tuple (start, end).
            * seq_len: Total length of the called sequence.
            * num_aligned: Number of bases that aligned to bases in the reference.
            * num_correct: Number of aligned bases that match the reference.
            * num_deletions: Number of bases in the aligned section of the
                reference that are not aligned to bases in the called sequence.
            * num_insertions: Number of bases in the aligned section of the called
                sequence that are not aligned to bases in the reference.
            * identity: The fraction of aligned bases that are correct (num_correct /
                num_aligned).
            * accuracy: The overall basecall accuracy, according to the alignment.
                (num_correct / (num_aligned + num_deletions + num_insertions)).
        
        Note that if the status field is not 'match found', then all the other
        fields will be absent.
        """
        summary = self.handle.get_summary_data(self.group_name)
        results = {'template': {'status': 'no data'},
                   'complement': {'status': 'no data'},
                   '2d': {'status': 'no data'}}
        if 'genome_mapping_template' in summary:
            results['template'] = self._get_results(summary['genome_mapping_template'])
        if 'genome_mapping_complement' in summary:
            results['complement'] = self._get_results(summary['genome_mapping_complement'])
        if 'genome_mapping_2d' in summary:
            results['2d'] = self._get_results(summary['genome_mapping_2d'])
        return results

    def get_alignment_data(self, section):
        """ Get the alignment SAM and Fasta, if present.
        
        :param section: Can be 'template', 'complement', or '2d'.
        :return: A tuple containing the SAM and the section of the reference
            aligned to (both as strings). Returns None if no alignment is
            present for that section.
        """
        subgroup = '{}/Aligned_{}'.format(self.group_name, section)
        sam = self.handle.get_analysis_dataset(subgroup, 'SAM')
        fasta = self.handle.get_analysis_dataset(subgroup, 'Fasta')
        if sam is None or fasta is None:
            return None
        sequence = fasta.split('\n')[1]
        return sam, sequence

    def add_alignment_data(self, section, sam, sequence):
        """ Add the SAM and Fasta alignment data for a section.
        
        :param section: Can be 'template', 'complement', or '2d'.
        :param sam: A string containing the SAM contents.
        :param sequence: A string containing the section of the
            reference the basecall aligned to.
        """
        subgroup = 'Aligned_{}'.format(section)
        if not subgroup in self.handle.handle['Analyses/{}'.format(self.group_name)]:
            self.handle.add_analysis_subgroup(self.group_name, subgroup)
        sam_arr = np.array(sam, dtype=str)
        self.handle.add_analysis_dataset('{}/{}'.format(self.group_name, subgroup), 'SAM', sam_arr)
        fasta_arr = np.array('>{}\n{}\n'.format(section, sequence), dtype=str)
        self.handle.add_analysis_dataset('{}/{}'.format(self.group_name, subgroup), 'Fasta', fasta_arr)

    def calculate_speed(self, section, alignment_results=None):
        """ Calculate speed using alignment information.

        :param section: The section (template or complement) we're calculating
            speed for.
        :param alignment_results: Optional dictionary of the alignment summary,
            so that speed can be calculated without having to write the summary
            out to the fast5 file first.
        :return: Speed in bases per second or zero if the speed could not be
            calculated.

        The only reliable way we have of finding out how many bases have gone through the pore is by
        looking at how much of the reference the sequence aligned to. This takes that information and
        uses it to calculate speed in reference-bases-per-second.
        """
        speed = 0.0
        if alignment_results:
            results = self._get_results(alignment_results)
        else:
            results = self.get_results()[section]
        if results['status'] != 'match found':
            return 0.0
        ref_span = results['ref_span']
        ref_len = ref_span[1] - ref_span[0]
        seq_span = results['seq_span']
        seq_len = seq_span[1] - seq_span[0]
        total_len = results['seq_len']

        sample_rate = self.handle.get_channel_info()['sampling_rate']

        # We need the duration from the segmentation results
        chain = self.handle.get_chain(self.group_name)
        if chain is not None:
            segmentation_group = dict(chain).get('segmentation')
        else:
            segmentation_group = None
        duration = 0
        if segmentation_group is not None:
            with SegmentationTools(self.handle, group_name=segmentation_group) as seg:
                summary = seg.get_results()
                if summary is not None:
                    duration = summary['duration_{}'.format(section)]
        if duration == 0:
            return 0.0

        normalized_duration = duration * seq_len / float(total_len)
        speed = sample_rate * ref_len / normalized_duration
        return speed

    ##########################
    #
    #  Private methods below
    #
    ##########################

    def _get_results(self, summary):
        results = {'status': 'no data'}
        ref_name = summary['genome']
        if ref_name == 'no_match':
            results['status'] = 'no match found'
            return results
        results['status'] = 'match found'
        results['direction'] = 'forward'
        if ref_name.endswith('_rc'):
            ref_name = ref_name[:-3]
            results['direction'] = 'reverse'
        results['ref_name'] = ref_name
        results['ref_span'] = (summary['genome_start'], summary['genome_end'])
        results['seq_span'] = (summary['strand_start'], summary['strand_end'])
        results['seq_len'] = summary['num_events']
        results.update({key: summary[key] for key in ['num_aligned', 'num_correct', 'num_insertions',
                                                      'num_deletions', 'identity', 'accuracy']})
        return results