File: multi_fast5.py

import warnings

import h5py

from ont_fast5_api import CURRENT_FAST5_VERSION
from ont_fast5_api.fast5_file import Fast5File, Fast5FileTypeError
from ont_fast5_api.fast5_read import AbstractFast5, Fast5Read, copy_attributes
from ont_fast5_api.static_data import HARDLINK_GROUPS, OPTIONAL_READ_GROUPS


class MultiFast5File(AbstractFast5):
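    """A multi-read fast5 file.

    Wraps an h5py.File handle and exposes each 'read_<read_id>' group in the
    file as a Fast5Read object.
    """
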
    def __init__(self, filename, mode='r', driver=None):
        # See https://docs.h5py.org/en/stable/high/file.html#file-drivers for
        # information on why you might want to use a specific driver.
        self.filename = filename
        self.mode = mode
        self.handle = h5py.File(self.filename, self.mode, driver=driver)
        self._run_id_map = None
        if mode != 'r' and 'file_version' not in self.handle.attrs:
            try:
                self.handle.attrs['file_version'] = str(CURRENT_FAST5_VERSION)
            except IOError as e:
                raise IOError("Could not write 'file_version' to fast5 file '{}' opened in mode '{}'"
                              "".format(self.filename, self.mode)) from e

    def get_reads(self):
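        """Yield a Fast5Read object for every 'read_<read_id>' group in the file."""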
        for group_name in self.handle:
            if group_name.startswith('read_'):
                yield Fast5Read(self, group_name[5:])

    def get_read_ids(self):
        # Return all groups with the 'read_' stripped from the front
        return [group_name[5:] for group_name in self.handle if group_name.startswith('read_')]

    def get_read(self, read_id):
        group_name = "read_" + read_id
        if group_name not in self.handle:
            raise KeyError("Read '{}' not in: {}".format(group_name, self.filename))
        return Fast5Read(self, read_id)

    def create_read(self, read_id, run_id):
        warnings.warn("'MultiFast5File.create_read()' is deprecated. "
                      "Use 'MultiFast5File.create_empty_read()' instead",
                      DeprecationWarning)
        return self.create_empty_read(read_id, run_id)

    def create_empty_read(self, read_id, run_id):
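        """Create a new, empty group for ``read_id`` and tag it with ``run_id``.

        Shared metadata groups are hard-linked from an existing read with the
        same run_id where possible; otherwise this read is registered as the
        metadata source for that run_id. Raises ValueError if the read already
        exists in the file.
        """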
        group_name = "read_" + read_id
        if group_name not in self.handle:
            try:
                self.handle.create_group(group_name)
            except ValueError as e:
                raise ValueError("Could not create group '{}' in file: {}".format(group_name, self.filename)) from e
            try:
                for shared_group in HARDLINK_GROUPS:
                    self.handle["{}/{}".format(group_name, shared_group)] = \
                        self.handle["read_{}/{}".format(self.run_id_map[run_id], shared_group)]
            except KeyError:
                # If we can't hardlink to existing groups then continue as normal,
                # registering this read as the new metadata source for this run_id
                self.run_id_map[run_id] = read_id
            self.handle[group_name].attrs["run_id"] = run_id
        else:
            raise ValueError("Read '{}' already exists in: {}".format(group_name, self.filename))
        return Fast5Read(self, read_id)

    @property
    def run_id_map(self):
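        """Lazily built mapping of run_id to the read_id used as the hardlink source."""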
        if self._run_id_map is None:
            self._run_id_map = dict()
            for read in self.get_reads():
                try:
                    self._run_id_map[read.run_id] = read.read_id
                except KeyError:
                    # A KeyError here means the read has no run_id attribute;
                    # ignore it since it cannot be hard-linked anyway
                    pass
        return self._run_id_map

    def add_existing_read(self, read_to_add, target_compression=None, sanitize=False):
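        """Copy ``read_to_add`` from a single- or multi-read fast5 file into this file."""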
        if isinstance(read_to_add, Fast5File):
            self._add_read_from_single(read_to_add, target_compression, sanitize=sanitize)
        elif isinstance(read_to_add.parent, MultiFast5File):
            self._add_read_from_multi(read_to_add, target_compression, sanitize=sanitize)
        else:
            raise Fast5FileTypeError("Could not add read to output file from input file type '{}' with path '{}'"
                                     "".format(type(read_to_add.parent), read_to_add.parent.filename))

    def _add_read_from_multi(self, read_to_add, target_compression, sanitize=False):
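        """Copy a read from another multi-read file, hard-linking shared groups,
        skipping optional groups when sanitizing and re-compressing raw data if
        the target compression differs from the source.
        """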
        read_name = "read_" + read_to_add.read_id
        self.handle.create_group(read_name)
        output_group = self.handle[read_name]
        copy_attributes(read_to_add.handle.attrs, output_group)
        for subgroup in read_to_add.handle:
            if sanitize and subgroup in OPTIONAL_READ_GROUPS:
                # skip optional groups when sanitizing
                continue
            elif subgroup == read_to_add.raw_dataset_group_name \
                    and target_compression is not None \
                    and str(target_compression) not in read_to_add.raw_compression_filters:
                raw_attrs = read_to_add.handle[read_to_add.raw_dataset_group_name].attrs
                raw_data = read_to_add.handle[read_to_add.raw_dataset_name]
                output_read = self.get_read(read_to_add.read_id)
                output_read.add_raw_data(raw_data, raw_attrs, compression=target_compression)
                continue
            elif subgroup in HARDLINK_GROUPS:
                if read_to_add.run_id in self.run_id_map:
                    # There may be a group to link to, but we must check it actually exists!
                    hardlink_source = "read_{}/{}".format(self.run_id_map[read_to_add.run_id], subgroup)
                    if hardlink_source in self.handle:
                        hardlink_dest = "read_{}/{}".format(read_to_add.read_id, subgroup)
                        self.handle[hardlink_dest] = self.handle[hardlink_source]
                        continue
                # If we couldn't hardlink to anything we need to make the group we create available for future reads
                self.run_id_map[read_to_add.run_id] = read_to_add.read_id
            # If we haven't done a special-case copy then we can fall back on the default copy
            output_group.copy(read_to_add.handle[subgroup], subgroup)

    def _add_read_from_single(self, read_to_add, target_compression, sanitize=False):
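        """Copy a read from a single-read file, flattening 'UniqueGlobalKey' into
        hard-linked or copied groups and re-compressing raw data if required.
        """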
        read_name = "read_" + read_to_add.read_id
        self.handle.create_group(read_name)
        output_group = self.handle[read_name]
        copy_attributes(read_to_add.handle.attrs, output_group)
        for subgroup in read_to_add.handle:
            if sanitize and subgroup in OPTIONAL_READ_GROUPS:
                # skip optional groups when sanitizing
                continue
            elif subgroup == "UniqueGlobalKey":
                for unique_group in read_to_add.handle["UniqueGlobalKey"]:
                    if unique_group in HARDLINK_GROUPS and read_to_add.run_id in self.run_id_map:
                        hardlink_source = "read_{}/{}".format(self.run_id_map[read_to_add.run_id], unique_group)
                        if hardlink_source in self.handle:
                            hardlink_dest = "read_{}/{}".format(read_to_add.read_id, unique_group)
                            self.handle[hardlink_dest] = self.handle[hardlink_source]
                    else:
                        output_group.copy(read_to_add.handle["UniqueGlobalKey/{}".format(unique_group)],
                                          unique_group)
                self.run_id_map[read_to_add.run_id] = read_to_add.read_id
            elif subgroup == "Raw":
                if target_compression is None or str(target_compression) in read_to_add.raw_compression_filters:
                    output_group.copy(read_to_add.handle[read_to_add.raw_dataset_group_name], "Raw")
                else:
                    raw_attrs = read_to_add.handle[read_to_add.raw_dataset_group_name].attrs
                    raw_data = read_to_add.handle[read_to_add.raw_dataset_name]
                    output_read = self.get_read(read_to_add.read_id)
                    output_read.add_raw_data(raw_data, raw_attrs, compression=target_compression)
            else:
                if not sanitize:
                    output_group.copy(read_to_add.handle[subgroup], subgroup)
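

# A minimal usage sketch, not part of the library API: open a multi-read fast5
# file read-only and print the number of raw samples in each read.
# "example.fast5" is a hypothetical path used purely for illustration, and
# Fast5Read.get_raw_data() is assumed to be available as elsewhere in
# ont_fast5_api.
if __name__ == "__main__":
    f5 = MultiFast5File("example.fast5", mode='r')
    try:
        for read_id in f5.get_read_ids():
            raw = f5.get_read(read_id).get_raw_data()
            print("{}: {} samples".format(read_id, len(raw)))
    finally:
        f5.close()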