File: tab.pyx

package info (click to toggle)
obitools 3.0.1~b26+dfsg-4
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 26,756 kB
  • sloc: ansic: 24,299; python: 657; sh: 27; makefile: 21
file content (91 lines) | stat: -rwxr-xr-x 3,572 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
#cython: language_level=3

cimport cython
from obitools3.dms.view.view cimport Line
from obitools3.utils cimport bytes2str_object, str2bytes, tobytes
from obitools3.dms.column.column cimport Column_line, Column_multi_elts
from obitools3.dms.column.typed_column.int cimport Column_int, Column_multi_elts_int

import sys

cdef class TabFormat:
    """Callable formatter that renders one record as a delimited bytes row.

    Calling the instance with a record returns the record's values joined by
    ``sep``. On the first call, when ``header`` is true, the returned bytes
    start with a header row terminated by a newline, followed by the data
    row. The data row itself never carries a trailing newline.

    Parameters
    ----------
    tags : list, optional
        Keys to output. Only keys that are also present in the record are
        written. When empty or None, every key of the record is written.
    header : bool
        Whether to emit a header row on the first call.
    NAString : bytes
        Cell content written for missing (None) values.
    sep : bytes
        Separator placed between cells.
    NAIntTo0 : bool
        When true, a missing value of a ``Column_int`` column, or a missing
        element of a ``Column_multi_elts_int`` column, is written as ``0``
        instead of ``NAString``.
    metabaR : bool
        When true, header names are lower-cased, ``NUC_SEQ`` is renamed to
        ``sequence`` and the ``merged_`` substring is removed.
    ngsfilter : bool
        Stored on the instance; not read by this class.
    """

    def __init__(self, list tags=None, header=True, bytes NAString=b"NA", bytes sep=b"\t", bint NAIntTo0=True, metabaR=False, ngsfilter=False):
        """Store the formatting options; see the class docstring."""
        # None replaces the former shared mutable [] default; both mean
        # "no restriction on the keys".
        self.tags = set(tags) if tags is not None else set()
        self.header = header
        self.first_line = True
        self.NAString = NAString
        self.sep = sep
        self.NAIntTo0 = NAIntTo0
        self.metabaR = metabaR
        self.ngsfilter = ngsfilter

    @cython.boundscheck(False)
    def __call__(self, object data):
        """Format ``data`` as one row, prefixed by the header row on first use.

        ``data`` must be iterable over its keys, support ``data[key]`` and
        expose the underlying columns through ``data.view[key]``
        (presumably an obitools3 view ``Line`` -- confirm against callers).

        Returns the formatted row as bytes.
        """
        cdef object ktags
        cdef list tags = [key for key in data]
        cdef set present = set(tags)
        cdef list columns = []
        cdef list line = []

        # Emit only keys that exist in this record, in sorted order, optionally
        # restricted to the tags requested at construction time.
        wanted = self.tags if self.tags else present
        ktags = sorted(k for k in wanted if k in present)

        # Resolve each column and its sorted sub-keys once; both the header
        # and the data row need them.
        for k in ktags:
            column = data.view[k]
            if isinstance(column, Column_multi_elts):
                keys = column.keys()
                keys.sort()
            else:
                keys = None
            columns.append((k, column, keys))

        with_header = self.header and self.first_line

        if with_header:
            for k, column, keys in columns:
                if self.metabaR:
                    if k == b'NUC_SEQ':
                        ktoprint = b'sequence'
                    else:
                        ktoprint = k.lower()
                    ktoprint = ktoprint.replace(b'merged_', b'')
                else:
                    ktoprint = k
                ktoprint = tobytes(ktoprint)
                if keys is not None:
                    # One header cell per sub-key, named "<column>:<sub-key>".
                    for k2 in keys:
                        line.append(ktoprint + b':' + tobytes(k2))
                else:
                    line.append(ktoprint)
            header_row = self.sep.join(line) + b'\n'
            line = []

        for k, column, keys in columns:
            value = data[k]
            if keys is not None:
                if value is None:
                    # Whole multi-element value missing: one NA cell per
                    # sub-key so the row stays aligned with the header.
                    # NOTE(review): NAIntTo0 is not applied in this case
                    # (unchanged behavior) -- confirm this is intended.
                    for k2 in keys:
                        line.append(self.NAString)
                else:
                    if self.NAIntTo0 and isinstance(column, Column_multi_elts_int):
                        na_cell = b"0"
                    else:
                        na_cell = self.NAString
                    for k2 in keys:
                        element = value[k2]
                        if element is not None:
                            line.append(str2bytes(str(bytes2str_object(element))))
                        else:
                            line.append(na_cell)
            elif value is not None:
                line.append(str2bytes(str(bytes2str_object(value))))
            elif self.NAIntTo0 and isinstance(column, Column_int):
                # Missing integer: write 0 rather than stringifying None,
                # consistent with the multi-element integer case above.
                line.append(b"0")
            else:
                line.append(self.NAString)

        r = self.sep.join(line)
        if with_header:
            r = header_row + r

        self.first_line = False

        return r