1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121
|
# test CIGAR output format
# mapping class labels
PROGNAM = "../src/smalt"
RCPROGNAM = "./sequenceReverseComplement_test"
REF_FASTA_NAME = "genome_1.fa.gz"
READ_PREFIX = "gen1l100i500e1"
KMER = 11
NSKIP = 5
TMPFIL_PREFIX = "TMP"
OPTION_LABEL_PAIRS_ORIG = (
(['-r', '-1'], ("AA", "BB", "DD", "SR", "BB", "RR", "AA")),
(['-m', '30'], ("AA", "BB", "DD", "??", "BB", "??", "AA")),
(['-l', 'pp', '-r', '-1'], ("CC", "CC", "DD", "BB", "SR", "RR", "CC")),
(['-l', 'mp', '-r', '-1'], ("CC", "CC", "DD", "SR", "SR", "RR", "CC")),
)
OPTION_LABEL_PAIRS_RC2ND = (
(['-r', '-1'], ("CC", "CC", "DD", "BB", "SR", "RR", "CC")),
(['-m', '30'], ("CC", "CC", "DD", "SN", "??", "??", "CC")),
(['-l', 'pp', '-r', '-1'], ("AA", "BB", "DD", "SR", "BB", "RR", "AA")),
(['-l', 'mp', '-r', '-1'], ("CC", "CC", "DD", "SR", "SR", "RR", "CC")),
)
OPTION_LABEL_PAIRS_RC2ND_RC1ST = (
(['-r', '-1'], ("CC", "CC", "DD", "RS", "RS", "RR", "CC")),
(['-m', '30'], ("CC", "CC", "DD", "??", "??", "??", "CC")),
(['-l', 'pp', '-r', '-1'], ("CC", "CC", "DD", "BB", "RS", "RR", "CC")),
(['-l', 'mp', '-r', '-1'], ("AA", "BB", "DD", "RS", "BB", "RR", "AA")),
)
def checkLabels(cigfilnam, label_pairs, mateno_check=True):
from formats import Cigar, openFile, getNextCigarPair
cigA = Cigar()
cigB = Cigar()
infil = openFile(cigfilnam)
for lb in label_pairs:
(isOK, isEOF) = getNextCigarPair(infil, cigA, cigB, mateno_check)
if (not isOK) or isEOF:
exit("missing lines in cigar file %s" % cigfilnam)
if (cigA.mapcls != lb[0] and lb[0] != '?') or \
(cigB.mapcls != lb[1] and lb[1] != '?'):
exit("unexpeced cigar mapping labels %s%s (%s) for read pair %s" % \
(cigA.mapcls, cigB.mapcls, lb, cigA.qnam))
return
def reverseComplement(df, filnam_in, filnam_out):
from sys import exit
from subprocess import call
df.call([RCPROGNAM, filnam_in, filnam_out],
"when reverse complementing reads in file '%s'" % (filnam_in))
def smalt_index(df, index_name, fasta_name, kmer, nskip):
from sys import exit
from subprocess import call
tup = (PROGNAM, 'index',
'-k', '%i' % (int(kmer)),
'-s', '%i' % (int(nskip)),
index_name,
fasta_name)
df.call(tup, "when indexing")
def smalt_map(df,oufilnam, indexnam, readfil, matefil="", option=[]):
from sys import exit
from subprocess import call
tup = [PROGNAM, 'map']
if option:
tup.extend(option)
tup.extend(
['-f', 'cigar',
'-o', oufilnam,
indexnam,
readfil, matefil]
)
df.call(tup, "when mapping")
def process(df, indexnam, oufilnam, readnamA, readnamB, option_label_pairs, mateno_check=True):
for (option, label_pairs) in option_label_pairs:
smalt_map(df,oufilnam, indexnam, readnamA, readnamB, option)
checkLabels(oufilnam, label_pairs, mateno_check)
if __name__ == '__main__':
from testdata import DataFiles
df = DataFiles()
refnam = df.joinData(REF_FASTA_NAME)
readnamA = df.joinData(READ_PREFIX + "_1.fq")
readnamB = df.joinData(READ_PREFIX + "_2.fq")
indexnam = df.addIndex(TMPFIL_PREFIX)
oufilnam = df.addTMP(TMPFIL_PREFIX + ".cig")
readnamRCA = df.addTMP(TMPFIL_PREFIX + "rc_1.fq")
readnamRCB = df.addTMP(TMPFIL_PREFIX + "rc_2.fq")
smalt_index(df,indexnam, refnam, KMER, NSKIP)
process(df,indexnam, oufilnam, readnamA, readnamB, OPTION_LABEL_PAIRS_ORIG)
#print "reverse complement 2nd read ..."
reverseComplement(df,readnamB, readnamRCB)
process(df,indexnam, oufilnam, readnamA, readnamRCB, OPTION_LABEL_PAIRS_RC2ND)
df.addLog(["reverse complement 1st read and swap with 2nd"])
# this equates to changing from -l pe to -l mp
reverseComplement(df,readnamA, readnamRCA)
process(df,indexnam, oufilnam, readnamRCB, readnamRCA,
OPTION_LABEL_PAIRS_RC2ND_RC1ST, False)
df.cleanup()
exit(0)
|