1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134
|
#(c) 2013-2014 by Authors
#This file is a part of Ragout program.
#Released under the BSD license (see LICENSE file)
"""
Some helping function for alignment manipulations
"""
from __future__ import absolute_import
from collections import namedtuple, defaultdict
from six.moves import filter
AlignmentColumn = namedtuple("AlignmentColumn", ["ref", "qry"])
AlignmentRow = namedtuple("AlignmentRow", ["start", "end", "strand",
"seq_len", "seq_id"])
def group_by_chr(alignment):
by_chr = defaultdict(list)
for entry in alignment:
by_chr[entry.ref.seq_id].append(entry)
for chr_id in by_chr:
by_chr[chr_id].sort(key=lambda e: e.ref.start)
return by_chr
def join_collinear(alignment):
new_entries = []
def append_entry(start_entry, last_entry):
ref_row = AlignmentRow(start_entry.ref.start, last_entry.ref.end,
start_entry.ref.strand, start_entry.ref.seq_len,
start_entry.ref.seq_id)
qry_left, qry_right = sorted([start_entry.qry, last_entry.qry],
key=lambda r: r.start)
qry_row = AlignmentRow(qry_left.start, qry_right.end,
qry_left.strand, qry_left.seq_len,
qry_left.seq_id)
new_entries.append(AlignmentColumn(ref_row, qry_row))
by_chr = group_by_chr(alignment)
for chr_id in by_chr:
by_chr[chr_id].sort(key=lambda e: e.ref.start)
start_entry = None
last_entry = None
for entry in by_chr[chr_id]:
if not start_entry:
start_entry = entry
last_entry = entry
continue
#checking for connection consistency
ref_type = last_entry.ref.strand * entry.ref.strand
qry_left, qry_right = sorted([last_entry.qry, entry.qry],
key=lambda r: r.start)
qry_type = qry_left.strand * qry_right.strand
###
if (entry.qry.seq_id != last_entry.qry.seq_id or
ref_type != qry_type):
append_entry(start_entry, last_entry)
start_entry = entry
last_entry = entry
if start_entry:
append_entry(start_entry, last_entry)
return new_entries
def aln_len(row):
assert row.end >= row.start
return row.end - row.start
def filter_by_coverage(alignment, threshold):
by_name = defaultdict(list)
for entry in alignment:
by_name[entry.qry.seq_id].append(entry)
for name in by_name:
by_name[name].sort(key=lambda e: aln_len(e.qry), reverse=True)
len_filter = (lambda e: aln_len(e.qry) > threshold *
aln_len(by_name[name][0].qry))
by_name[name] = list(filter(len_filter, by_name[name]))
filtered_alignment = []
for ent_lst in by_name.values():
filtered_alignment.extend(ent_lst)
return filtered_alignment
class Hit:
def __init__(self, index, chr, coord, sign):
self.index = index
self.chr = chr
self.coord = coord
self.sign = sign
def __str__(self):
return ("+" if self.sign > 0 else "-") + str(self.index) + " : " + str(self.chr)
def get_order(alignment):
chr_len = defaultdict(int)
contig_len = defaultdict(int)
for entry in alignment:
contig_len[entry.qry.seq_id] = max(entry.qry.end,
contig_len[entry.qry.seq_id])
chr_len[entry.ref.seq_id] = max(entry.ref.end,
chr_len[entry.ref.seq_id])
by_chr = group_by_chr(alignment)
entry_ord = defaultdict(list)
contig_pos = 1
prev_start = None
for chr_id, alignment in by_chr.items():
for e in alignment:
if prev_start is not None and e.ref.start > prev_start:
contig_pos += 1
prev_start = e.ref.start
sign = e.ref.strand * e.qry.strand
entry_ord[e.qry.seq_id].append(Hit(contig_pos, chr_id,
e.ref.start, sign))
return entry_ord, chr_len, contig_len
|