File: PileupTest.py

package info (click to toggle)
ngs-sdk 2.9.3-1
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 8,216 kB
  • sloc: cpp: 20,310; java: 5,060; perl: 3,895; python: 1,452; makefile: 691; xml: 233; sh: 70
file content (112 lines) | stat: -rw-r--r-- 4,673 bytes parent folder | download | duplicates (7)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
#===========================================================================
#
#                           PUBLIC DOMAIN NOTICE
#              National Center for Biotechnology Information
#
# This software/database is a "United States Government Work" under the
# terms of the United States Copyright Act.  It was written as part of
# the author's official duties as a United States Government employee and
# thus cannot be copyrighted.  This software/database is freely available
# to the public for use. The National Library of Medicine and the U.S.
# Government have not placed any restriction on its use or reproduction.
#
# Although all reasonable efforts have been taken to ensure the accuracy
# and reliability of the software and data, the NLM and the U.S.
# Government do not and cannot warrant the performance or results that
# may be obtained by using this software or data. The NLM and the U.S.
# Government disclaim all warranties, express or implied, including
# warranties of performance, merchantability or fitness for any particular
# purpose.
#
# Please cite the author in any work or product based on this material.
#
#===========================================================================
#

import sys
import traceback

from ngs import NGS
from ngs.ErrorMsg import ErrorMsg
from ngs.ReadCollection import ReadCollection
from ngs.Reference import Reference
from ngs.PileupEvent import PileupEvent


def _describe_event(it):
    """Build the base-column text for the event ``it`` is positioned on.

    The event type returned by ``it.getEventType()`` is treated as a bit
    field; the fragments are emitted in this order:

    * ``^`` plus ``chr(mapping quality + 33)`` when the
      ``alignment_start`` bit is set;
    * ``+``, the insertion length and the inserted bases when the
      ``insertion`` bit is set (bases lower-cased on the minus strand);
    * one symbol selected by the low three bits of the event type:
      ``<`` / ``>`` for a deletion, ``,`` / ``.`` for a match, or the
      alignment base in lower / upper case for a mismatch (first form
      when the ``alignment_minus_strand`` bit is set, second otherwise);
    * ``$`` when the ``alignment_stop`` bit is set.
    """
    flags = it.getEventType()
    on_minus = (flags & PileupEvent.alignment_minus_strand) != 0
    parts = []

    if (flags & PileupEvent.alignment_start) != 0:
        parts.append('^')
        parts.append(chr(it.getMappingQuality() + 33))

    if (flags & PileupEvent.insertion) != 0:
        parts.append('+')
        inserted = it.getInsertionBases()
        parts.append(str(len(inserted)))
        parts.append(inserted.lower() if on_minus else inserted)

    # The low three bits carry the event kind; any value other than
    # deletion / match / mismatch contributes nothing.
    kind = flags & 7
    if kind == PileupEvent.deletion:
        parts.append('<' if on_minus else '>')
    elif kind == PileupEvent.match:
        parts.append(',' if on_minus else '.')
    elif kind == PileupEvent.mismatch:
        letter = str(it.getAlignmentBase())
        parts.append(letter.lower() if on_minus else letter.upper())

    if (flags & PileupEvent.alignment_stop) != 0:
        parts.append('$')

    return "".join(parts)


def run(acc, refName, start, stop):
    """Print one tab-separated line per pileup in a reference range.

    Opens ``acc`` with ``NGS.openReadCollection``, looks up ``refName``
    in it and requests a pileup slice at offset ``start - 1`` with length
    ``stop - start + 1`` (so ``start`` / ``stop`` appear to be 1-based
    and inclusive; the reported position likewise has 1 added).

    Each printed line holds the reference spec, position + 1, reference
    base, pileup depth, the concatenated event text and the concatenated
    alignment qualities of all events at that position.  A closing line
    reports the number of pileups read and the collection name.

    Exceptions raised by the NGS calls are not handled here.
    """
    # open requested accession using SRA implementation of the API
    with NGS.openReadCollection(acc) as collection:
        run_name = collection.getName()

        # get requested reference
        with collection.getReference(refName) as ref:
            offset = start - 1
            length = stop - start + 1

            # start iterator on requested range
            with ref.getPileupSlice(offset, length) as it:
                count = 0
                while it.nextPileup():
                    # Position-level columns are read before the events
                    # of this pileup are walked.
                    header = "{}\t{}\t{}\t{}".format(
                        it.getReferenceSpec(),
                        it.getReferencePosition() + 1,
                        it.getReferenceBase(),
                        it.getPileupDepth(),
                    )

                    bases = []
                    quals = []
                    while it.nextPileupEvent():
                        bases.append(_describe_event(it))
                        quals.append(it.getAlignmentQuality())

                    count += 1
                    print ("{}\t{}\t{}".format(
                        header, "".join(bases), "".join(quals)))
                print ("Read {} pileups for {}".format(count, run_name))

# Script entry point.  Exactly four command-line arguments are expected:
# accession, reference name, start and stop; start and stop are converted
# with int() before being passed to run().  Every failure path ends the
# process with exit status 1.
if len(sys.argv) != 5:
    print ("Usage: PileupTest accession reference start stop\n")
    # sys.exit() instead of the bare exit(): the latter is a helper that
    # the site module installs for interactive sessions and is not
    # available when the interpreter runs without site (e.g. "python -S").
    sys.exit(1)
else:
    try:
        run(sys.argv[1], sys.argv[2], int(sys.argv[3]), int(sys.argv[4]))
    except ErrorMsg as x:
        # Errors reported by the NGS API: show the message, then the
        # Python traceback.
        print (x)
        traceback.print_exc()
        # x.printStackTrace - not implemented
        sys.exit(1)
    except Exception:
        # Any other ordinary error (for instance a non-numeric start or
        # stop rejected by int()).  Exception rather than BaseException so
        # that KeyboardInterrupt / SystemExit keep their normal meaning.
        traceback.print_exc()
        sys.exit(1)