File: sample_test.py

package info (click to toggle)
smalt 0.7.6-13
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 72,824 kB
  • sloc: ansic: 26,340; sh: 3,826; python: 1,472; makefile: 323
file content (160 lines) | stat: -rw-r--r-- 4,421 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
# test sampling of insert lengths

from operator import ne

PROGNAM = "../src/smalt"
REF_FASTA_NAME = "genome_1.fa.gz"
READ_PREFIX = "gen1l75i300e0"

KMER = 11
NSKIP = 5

TMPFIL_PREFIX = "TMP"
MAXNUM_NONPROPER_PAIRS = 20

def smalt_index(df, index_name, fasta_name, kmer, nskip):
    from sys import exit
    from subprocess import call

    tup = (PROGNAM, 'index',
           '-k', '%i' % (int(kmer)),
           '-s', '%i' % (int(nskip)),
           index_name,
           fasta_name)
    df.call(tup, "when indexing")

def smalt_sample(df,oufilnam, indexnam, readfil, matefil, option=[]):
 
    tup = [PROGNAM, 'sample']
    
    if option:
        tup.extend(option)
    tup.extend(
        ['-o', oufilnam,
         indexnam,
         readfil, matefil]
        )

    df.call(tup, "when sampling")

def smalt_check(df, readfil, matefil=""):
    tup = [PROGNAM, 'check', readfil]
    
    if len(matefil) > 0:
        tup.append(matefil)
    df.call(tup, "when checking")

def smalt_map(df, oufilnam, indexnam, samplenam, readfil, matefil, option=[]):
    from sys import exit
    from subprocess import call
 
    tup = [PROGNAM, 'map']
    
    if option:
        tup.extend(option)
    tup.extend(
        ['-g', samplenam,
        '-f', 'cigar',
        '-o', oufilnam,
         indexnam,
         readfil, matefil]
        )
    df.call(tup, "when mapping")

def assess_mapping(oufilnam):
    from formats import Cigar, getNextCigarPair, openFile

    infil = openFile(oufilnam, 'r')
    
    cigA = Cigar()
    cigB = Cigar()

    pair_ctr = 0
    nonproper_ctr = 0
    (isOK, isEOF) = getNextCigarPair(infil, cigA, cigB)
    while isOK:
        pair_ctr = pair_ctr + 1
        if cigA.mapcls != 'A':
            nonproper_ctr = nonproper_ctr + 1
        if cigB.mapcls != 'A':
            nonproper_ctr = nonproper_ctr + 1
        (isOK, isEOF) = getNextCigarPair(infil, cigA, cigB)
    infil.close()
    
    if pair_ctr != 10000:
        exit("Found %i pairs, but expected 10,000." % pair_ctr)
    if nonproper_ctr > MAXNUM_NONPROPER_PAIRS:
        exit("Found %i non-proper pairs. Expected max. %i" %
             (nonproper_ctr, MAXNUM_NONPROPER_PAIRS))

def compare_mapping(oufilnam1, oufilnam2):
    from formats import Cigar, openFile

    infil1 = openFile(oufilnam1, 'r')
    infil2 = openFile(oufilnam2, 'r')
    
    cig1 = Cigar()
    cig2 = Cigar()

    ctr1 = 0
    ctr2 = 0
    while 1:
        if cig1.next(infil1):
            break
        ctr1 = ctr1 + 1
        
        if cig2.next(infil2):
            break
        ctr2 = ctr2 + 1

        if ne(cig1.qnam, cig2.qnam):
            exit("readnames don't match: '%s' vs '%s'" % \
                 (cig1.qnam, cig2.qnam))
        if ne(cig1,cig2) and cig1.mapq > 5 and cig2.mapq > 5:
            exit("mappings don't match for read '{}'; cig1: {}, cig2: {}, sig1.mapq: {}, sig2.mapq: {}".format(
                 cig1.qnam, cig1, cig2, cig1.mapq, cig2.mapq))
    infil2.close()
    infil1.close()

    if ctr1 != ctr1:
        exit("Expected the same number of mates, got %i (A) and %i (B)" % \
             ctr1, ctr2)
    if ctr1 != 20000:
        exit("Expected 20,000 reads, got %i." % ctr1)
    
if __name__ == '__main__':
    from testdata import DataFiles
    
    df = DataFiles()
    
    refnam = df.joinData(REF_FASTA_NAME)
    readnamA = df.joinData(READ_PREFIX + "_1.fq.gz")
    readnamB = df.joinData(READ_PREFIX + "_2.fq.gz")
    indexnam = df.addIndex(TMPFIL_PREFIX)

    samplenam1 = df.addTMP(TMPFIL_PREFIX + ".1.txt")
    samplenam2 = df.addTMP(TMPFIL_PREFIX + ".2.txt")
    
    oufilnam1 = df.addTMP(TMPFIL_PREFIX + ".1.cig")
    oufilnam2 = df.addTMP(TMPFIL_PREFIX + ".2.cig")
    oufilnam3 = df.addTMP(TMPFIL_PREFIX + ".3.cig")

    smalt_check(df,readnamA, readnamB)
    smalt_index(df,indexnam, refnam, KMER, NSKIP)
    smalt_sample(df,samplenam1, indexnam, readnamA, readnamB)
    smalt_map(df,oufilnam1, indexnam, samplenam1, readnamA, readnamB)
    assess_mapping(oufilnam1)

    nthread_tup = ['-n', '4']
    
    smalt_sample(df,samplenam2, indexnam, readnamA, readnamB, nthread_tup)
    smalt_map(df,oufilnam2, indexnam, samplenam2, readnamA, readnamB, nthread_tup)
    assess_mapping(oufilnam2)

    nthread_tup = ['-n', '4', '-O']
    smalt_map(df,oufilnam3, indexnam, samplenam2, readnamA, readnamB, nthread_tup)
    compare_mapping(oufilnam1, oufilnam3)
    
    df.cleanup()
    exit(0)