File: summarizeModifications.py

package info (click to toggle)
kineticstools 0.6.1%2Bgit20220223.1326a4d%2Bdfsg-6
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 22,188 kB
  • sloc: python: 3,508; makefile: 200; ansic: 104; sh: 55; xml: 19
file content (162 lines) | stat: -rw-r--r-- 5,951 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
#!/usr/bin/env python3

"""
Summarizes kinetic modifications in the alignment_summary.gff file.
"""

import cProfile
from itertools import groupby
import functools
import os
import logging
import sys

from pbcommand.cli import get_default_argparser_with_base_opts, pacbio_args_runner
from pbcommand.utils import setup_log
from pbcore.io import GffReader, Gff3Record

# Version info...
__version__ = "1.0"


class Constants(object):
    TOOL_ID = "kinetics_tools.tasks.summarize_modifications"
    DRIVER_EXE = "python -m kineticsTools.summarizeModifications --resolved-tool-contract"


class ModificationSummary(object):
    def __init__(self, modifications, alignmentSummary, outfile):
        self.modifications = modifications
        self.alignmentSummary = alignmentSummary
        self.outfile = outfile

    def run(self, profile=False):
        self.knownModificationEvents = ["modified_base", "m6A", "m4C", "m5C"]
        if profile:
            cProfile.runctx("self._mainLoop()",
                            globals=globals(),
                            locals=locals(),
                            filename="profile.out")
            return 0
        else:
            return self._mainLoop()

    def countModificationTypes(self, mods):
        mods = sorted(mods, key=lambda x: x["type"])

        counts = dict([(x, 0) for x in self.knownModificationEvents])
        for k, g in groupby(mods, lambda x: x["type"]):
            counts[k] = len(list(g))

        return counts

    def _mainLoop(self):

        # Read in the existing modifications.gff
        modReader = GffReader(self.modifications)

        headerString = ",".join(
            ['"' + x + '"' for x in self.knownModificationEvents])

        # Set up some additional headers to be injected
        headers = [
            ('source', 'kineticModificationCaller 1.3.3'),
            ('source-commandline', " ".join(sys.argv)),
            ('attribute-description',
             'modsfwd - count of detected DNA modifications on forward strand by modification event type'),
            ('attribute-description',
             'modsrev - count of detected DNA modifications on reverse strand by modification event type'),
            ('region-modsfwd', headerString),
            ('region-modsfwd', headerString)
        ]

        hitsByEvent = dict([(x, []) for x in self.knownModificationEvents])

        # Get modification calls
        hits = [{"pos": x.start, "strand": x.strand, "seqid": x.seqid, "type": x.type}
                for x in modReader if x.type in self.knownModificationEvents]

        self.seqMap = {}
        inHeader = True

        # Loop through
        with open(self.alignmentSummary) as summaryFile:
            with open(self.outfile, "w") as summaryWriter:
                for line in summaryFile:
                    # Pass any metadata line straight through
                    if line[0] == "#":

                        # Parse headers
                        splitFields = line.replace('#', '').split(' ')
                        field = splitFields[0]
                        value = " ".join(splitFields[1:])
                        if field == 'sequence-header':
                            [internalTag, delim,
                                externalTag] = value.strip().partition(' ')
                            self.seqMap[internalTag] = externalTag
                        print(line.strip(), file=summaryWriter)
                        continue

                    if inHeader:
                        # We are at the end of the header -- write the
                        # tool-specific headers
                        for field in headers:
                            print(("##%s %s" % field), file=summaryWriter)
                        inHeader = False

                    # Parse the line
                    rec = Gff3Record.fromString(line)

                    if rec.type == 'region':
                        # Get the hits in this interval, add them to the gff
                        # record
                        intervalHits = [h for h in hits if rec.start <=
                                        h['pos'] <= rec.end and rec.seqid == h['seqid']]

                        cFwd = self.countModificationTypes(
                            [h for h in intervalHits if h['strand'] == '+'])
                        cRev = self.countModificationTypes(
                            [h for h in intervalHits if h['strand'] == '-'])

                        rec.modsfwd = ",".join(  # pylint: disable=assigning-non-slot
                            [str(cFwd[x]) for x in self.knownModificationEvents])
                        rec.modsrev = ",".join(  # pylint: disable=assigning-non-slot
                            [str(cRev[x]) for x in self.knownModificationEvents])

                        print(str(rec), file=summaryWriter)
        return 0


def args_runner(args):
    return ModificationSummary(
        modifications=args.modifications,
        alignmentSummary=args.alignmentSummary,
        outfile=args.gff_out).run()


def get_parser():
    p = get_default_argparser_with_base_opts(
        version=__version__,
        description=__doc__,
        default_level="INFO")
    p.add_argument("modifications",
                   help="Base modification GFF file")
    p.add_argument("alignmentSummary", help="Alignment summary GFF")
    p.add_argument("gff_out",
                   help="Coverage summary for regions (bins) spanning the reference with basemod results for each region")
    return p


def main(argv=sys.argv):
    setup_log_ = functools.partial(setup_log,
                                   str_formatter='%(asctime)s [%(levelname)s] %(message)s')
    return pacbio_args_runner(
        argv=argv[1:],
        parser=get_parser(),
        args_runner_func=args_runner,
        alog=logging.getLogger(__name__),
        setup_log_func=setup_log_)


if __name__ == "__main__":
    main()