1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185
|
#!/usr/bin/python3
import functools
import os
import shutil
import sys
from io import StringIO
# FASTA generation
# Sequence bodies of the three records written to 'normal.fa' (see
# makeNormalFasta, which prefixes them with the headers '>1', '>2' and '>3').
# Each literal contains one embedded newline, so every record body spans two
# lines in the generated file.
fastaSeq_1 = """TAACCCTAACCCTAACCCTAACCCTAACCCTAACCCTAACCCTAACCCTAACCCTAACCCTAACCCTAAC
AACGCAGCTCCGCCCTCGCGGTGCTCTCCGGGTCTGTGCTGAGGAGAACGCAACTCCGCCGGCGCAGGCG"""
fastaSeq_2 = """TAACCCTAACCCTAACCCTAACCCTAACCCTAACCCTAACCCTAACCCTAACCCTAACCCTAACCCTAAC
AACGCAGCTCCGCCCTCGCGGTGCTCTCCGGGTCTGTGCTGAGGAGAACGCAAC"""
fastaSeq_3 = """TAACCCTAACCCTAACCCTAACCCTAACCCTAACCCTAACCCTAACCCTAACCCTAACCCTAACCCTAAC
ACCCTAACCCCAACCCCAACCCCAACCCCAACCCCAACCCCAACCCTAACCCCTAACCCTAACCCT"""
# FASTQ generation
# Sequence and quality strings of the three records written to 'normal.fq'
# (see makeNormalFastq, which adds the '@1'/'@2'/'@3' headers and '+' lines).
# Unlike the FASTA literals, each sequence is a single line.
#
# FASTQ needs exactly one quality character per base, so every quality string
# is derived from the length of its sequence instead of being typed out by
# hand: records 1 and 2 use '[' throughout, record 3 uses ']'.
fastqSeq_1 = """TAACCCTAACCCTAACCCTAACCCTAACCCTAACCCTAACCCTAACCCTAACCCTAACCCTAACCCTAACAACGCAGCTCCGCCCTCGCGGTGCTCTCCGGGTCTGTGCTGAGGAGAACGCAACTCCGCCGGCGCAGGCG"""
fastqQuals_1 = "[" * len(fastqSeq_1)
fastqSeq_2 = """TAACCCTAACCCTAACCCTAACCCTAACCCTAACCCTAACCCTAACCCTAACCCTAACCCTAACCCTAACAACGCAGCTCCGCCCTCGCGGTGCTCTCCGGGTCTGTGCTGAGGAGAACGCAAC"""
fastqQuals_2 = "[" * len(fastqSeq_2)
fastqSeq_3 = """TAACCCTAACCCTAACCCTAACCCTAACCCTAACCCTAACCCTAACCCTAACCCTAACCCTAACCCTAACACCCTAACCCCAACCCCAACCCCAACCCCAACCCCAACCCCAACCCTAACCCCTAACCCTAACCCT"""
fastqQuals_3 = "]" * len(fastqSeq_3)
# file creation decorator
def fileMaker(func):
    """Decorate a file-maker method so it reports progress on stdout.

    The wrapper prints " - Creating file: <name>..." before calling ``func``
    and "done." afterwards, flushing each time so the partial line is visible
    while the maker is still running.

    The output name is taken from the second positional argument (the first
    one being ``self``), so the decorated method must be called with the file
    name passed positionally.

    Returns the wrapper, which forwards all positional and keyword arguments
    to ``func`` and returns whatever ``func`` returns.
    """
    @functools.wraps(func)  # keep func's __name__/__doc__ on the wrapper
    def inner(*args, **kwargs):
        print(" - Creating file: %s..." % args[1], end='', flush=True)
        # Forward keyword arguments as well; they used to be silently dropped.
        retval = func(*args, **kwargs)
        print("done.", flush=True)
        return retval
    return inner
# symlink creation decorator
def fileLinker(func):
    """Decorate a symlink-maker method so it reports progress on stdout.

    The wrapper prints " - Creating symlink: <name>..." before calling
    ``func`` and "done." afterwards, flushing each time so the partial line is
    visible while the maker is still running.

    The link name is taken from the second positional argument (the first one
    being ``self``), so the decorated method must be called with the link name
    passed positionally.

    Returns the wrapper, which forwards all positional and keyword arguments
    to ``func`` and returns whatever ``func`` returns.
    """
    @functools.wraps(func)  # keep func's __name__/__doc__ on the wrapper
    def inner(*args, **kwargs):
        print(" - Creating symlink: %s..." % args[1], end='', flush=True)
        # Forward keyword arguments as well; they used to be silently dropped.
        retval = func(*args, **kwargs)
        print("done.", flush=True)
        return retval
    return inner
def trimXmlElements(original, blacklist):
    """Return a copy of original minus every line that contains a blacklist entry.

    original  -- text to filter; it is split with str.splitlines()
    blacklist -- substrings; a line is dropped as soon as it contains any of
                 them (plain substring match, no XML parsing involved)

    Every kept line is written back followed by a single '\\n', so a non-empty
    result always ends with a newline.
    """
    keptLines = [
        line + '\n'
        for line in original.splitlines()
        if not any(entry in line for entry in blacklist)
    ]
    return ''.join(keptLines)
class TestDataGenerator:
    """Create generated test-data files and symlinks from a source data tree.

    ``outputFiles`` and ``outputSymlinks`` map each output name (relative to
    the destination directory) to the bound method that creates it.
    ``generate()`` runs the maker of every output that does not exist yet.
    """

    def __init__(self, source, dest):
        """Store the directories and register every output with its maker.

        source -- directory holding the input test data
        dest   -- directory that receives the generated files and symlinks
        """
        # source/destination directories
        self.testDataDir = source
        self.generatedDataDir = dest
        # generated output files/symlinks & 'maker' functions
        self.outputFiles = {
            'truncated.bam' : self.makeTruncatedBam,
            'chunking_emptyfilters.subreadset.xml' : self.makeChunkingXml,
            'chunking_missingfilters.subreadset.xml' : self.makeChunkingXml,
            'normal.fa' : self.makeNormalFasta,
            'normal.fq' : self.makeNormalFastq
        }
        self.outputSymlinks = {
            'aligned.bam' : self.makeAlignedBamCopy,
            'aligned.bam.bai' : self.makeAlignedBamCopy,
            'aligned.bam.pbi' : self.makeAlignedBamCopy,
            'aligned2.bam' : self.makeAlignedBamCopy,
            'aligned2.bam.bai' : self.makeAlignedBamCopy,
            'aligned2.bam.pbi' : self.makeAlignedBamCopy,
            'm150404_101626_42267_c100807920800000001823174110291514_s1_p0.1.subreads.bam' : self.makeChunkingSymlink,
            'm150404_101626_42267_c100807920800000001823174110291514_s1_p0.1.subreads.bam.pbi' : self.makeChunkingSymlink,
            'm150404_101626_42267_c100807920800000001823174110291514_s1_p0.2.subreads.bam' : self.makeChunkingSymlink,
            'm150404_101626_42267_c100807920800000001823174110291514_s1_p0.2.subreads.bam.pbi' : self.makeChunkingSymlink,
            'm150404_101626_42267_c100807920800000001823174110291514_s1_p0.3.subreads.bam' : self.makeChunkingSymlink,
            'm150404_101626_42267_c100807920800000001823174110291514_s1_p0.3.subreads.bam.pbi' : self.makeChunkingSymlink,
            'missing_pbi.bam' : self.makeMissingPbiBam,
        }

    def _destPath(self, outputFn):
        """Return the path of outputFn inside the generated-data directory."""
        return os.path.join(self.generatedDataDir, outputFn)

    def _symlink(self, source, outputFn):
        """Create symlink outputFn in the generated-data directory -> source."""
        # A relative link target is resolved against the directory containing
        # the link, not the current directory, so store an absolute target.
        os.symlink(os.path.abspath(source), self._destPath(outputFn))

    def editChunkingXml(self, outputFn, removeFiltersNode):
        """Write a filtered copy of chunking/chunking.subreadset.xml to outputFn.

        Lines mentioning a single filter, its properties container or a
        property are always dropped.  When removeFiltersNode is true, the
        lines of the enclosing 'pbds:Filters' element are dropped as well.
        Filtering is line based (see trimXmlElements), not a real XML edit.
        """
        inputXmlFn = os.path.join(self.testDataDir, 'chunking', 'chunking.subreadset.xml')
        blacklist = ['pbds:Filter>', 'pbbase:Properties>', '<pbbase:Property']
        if removeFiltersNode:
            blacklist.append('pbds:Filters>')
        with open(inputXmlFn, 'r') as xml_infile:
            inputXml = xml_infile.read()
        outputXml = trimXmlElements(inputXml, blacklist)
        with open(self._destPath(outputFn), 'w') as xml_outfile:
            xml_outfile.write(outputXml)

    @fileLinker
    def makeAlignedBamCopy(self, outputFn):
        """Symlink outputFn to the same-named file in the test-data directory.

        Despite the name, no data is copied; only a symlink is created.
        """
        self._symlink(os.path.join(self.testDataDir, outputFn), outputFn)

    @fileLinker
    def makeChunkingSymlink(self, outputFn):
        """Symlink outputFn to the same-named file in <testDataDir>/chunking."""
        self._symlink(os.path.join(self.testDataDir, 'chunking', outputFn), outputFn)

    @fileLinker
    def makeMissingPbiBam(self, outputFn):
        """Symlink outputFn to phi29.bam; only the BAM itself is linked."""
        self._symlink(os.path.join(self.testDataDir, 'phi29.bam'), outputFn)

    @fileMaker
    def makeChunkingXml(self, outputFn):
        """Create one of the chunking XML variants, selected by its file name.

        'chunking_emptyfilters.subreadset.xml' keeps the 'pbds:Filters' lines;
        any other name gets them removed too.
        """
        removeFiltersNode = outputFn != 'chunking_emptyfilters.subreadset.xml'
        self.editChunkingXml(outputFn, removeFiltersNode)

    @fileMaker
    def makeNormalFasta(self, outputFn):
        """Write the three-record FASTA file (no trailing newline) to outputFn."""
        content = ">1\n" + fastaSeq_1 + "\n>2\n" + fastaSeq_2 + "\n>3\n" + fastaSeq_3
        # Write to the destination path explicitly rather than relying on the
        # current working directory being the generated-data directory.
        with open(self._destPath(outputFn), 'w') as fasta_out:
            fasta_out.write(content)

    @fileMaker
    def makeNormalFastq(self, outputFn):
        """Write the three-record FASTQ file to outputFn."""
        content = ("@1\n" + fastqSeq_1 + "\n+\n" + fastqQuals_1 + "\n" +
                   "@2\n" + fastqSeq_2 + "\n+\n" + fastqQuals_2 + "\n" +
                   "@3\n" + fastqSeq_3 + "\n+\n" + fastqQuals_3 + "\n")
        # Same as makeNormalFasta: do not depend on the working directory.
        with open(self._destPath(outputFn), 'w') as fastq_out:
            fastq_out.write(content)

    @fileMaker
    def makeTruncatedBam(self, outputFn):
        """Copy phi29.bam to outputFn and cut the copy down to 200 bytes."""
        source = os.path.join(self.testDataDir, 'phi29.bam')
        dest = self._destPath(outputFn)
        shutil.copyfile(source, dest)
        with open(dest, 'r+b') as in_file:
            in_file.truncate(200)

    # main entry point
    def generate(self):
        """Create every registered output that is not already present.

        Regular files are created first, then symlinks.  Nothing is printed
        when there is nothing to do.  The working directory of the process and
        the two maker registries are left untouched, so calling this again
        re-creates any output that has been removed in the meantime.
        """
        # skip file if it exists
        pendingFiles = [
            (name, maker) for name, maker in self.outputFiles.items()
            if not os.path.exists(self._destPath(name))
        ]
        # skip symlink if it exists; lexists also counts a dangling link
        pendingLinks = [
            (name, maker) for name, maker in self.outputSymlinks.items()
            if not os.path.lexists(self._destPath(name))
        ]
        # only print message & run makers, if any files/symlinks to be created
        # else silent success
        if pendingFiles or pendingLinks:
            print('Generating test data in %s ' % self.generatedDataDir)
            for name, maker in pendingFiles:
                maker(name)
            for name, maker in pendingLinks:
                maker(name)
# script entry point
if __name__ == '__main__':
    # Both directories are required; fail with a usage message instead of a
    # bare IndexError.  Extra arguments are ignored, as before.
    if len(sys.argv) < 3:
        sys.exit('usage: %s <test-data-dir> <generated-data-dir>' % sys.argv[0])
    g = TestDataGenerator(sys.argv[1], sys.argv[2])
    g.generate()
|