File: test_pbindex.py

package info (click to toggle)
python-pbcore 1.7.1%2Bgit20200430.a127b1e%2Bdfsg-1
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 5,404 kB
  • sloc: python: 23,243; xml: 2,504; makefile: 232; sh: 66
file content (65 lines) | stat: -rw-r--r-- 2,617 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
import numpy as np

import pbcore.data
from pbcore.io.align import BamReader
from pbcore.io.align.PacBioBamIndex import PacBioBamIndex, StreamingBamIndex


class TestPbIndex:

    BAM_FILE_NAME = pbcore.data.getUnalignedBam()

    @classmethod
    def setup_class(cls):
        cls._bam = BamReader(cls.BAM_FILE_NAME)
        cls._pbi = PacBioBamIndex(cls.BAM_FILE_NAME + ".pbi")

    def test_pbindex_bam_consistency(self):
        assert len(self._pbi) == 117
        for i_rec, rec in enumerate(self._bam):
            assert rec.qId == self._pbi.qId[i_rec]
            assert rec.HoleNumber == self._pbi.holeNumber[i_rec]
            assert rec.qStart == self._pbi.qStart[i_rec]
            assert rec.qEnd == self._pbi.qEnd[i_rec]
        assert i_rec == 116

    def test_pbindex_streaming(self):
        streamed = StreamingBamIndex(self.BAM_FILE_NAME + ".pbi", 20)
        assert streamed.nchunks == 6
        chunks = [chunk for chunk in streamed]
        for attr in ["qId", "holeNumber", "qStart", "qEnd"]:
            combined = np.concatenate([getattr(c, attr) for c in chunks])
            assert len(combined) == len(self._pbi)
            assert all(combined == getattr(self._pbi, attr))
        chunk = streamed.get_chunk(1)
        for attr in ["qId", "holeNumber", "qStart", "qEnd"]:
            assert all(getattr(chunk, attr) == getattr(chunks[1], attr))

    # with the default chunk size there should be just one chunk identical
    # to the whole index
    def test_pbindex_streaming_entire(self):
        streamed = StreamingBamIndex(self.BAM_FILE_NAME + ".pbi")
        assert streamed.nchunks == 1
        chunked = [chunk for chunk in streamed][0]
        assert len(chunked) == len(self._pbi)
        for attr in ["qId", "holeNumber", "qStart", "qEnd"]:
            assert all(getattr(chunked, attr) == getattr(self._pbi, attr))

    def test_pbindex_with_zmw_index(self):
        """
        Test that the built in sub-index of ZMW start positions is correct.
        """
        streamed = StreamingBamIndex(self.BAM_FILE_NAME + ".pbi", 20)
        unique_zmws = set()
        n_indexed_zmws = 0
        for chunk, zmw_idx in streamed.iter_with_zmw_index():
            for k, zmw_start in enumerate(zmw_idx):
                if k < len(zmw_idx) - 1:
                    idx_max = zmw_idx[k+1]
                else:
                    idx_max = len(chunk)
                zmws = chunk.holeNumber[zmw_start:idx_max]
                assert len(set(zmws)) == 1
                unique_zmws.add(zmws[0])
                n_indexed_zmws += 1
        assert len(unique_zmws) == n_indexed_zmws