File: bamstats.py

package info (click to toggle)
python-sqt 0.8.0-9
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 824 kB
  • sloc: python: 5,964; sh: 38; makefile: 10
file content (202 lines) | stat: -rw-r--r-- 7,998 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
#!/usr/bin/env python3
"""
Print a report about a SAM/BAM file.
"""
__author__ = 'Marcel Martin'

import sys
from collections import Counter, defaultdict
from itertools import islice
from pysam import Samfile
from .. import HelpfulArgumentParser
from ..reads import AlignedRead


def header(s):
	print(s)
	print('-' * len(s))
	print()


def print_coverage_report(aligned_read, report=sys.stdout, minimum_cover_fraction=0.01):
	"""
	Print coverage report for a single read to the file-like object 'report'.
	"""
	read_length = len(aligned_read)
	alignments = aligned_read.alignments
	print('Read {} ({:.1f}kbp):'.format(aligned_read.query_name, read_length / 1000), file=report)
	for alignment in sorted(alignments, key=lambda a: (a.query_start, a.query_end)):
		alignment_length = alignment.cigar.query_length(count_clipped=None)
		if alignment_length / read_length < minimum_cover_fraction:
			continue
		print(
			'{:9} bp'.format(alignment_length),
			'{:6.1%}'.format(alignment_length / read_length),
			'{:6} ... {:6}  '.format(alignment.query_start+1, alignment.query_end),
			'{} {:>2}:{}-{}'.format(alignment.strand,
				alignment.reference_name, alignment.pos+1, alignment.reference_end),
			file=report)
	bases = aligned_read.aligned_bases()
	print('{:.1%} aligned ({}/{})\n'.format(bases / read_length,
		bases, read_length),
		file=report)


def print_basics(aligned_reads, aligned_bases):
	header('Basic statistics')
	bases = sum(len(aligned_read) for aligned_read in aligned_reads)
	total_reads = len(aligned_reads)
	total_alignments = sum(len(aligned_read.alignments) for aligned_read in aligned_reads)
	print('Number of reads:     {:10,d}'.format(total_reads))
	#print('unfiltered+CIGAR:    {:10,d} ({:.2%})'.format(mapped_reads, mapped_reads / total_reads))
	print('Alignments per read: {:.3f} (only mapped reads)'.format(total_alignments / total_reads))  # TODO is this correct?
	print('bases:         {:15,d} ({:.2f} Gbp)'.format(bases, bases / 1E9))
	print('aligned bases: {:15,d} ({:.2f} Gbp) ({:.2%})'.format(aligned_bases, aligned_bases/1E9, aligned_bases/bases))
	print()


def print_subalignment_histogram(number_alignments):
	print('Histogram of number of subalignments')
	rest = 0
	for number, count in number_alignments.items():
		if number <= 10:
			print(' {:2} {:9}'.format(number, count))
		else:
			rest += count
	if rest > 0:
		print('>10 {:9}'.format(rest))
	print()


def print_subalignment_stats(aligned_reads, total_reads):
	header('Subalignment statistics')
	fully_aligned_95 = 0  # reads whose bases are 95% aligned within one subalignment
	fully_aligned_99 = 0  # reads whose bases are 99% aligned within one subalignment
	number_alignments = Counter()
	interesting = 0
	for aligned_read in aligned_reads:
		alignments = aligned_read.alignments
		lengths = sorted(alignment.query_length for alignment in alignments)
		refnames = set(alignment.reference_name for alignment in alignments)

		if len(lengths) >= 1 and lengths[-1] >= 0.95 * len(aligned_read):
			fully_aligned_95 += 1
		if len(lengths) >= 1 and lengths[-1] >= 0.99 * len(aligned_read):
			fully_aligned_99 += 1
		number_alignments[len(lengths)] += 1

		# is this an 'interesting' read? (arbitrary thresholds)
		if 2 <= len(lengths) <= 4 and len(set(refnames)) > 1:
			interesting += 1

	print_subalignment_histogram(number_alignments)
	print('fully aligned (95%):{:10,d} ({:.2%})'.format(fully_aligned_95, fully_aligned_95/total_reads))
	print('fully aligned (99%):{:10,d} ({:.2%})'.format(fully_aligned_99, fully_aligned_99/total_reads))
	print('no of interesting reads:', interesting)
	print()


def print_cigar_usage(counter):
	header("CIGAR operator usage")
	total_ops = sum(counter.values())
	ops = 'MIDNSHPX='
	for op_i, op in enumerate(ops):
		print("{:2} {:14,d} ({:7.2%})".format(op, counter[op_i], counter[op_i]/total_ops))
	print()


def print_reference_usage(reflengths, reference_hits, minimum_reference_length=1000):
	header('Scaffold/chromosome/references usage')

	long_refs = sum(1 for length in reflengths.values() if length >= minimum_reference_length)
	ref_hits_length = sum(reflengths[refname] for refname in reference_hits)
	total_ref_length = sum(reflengths.values())

	print('total length of references: {:,d} ({:.2f} Gbp)'.format(total_ref_length, total_ref_length/1E9))
	print('references:', len(reflengths))
	print('references hit by at least one alignment:', len(reference_hits))
	print('length of those references: {:,d} ({:.2%})'.format(ref_hits_length, ref_hits_length/total_ref_length))
	print('length of references not hit: {:,d}'.format(total_ref_length - ref_hits_length))
	#_refs = infile.references
	#assert infile.nreferences == len(_refs)
	#refname_to_length = dict(zip(_refs, infile.lengths))
	#for i in range(infile.nreferences):
		#assert refname_to_length[infile.getrname(i)] == reflengths[i]

	long_ref_hits = sum(1 for tid in reference_hits if reflengths[tid] >= minimum_reference_length)
	ref_hits_length = sum(reflengths[refname] for refname in reference_hits)
	print('references >= {} bp:'.format(minimum_reference_length), long_refs)
	print('references >= {} bp hit by at least one alignment:'.format(minimum_reference_length), long_ref_hits)


def main():
	parser = HelpfulArgumentParser(description=__doc__)
	parser.add_argument('--quality', '-q', type=int, default=0,
		help='Minimum mapping quality (default: %(default)s')
	parser.add_argument('--minimum-reference-length', metavar='N', type=int, default=0,
		help='For reference usage statistics, ignore references shorter than N.')
	parser.add_argument('--limit', metavar='N', type=int, default=None,
		help='Process only the first N entries in the input file.')
	parser.add_argument('--cover', metavar='FILE', default=None,
		help='Print report about "read coverage" (which sections are aligned) to FILE')
	parser.add_argument('--minimum-cover-fraction', metavar='FRACTION', type=float, default=0.01,
		help='Alignment must cover at least FRACTION of the read to appear in the cover report. (%(default)s)')
	parser.add_argument("bam", metavar="SAM/BAM", help="Name of a SAM or BAM file")
	args = parser.parse_args()

	# Count how often each CIGAR operator occurs
	cigar_counter = Counter()

	n_records = 0
	unmapped = 0
	unmapped_bases = 0
	aligned_reads = []
	with Samfile(args.bam) as sf:
		for record in islice(sf, 0, args.limit):
			n_records += 1
			if record.is_unmapped:
				unmapped += 1
				unmapped_bases += len(record.seq)
				continue
			if record.mapq < args.quality:
				continue
			assert record.cigar is not None
			for op_i, l in record.cigar:
				cigar_counter[op_i] += l

			if not record.is_secondary and not record.is_supplementary:
				aligned_read = AlignedRead(record, sf.getrname(record.tid))
				aligned_reads.append(aligned_read)

		reflengths = sf.lengths
		nreferences = sf.nreferences
		assert nreferences == len(reflengths)
		refnames_map = { tid: sf.getrname(tid) for tid in range(nreferences) }
		reference_lengths = dict(zip(sf.references, sf.lengths))

	total_aligned_bases = 0
	reference_hits = defaultdict(int)
	for aligned_read in aligned_reads:
		total_aligned_bases += aligned_read.aligned_bases()
		for alignment in aligned_read.alignments:
			reference_hits[alignment.reference_name] += 1

	if args.cover is not None:
		with open(args.cover, 'wt') as cover:
			for aligned_read in aligned_reads:
				print_coverage_report(aligned_read, cover, args.minimum_cover_fraction)

	header('All entries in input file')
	print('Total entries:   {:10,d}'.format(n_records))
	print('Unmapped:        {:10,d}'.format(unmapped))
	print('Unmapped bases:  {:10,d}'.format(unmapped_bases))
	print()

	print_basics(aligned_reads, total_aligned_bases)
	print_subalignment_stats(aligned_reads, len(aligned_reads))  # TODO is len(aligned_reads) actually the number of unique reads?
	print_cigar_usage(cigar_counter)
	print_reference_usage(reference_lengths, reference_hits, minimum_reference_length=args.minimum_reference_length)


if __name__ == '__main__':
	main()