1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123
|
# Author: David Alexander
import numpy as np
import re
class UnavailableFeature(Exception):
pass
class Unimplemented(NotImplementedError):
pass
class ReferenceMismatch(Exception):
pass
class IncompatibleFile(Exception):
pass
BASE_FEATURE_TAGS = {"InsertionQV": ("iq", "qv", np.uint8),
"DeletionQV": ("dq", "qv", np.uint8),
"DeletionTag": ("dt", "base", np.int8),
"SubstitutionQV": ("sq", "qv", np.uint8),
"MergeQV": ("mq", "qv", np.uint8),
"Ipd:Frames": ("ip", "frames", np.uint16),
"Ipd:CodecV1": ("ip", "codecV1", np.uint8),
"PulseWidth:Frames": ("pw", "frames", np.uint16),
"PulseWidth:CodecV1": ("pw", "codecV1", np.uint8)}
PULSE_FEATURE_TAGS = {"PulseCall": ("pc", "pulse", np.uint8),
"StartFrame": ("sf", "frames32", np.uint32),
"PkMid": ("pm", "photons", np.uint16),
"PkMean": ("pa", "photons", np.uint16)}
ASCII_COMPLEMENT_MAP = {ord("A"): ord("T"),
ord("T"): ord("A"),
ord("C"): ord("G"),
ord("G"): ord("C"),
ord("N"): ord("N"),
ord("-"): ord("-")}
complementAscii = np.vectorize(ASCII_COMPLEMENT_MAP.get, otypes=[np.int8])
def reverseComplementAscii(a):
return complementAscii(a)[::-1]
BAM_CMATCH = 0
BAM_CINS = 1
BAM_CDEL = 2
BAM_CREF_SKIP = 3
BAM_CSOFT_CLIP = 4
BAM_CHARD_CLIP = 5
BAM_CPAD = 6
BAM_CEQUAL = 7
BAM_CDIFF = 8
#
# qId calculation from RG ID string
#
def rgAsInt(rgIdString):
return np.int32(int(re.sub("-", "", rgIdString.split("/")[0]), 16)
% (np.iinfo(np.int32).max+1))
#
# Kinetics: decode the scheme we are using to encode approximate frame
# counts in 8-bits.
#
def _makeFramepoints():
B = 2
t = 6
T = 2**t
framepoints = []
next = 0
for i in range(256//T):
grain = B**i
nextOnes = next + grain * np.arange(0, T)
next = nextOnes[-1] + grain
framepoints = framepoints + list(nextOnes)
return np.array(framepoints, dtype=np.uint16)
def _makeLookup(framepoints):
# (frame -> code) involves some kind of rounding
# basic round-to-nearest
frameToCode = np.empty(shape=max(framepoints)+1, dtype=int)
for i, (fl, fu) in enumerate(zip(framepoints, framepoints[1:])):
if (fu > fl + 1):
m = (fl + fu)//2
for f in range(fl, m):
frameToCode[f] = i
for f in range(m, fu):
frameToCode[f] = i + 1
else:
frameToCode[fl] = i
# Extra entry for last:
frameToCode[fu] = i + 1
return frameToCode, fu
_framepoints = _makeFramepoints()
_frameToCode, _maxFramepoint = _makeLookup(_framepoints)
def framesToCode(nframes):
nframes = np.minimum(_maxFramepoint, nframes)
return _frameToCode[nframes]
def codeToFrames(code):
return _framepoints[code]
def downsampleFrames(nframes):
return codeToFrames(framesToCode(nframes))
|