File: twobit.py

package info (click to toggle)
python-bx 0.13.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 5,000 kB
  • sloc: python: 17,136; ansic: 2,326; makefile: 24; sh: 8
file content (141 lines) | stat: -rw-r--r-- 4,314 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
"""
Access to files containing sequence data in 'twobit' format.
"""

from collections.abc import Mapping
from struct import (
    calcsize,
    unpack,
)
from typing import (
    BinaryIO,
    Dict,
    List,
    Tuple,
)

from . import _twobit

TWOBIT_MAGIC_NUMBER = 0x1A412743
TWOBIT_MAGIC_NUMBER_SWAP = 0x4327411A
TWOBIT_MAGIC_SIZE = 4

TWOBIT_VERSION = 0


class TwoBitSequence:
    masked_block_sizes: List
    masked_block_starts: List
    n_block_sizes: List
    n_block_starts: List

    def __init__(self, tbf, header_offset=None):
        self.tbf = tbf
        self.header_offset = header_offset
        self.sequence_offset = None
        self.size = None
        self.loaded = False

    def __getitem__(self, slice):
        start, stop, stride = slice.indices(self.size)
        assert stride == 1, "Striding in slices not supported"
        if stop - start < 1:
            return ""
        return _twobit.read(self.tbf.file, self, start, stop, self.tbf.do_mask)

    def __len__(self):
        return self.size

    def get(self, start, end):
        # Trim start / stop
        if start < 0:
            start = 0
        if end > self.size:
            end = self.size
        out_size = end - start
        if out_size < 1:
            raise Exception(f"end before start ({start},{end})")
        # Find position of packed portion
        dna = _twobit.read(self.tbf.file, self, start, end, self.tbf.do_mask)
        # Return
        return dna


class TwoBitFile(Mapping):
    def __init__(self, file: BinaryIO, do_mask: bool = True):
        self.do_mask = do_mask
        # Read magic and determine byte order
        self.byte_order = ">"
        strng = file.read(TWOBIT_MAGIC_SIZE)
        magic = unpack(">L", strng)[0]
        if magic != TWOBIT_MAGIC_NUMBER:
            if magic == TWOBIT_MAGIC_NUMBER_SWAP:
                self.byte_order = "<"
            else:
                raise Exception("Not a NIB file")
        self.magic = magic
        self.file = file
        # Read version
        self.version = self.read("L")
        if self.version != TWOBIT_VERSION:
            raise Exception("File is version '%d' but I only know about '%d'" % (self.version, TWOBIT_VERSION))
        # Number of sequences in file
        self.seq_count = self.read("L")
        # Header contains some reserved space
        self.reserved = self.read("L")
        # Read index of sequence names to offsets
        index: Dict[str, TwoBitSequence] = {}
        for _ in range(self.seq_count):
            name = self.read_p_string()
            offset = self.read("L")
            index[name] = TwoBitSequence(self, offset)
        self.index = index

    def __getitem__(self, name: str) -> TwoBitSequence:
        seq = self.index[name]
        if not seq.loaded:
            self.load_sequence(name)
        return seq

    def __iter__(self):
        return iter(self.index.keys())

    def __len__(self) -> int:
        return len(self.index)

    def load_sequence(self, name: str) -> None:
        seq = self.index[name]
        # Seek to start of sequence block
        self.file.seek(seq.header_offset)
        # Size of sequence
        seq.size = self.read("L")
        # Read N and masked block regions
        seq.n_block_starts, seq.n_block_sizes = self.read_block_coords()
        seq.masked_block_starts, seq.masked_block_sizes = self.read_block_coords()
        # Reserved
        self.read("L")
        # Save start of actualt sequence
        seq.sequence_offset = self.file.tell()
        # Mark as loaded
        seq.loaded = True

    def read_block_coords(self) -> Tuple[list, list]:
        block_count = self.read("L")
        if block_count == 0:
            return [], []
        starts = self.read(str(block_count) + "L", untuple=False)
        sizes = self.read(str(block_count) + "L", untuple=False)
        return list(starts), list(sizes)

    def read(self, pattern: str, untuple: bool = True):
        rval = unpack(self.byte_order + pattern, self.file.read(calcsize(self.byte_order + pattern)))
        if untuple and len(rval) == 1:
            return rval[0]
        return rval

    def read_p_string(self) -> str:
        """
        Read a length-prefixed string
        """
        length = self.read("B")
        return self.file.read(length).decode()