File: generate_data.py

package info (click to toggle)
pbbam 2.4.0%2Bdfsg-1
  • links: PTS, VCS
  • area: main
  • in suites: trixie
  • size: 14,144 kB
  • sloc: cpp: 60,214; xml: 2,908; ansic: 660; sh: 275; python: 203; makefile: 187
file content (185 lines) | stat: -rwxr-xr-x 7,776 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
#!/usr/bin/python3

import functools
import os
import shutil
import sys
from io import StringIO

# FASTA generation
# Each constant is the body of one FASTA record exactly as it is written to
# the output file: two sequence lines separated by a newline.
fastaSeq_1 = """TAACCCTAACCCTAACCCTAACCCTAACCCTAACCCTAACCCTAACCCTAACCCTAACCCTAACCCTAAC
AACGCAGCTCCGCCCTCGCGGTGCTCTCCGGGTCTGTGCTGAGGAGAACGCAACTCCGCCGGCGCAGGCG"""

fastaSeq_2 = """TAACCCTAACCCTAACCCTAACCCTAACCCTAACCCTAACCCTAACCCTAACCCTAACCCTAACCCTAAC
AACGCAGCTCCGCCCTCGCGGTGCTCTCCGGGTCTGTGCTGAGGAGAACGCAAC"""

fastaSeq_3 = """TAACCCTAACCCTAACCCTAACCCTAACCCTAACCCTAACCCTAACCCTAACCCTAACCCTAACCCTAAC
ACCCTAACCCCAACCCCAACCCCAACCCCAACCCCAACCCCAACCCTAACCCCTAACCCTAACCCT"""

# FASTQ generation
# The FASTQ records hold the same bases as the FASTA records, joined onto a
# single line. Deriving them (instead of repeating the literals) keeps the two
# files in sync, and building each quality string from the sequence length
# guarantees the one-quality-character-per-base rule FASTQ requires.

fastqSeq_1   = fastaSeq_1.replace("\n", "")
fastqQuals_1 = "[" * len(fastqSeq_1)

fastqSeq_2   = fastaSeq_2.replace("\n", "")
fastqQuals_2 = "[" * len(fastqSeq_2)

fastqSeq_3   = fastaSeq_3.replace("\n", "")
fastqQuals_3 = "]" * len(fastqSeq_3)


# file creation decorator
def fileMaker(func):
    """Wrap a file-creating method so its progress is reported on stdout.

    The wrapped callable is expected to be a method taking the output
    filename as its first argument after ``self``. The wrapper prints
    " - Creating file: <name>..." before the call and "done." after it,
    flushing each time so the message is visible while the call runs.

    Returns the wrapper, which forwards all positional and keyword
    arguments to ``func`` and returns whatever ``func`` returns.
    """
    @functools.wraps(func)
    def inner(*args, **kwargs):
        # args[0] is self; the filename normally follows positionally, but
        # fall back to the keyword form so the lookup cannot raise IndexError
        target = args[1] if len(args) > 1 else kwargs.get('outputFn')
        print(" - Creating file: %s..." % target, end='', flush=True)
        retval = func(*args, **kwargs)
        print("done.", flush=True)
        return retval
    return inner

# symlink creation decorator
def fileLinker(func):
    """Wrap a symlink-creating method so its progress is reported on stdout.

    The wrapped callable is expected to be a method taking the link name as
    its first argument after ``self``. The wrapper prints
    " - Creating symlink: <name>..." before the call and "done." after it,
    flushing each time so the message is visible while the call runs.

    Returns the wrapper, which forwards all positional and keyword
    arguments to ``func`` and returns whatever ``func`` returns.
    """
    @functools.wraps(func)
    def inner(*args, **kwargs):
        # args[0] is self; the link name normally follows positionally, but
        # fall back to the keyword form so the lookup cannot raise IndexError
        target = args[1] if len(args) > 1 else kwargs.get('outputFn')
        print(" - Creating symlink: %s..." % target, end='', flush=True)
        retval = func(*args, **kwargs)
        print("done.", flush=True)
        return retval
    return inner

# return a copy of original, minues any lines that contain an entry in blacklist
def trimXmlElements(original, blacklist):
    out = StringIO()
    for line in original.splitlines():
        if all(x not in line for x in blacklist):
            out.write(line + '\n')
    result = out.getvalue()
    out.close()
    return result

class TestDataGenerator:
    """Creates generated test files and symlinks in a destination directory.

    Each output name is mapped to the method that produces it. generate()
    skips every output that already exists and runs the makers for the rest.
    """

    def __init__(self, source, dest):
        """Record the directories and the tables of outputs to produce.

        source -- directory the input test data is read from
        dest   -- directory the generated files and symlinks are written to
        """

        # source/destination directories
        # Stored as absolute paths: generate() changes the working directory,
        # which would otherwise re-anchor relative paths and leave symlinks
        # with relative targets dangling.
        self.testDataDir      = os.path.abspath(source)
        self.generatedDataDir = os.path.abspath(dest)

        # generated output files/symlinks & 'maker' functions
        self.outputFiles = {
            'truncated.bam' : self.makeTruncatedBam,
            'chunking_emptyfilters.subreadset.xml'   : self.makeChunkingXml,
            'chunking_missingfilters.subreadset.xml' : self.makeChunkingXml,
            'normal.fa' : self.makeNormalFasta,
            'normal.fq' : self.makeNormalFastq
        }
        self.outputSymlinks = {
            'aligned.bam'      : self.makeAlignedBamCopy,
            'aligned.bam.bai'  : self.makeAlignedBamCopy,
            'aligned.bam.pbi'  : self.makeAlignedBamCopy,
            'aligned2.bam'     : self.makeAlignedBamCopy,
            'aligned2.bam.bai' : self.makeAlignedBamCopy,
            'aligned2.bam.pbi' : self.makeAlignedBamCopy,
            'm150404_101626_42267_c100807920800000001823174110291514_s1_p0.1.subreads.bam'     : self.makeChunkingSymlink,
            'm150404_101626_42267_c100807920800000001823174110291514_s1_p0.1.subreads.bam.pbi' : self.makeChunkingSymlink,
            'm150404_101626_42267_c100807920800000001823174110291514_s1_p0.2.subreads.bam'     : self.makeChunkingSymlink,
            'm150404_101626_42267_c100807920800000001823174110291514_s1_p0.2.subreads.bam.pbi' : self.makeChunkingSymlink,
            'm150404_101626_42267_c100807920800000001823174110291514_s1_p0.3.subreads.bam'     : self.makeChunkingSymlink,
            'm150404_101626_42267_c100807920800000001823174110291514_s1_p0.3.subreads.bam.pbi' : self.makeChunkingSymlink,
            'missing_pbi.bam' : self.makeMissingPbiBam,
        }

    def _destPath(self, outputFn):
        """Return the path of outputFn inside the destination directory."""
        return os.path.join(self.generatedDataDir, outputFn)

    def _makeSymlink(self, source, outputFn):
        """Create the symlink outputFn in the destination dir, pointing at source."""
        os.symlink(source, self._destPath(outputFn))

    def editChunkingXml(self, outputFn, removeFiltersNode):
        """Write a filtered copy of chunking/chunking.subreadset.xml to outputFn.

        Lines mentioning a filter, a properties block or a property are always
        removed; when removeFiltersNode is true, lines mentioning the
        enclosing pbds:Filters element are removed as well.
        """
        inputXmlFn  = os.path.join(self.testDataDir,'chunking','chunking.subreadset.xml')
        outputXmlFn = self._destPath(outputFn)

        blacklist = ['pbds:Filter>', 'pbbase:Properties>', '<pbbase:Property']
        if removeFiltersNode:
            blacklist.append('pbds:Filters>')

        with open(inputXmlFn, 'r') as xml_infile:
            inputXml = xml_infile.read()
        outputXml = trimXmlElements(inputXml, blacklist)
        with open(outputXmlFn, 'w') as xml_outfile:
            xml_outfile.write(outputXml)

    @fileLinker
    def makeAlignedBamCopy(self, outputFn):
        """Symlink outputFn to the file of the same name in the source dir."""
        self._makeSymlink(os.path.join(self.testDataDir, outputFn), outputFn)

    @fileLinker
    def makeChunkingSymlink(self, outputFn):
        """Symlink outputFn to the same-named file in the source 'chunking' subdir."""
        self._makeSymlink(os.path.join(self.testDataDir, 'chunking', outputFn), outputFn)

    @fileLinker
    def makeMissingPbiBam(self, outputFn):
        """Symlink outputFn to phi29.bam in the source dir."""
        self._makeSymlink(os.path.join(self.testDataDir, 'phi29.bam'), outputFn)

    @fileMaker
    def makeChunkingXml(self, outputFn):
        """Create one of the chunking XML variants, selected by its filename."""
        # the 'emptyfilters' variant keeps the pbds:Filters lines;
        # every other name drops them too
        removeFiltersNode = (outputFn != 'chunking_emptyfilters.subreadset.xml')
        self.editChunkingXml(outputFn, removeFiltersNode)

    @fileMaker
    def makeNormalFasta(self, outputFn):
        """Write the three FASTA records (named 1, 2, 3) to outputFn."""
        content = ">1\n" + fastaSeq_1 + "\n>2\n" + fastaSeq_2 + "\n>3\n" + fastaSeq_3
        # write to the destination directory explicitly rather than relying
        # on the current working directory
        with open(self._destPath(outputFn), 'w') as fasta_out:
            fasta_out.write(content)

    @fileMaker
    def makeNormalFastq(self, outputFn):
        """Write the three FASTQ records (named 1, 2, 3) to outputFn."""
        content = ("@1\n" + fastqSeq_1 + "\n+\n" + fastqQuals_1 + "\n" +
                   "@2\n" + fastqSeq_2 + "\n+\n" + fastqQuals_2 + "\n" +
                   "@3\n" + fastqSeq_3 + "\n+\n" + fastqQuals_3 + "\n")
        # write to the destination directory explicitly rather than relying
        # on the current working directory
        with open(self._destPath(outputFn), 'w') as fastq_out:
            fastq_out.write(content)

    @fileMaker
    def makeTruncatedBam(self, outputFn):
        """Copy phi29.bam from the source dir to outputFn and cut it to 200 bytes."""
        source = os.path.join(self.testDataDir, 'phi29.bam')
        dest   = self._destPath(outputFn)
        shutil.copyfile(source, dest)
        with open(dest, 'r+b') as in_file:
            in_file.truncate(200)

    # main entry point
    def generate(self):
        """Create every output that does not exist yet.

        Changes the process working directory to the destination directory.
        Entries for outputs that already exist are removed from
        outputFiles/outputSymlinks. Nothing is printed when there is nothing
        left to create.
        """

        # skip file if it exists
        os.chdir(self.generatedDataDir)
        for name in [n for n in self.outputFiles if os.path.exists(n)]:
            del self.outputFiles[name]

        # skip symlink if it exists
        # (lexists also counts a dangling symlink as existing)
        for name in [n for n in self.outputSymlinks if os.path.lexists(n)]:
            del self.outputSymlinks[name]

        # only print message & run makers, if any files/symlinks to be created
        # else silent success
        if self.outputFiles or self.outputSymlinks:
            print('Generating test data in %s ' % self.generatedDataDir)
            for name, maker in list(self.outputFiles.items()):
                maker(name)
            for name, maker in list(self.outputSymlinks.items()):
                maker(name)

# script entry point
if __name__ == '__main__':
    # Two positional arguments are required: the directory to read test data
    # from and the directory to generate into. Any further arguments are
    # ignored. Exit with a usage message instead of an IndexError traceback
    # when they are missing.
    if len(sys.argv) < 3:
        sys.exit('usage: %s <testDataDir> <generatedDataDir>' % sys.argv[0])
    g = TestDataGenerator(sys.argv[1], sys.argv[2])
    g.generate()