File: neighbors.py

package info (click to toggle)
python-sqt 0.8.0-9
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 824 kB
  • sloc: python: 5,964; sh: 38; makefile: 10
file content (81 lines) | stat: -rw-r--r-- 2,380 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
#!/usr/bin/env python3
"""
Enumerate neighboring sequences

Read in a FASTA file and systematically mutate every single base, thus
enumerating all "neighboring" sequences at Hamming distance 1.

The resulting sequences are written to standard output.
"""
import sys
from sqt import HelpfulArgumentParser
from sqt.io.fasta import FastaReader, FastaWriter
from sqt.dna import mutate
from argparse import ArgumentTypeError

__author__ = "Marcel Martin"


def replacement_table(order, n):
	"""
	Return a dict that describes how to replace each base and in which order.

	{'A': 'CGT', 'G': 'TAC', 'C': 'GTA', 'T': 'ACG'}
	The first entry means:
	If there is an A in the original sequence, replace it by C, then G, then T.
	"""
	order += order  # makes indexing easier
	replace = dict()
	for i, c in enumerate('ACGT'):
		r = order[i]
		for d in order[i+1:]:
			if d not in r and d != c:
				r += d
		replace[c] = r[:n]
	return replace


def neighbors(record, table):
	"""
	Enumerate neighbors
	"""
	for i in range(len(record)):
		s = record.sequence
		c = s[i].upper()
		for new_c in table[c]:
			neighbor = record[:]
			neighbor.name += '_mut{}{}'.format(i+1, new_c)
			neighbor.sequence = s[:i] + new_c + s[i+1:]
			yield neighbor


def four_bases(s):
	if not len(s) == 4:
		raise ArgumentTypeError("String of length 4 expected")
	if not set(s) <= set('ACGT'):
		raise ArgumentTypeError("String consisting of characters A, C, G, T expected")
	return s


def main():
	parser = HelpfulArgumentParser(description=__doc__)
	parser.add_argument("--acgt", type=four_bases, default='CGTA',
		help="Order of bases replacement. Default: %(default)s")
	parser.add_argument("-n", default=3, type=int, choices=range(1, 4),
		help="Change each individual base N times. Default: %(default)s")
	parser.add_argument("--exclude-self", dest='include_self', default=True,
		action='store_false', help="Exclude the sequence itself from the output")
	parser.add_argument("fasta", metavar='FASTA', help="Input FASTA file")
	args = parser.parse_args()

	table = replacement_table(args.acgt, args.n)
	fasta_output = FastaWriter(sys.stdout, line_length=0)
	for record in FastaReader(args.fasta):
		if args.include_self:
			fasta_output.write(record.name, record.sequence)
		for neighbor in neighbors(record, table):
			fasta_output.write(neighbor.name, neighbor.sequence)


if __name__ == '__main__':
	main()