File: test_fast5_subset.py

Package: ont-fast5-api 4.1.1+dfsg-2 (Debian bookworm, area: main)

import os
import numpy
from unittest.mock import patch
from pathlib import Path

from ont_fast5_api.compression_settings import VBZ_V0
from ont_fast5_api.conversion_tools.fast5_subset import Fast5Filter
from ont_fast5_api.conversion_tools.conversion_utils import (Fast5FilterWorker, extract_selected_reads,
                                                             read_generator)
from ont_fast5_api.multi_fast5 import MultiFast5File
from ont_fast5_api.fast5_file import Fast5File
from test.helpers import TestFast5ApiHelper, test_data


class TestFast5Subset(TestFast5ApiHelper):
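    """Tests for fast5_subset: filtering a chosen set of read IDs out of
    single- and multi-read fast5 files into new multi-read batches."""

    # Two read IDs known to be present in the bundled test data; every test
    # below subsets against this fixed pair.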
    input_multif5_path = Path(test_data) / "multi_read" / "batch_0.fast5"
    read_set = {"fe85b517-62ee-4a33-8767-41cab5d5ab39", "fe9374ee-b86a-4ca4-81dc-ac06e3297728"}

    def test_read_generator(self):
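        """read_generator should yield only reads from read_set, and
        len(read_set) of them in total."""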
        count = 0
        for read_id, read in read_generator(input_file=self.input_multif5_path, read_set=self.read_set):
            self.assertIn(read_id, self.read_set)
            count += 1

        self.assertEqual(len(self.read_set), count)

    def _create_read_list_file(self, read_ids):
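        """Write one read ID per line: the plain-text format Fast5Filter
        accepts as its read_list_file argument."""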
        output_path = os.path.join(self.save_path, 'read_list.txt')
        with open(output_path, 'w') as fh:
            for read_id in read_ids:
                fh.write(read_id + "\n")
        return output_path

    # Note: @patch decorators apply bottom-up, so the innermost patch
    # (get_progress_bar) is passed as the first mock argument.
    @patch('ont_fast5_api.conversion_tools.fast5_subset.logging')
    @patch('ont_fast5_api.conversion_tools.fast5_subset.get_progress_bar')
    def test_subset_from_single(self, mock_pbar, mock_log):
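        """Subsetting a folder of single-read files should produce a multi-read
        batch whose raw data matches the originals, read for read."""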
        input_path = os.path.join(test_data, "single_reads")
        read_list = self._create_read_list_file(self.read_set)
        f5_filter = Fast5Filter(input_folder=input_path,
                                output_folder=self.save_path,
                                read_list_file=read_list)
        f5_filter.run_batch()

        count = 0
        with MultiFast5File(os.path.join(self.save_path, 'batch0.fast5'), 'r') as output_f5:
            for input_file in os.listdir(input_path):
                with Fast5File(os.path.join(input_path, input_file), 'r') as input_f5:
                    read_id = input_f5.get_read_id()
                    if read_id in self.read_set:
                        read_in = input_f5.get_read(read_id)
                        read_out = output_f5.get_read(read_id)
                        self.assertTrue(numpy.array_equal(read_in.get_raw_data(), read_out.get_raw_data()))
                        count += 1
        self.assertEqual(len(self.read_set), count)

    @patch('ont_fast5_api.conversion_tools.fast5_subset.logging')
    @patch('ont_fast5_api.conversion_tools.fast5_subset.get_progress_bar')
    def test_subset_from_multi(self, mock_pbar, mock_log):  # mocks arrive innermost-first
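        """Subsetting a multi-read file should copy exactly the requested reads,
        with raw data identical to the input."""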
        read_list = self._create_read_list_file(self.read_set)
        f5_filter = Fast5Filter(input_folder=os.path.dirname(self.input_multif5_path),
                                output_folder=self.save_path,
                                read_list_file=read_list)
        f5_filter.run_batch()
        with MultiFast5File(self.input_multif5_path, 'r') as input_f5, \
                MultiFast5File(os.path.join(self.save_path, 'batch0.fast5'), 'r') as output_f5:
            self.assertEqual(len(self.read_set), len(output_f5.get_read_ids()))
            for read_id in self.read_set:
                read_in = input_f5.get_read(read_id)
                read_out = output_f5.get_read(read_id)
                self.assertTrue(numpy.array_equal(read_in.get_raw_data(), read_out.get_raw_data()))

    def test_extract_selected_reads(self):
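        """extract_selected_reads returns (found_reads, output_file, input_file);
        input_file comes back as None when count exceeds the reads available,
        signalling that the input has been exhausted."""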
        # three tests: count below, equal to, and above the number of reads in the input file
        for count in (1, 2, 3):
            temp_file_name = self.generate_temp_filename()
            found_reads, output_file, input_file = extract_selected_reads(input_file=self.input_multif5_path,
                                                                          output_file=temp_file_name,
                                                                          count=count, read_set=self.read_set)
            if count < len(self.read_set):
                self.assertTrue(found_reads.issubset(self.read_set))
                self.assertEqual(input_file, self.input_multif5_path)
            elif count == len(self.read_set):
                self.assertEqual(found_reads, self.read_set)
                self.assertEqual(input_file, self.input_multif5_path)
            else:  # count > len(self.read_set)
                self.assertEqual(found_reads, self.read_set)
                self.assertIsNone(input_file)

            self.assertEqual(output_file, temp_file_name)
            # verify that the resulting output file is a valid MultiFast5 containing only requested reads
            with MultiFast5File(output_file) as multi_file:
                readlist = multi_file.get_read_ids()
                self.assertTrue(set(readlist).issubset(self.read_set))

    @patch('ont_fast5_api.conversion_tools.conversion_utils.ProgressBar')
    @patch('ont_fast5_api.conversion_tools.fast5_subset.logging')
    def test_selector_args_generator(self, mock_logger, mock_pbar):  # innermost patch (logging) first
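        """Exercise Fast5FilterWorker's queue bookkeeping: _args_generator yields
        (input_file, output_file, reads, count, compression) tuples, and
        _update_file_lists returns finished output files to the pool."""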
        single_reads = os.path.join(test_data, "single_reads")
        self.assertTrue(os.path.isdir(single_reads), msg=single_reads)

        input_f5s = list(Path(single_reads).glob('*.fast5'))
        batch_size = 1

        f = Fast5FilterWorker(
            input_file_list=input_f5s,
            output_dir=Path(self.save_path),
            read_set=self.read_set,
            batch_size=batch_size,
            filename_base="batch",
            target_compression=VBZ_V0,
            progressbar=mock_pbar,
            logger=mock_logger
        )

        args_combos = list(f._args_generator())
        # one args tuple per read at batch_size=1, i.e. two tuples here
        self.assertEqual(len(args_combos), len(self.read_set) // batch_size)

        num_files_queued = len(f.input_f5s)  # should be 0
        self.assertEqual(num_files_queued, (len(input_f5s) - len(args_combos)), msg=f.input_f5s)
        self.assertEqual(len(f.available_out_files), 0)

        # "exhaust" an input file and put output file back on queue
        input_file, output_file, reads, count, compression = args_combos[0]
        f._update_file_lists(reads={}, in_file=None, out_file=output_file)
        self.assertEqual(len(f.input_f5s), num_files_queued)
        self.assertEqual(len(f.available_out_files), 1)
        self.assertEqual(compression, VBZ_V0)

        # returning the output file lets the generator produce one more args tuple
        new_args_combos = list(f._args_generator())
        self.assertEqual(len(new_args_combos), 1, msg=len(new_args_combos))
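
# For reference, the behaviour tested above backs the `fast5_subset` console
# script shipped with ont-fast5-api. A typical invocation (flag names as of
# 4.1.x; check `fast5_subset --help` on your install) looks roughly like:
#
#   fast5_subset --input /data/multi_reads --save_path /data/subset \
#       --read_id_list read_id_list.txt --batch_size 4000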