File: maf_tile.py

package info (click to toggle)
python-bx 0.13.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 5,000 kB
  • sloc: python: 17,136; ansic: 2,326; makefile: 24; sh: 8
file content (136 lines) | stat: -rwxr-xr-x 4,298 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
#!/usr/bin/python3

"""
'Tile' the blocks of a maf file over each of a set of intervals. The
highest scoring block that covers any part of a region will be used, and
pieces not covered by any block filled with "-" or optionally "*". The list
of species to tile is specified by `tree` (either a tree or just a comma
separated list). The `seq_db` is a lookup table mapping chromosome names
to nib file for filling in the reference species. Maf files must be indexed.

NOTE: See maf_tile_2.py for a more sophisticated version of this program, I
      think this one will be eliminated in the future.

usage: %prog tree maf_files...
    -m, --missingData: Inserts wildcards for missing block rows instead of '-'
"""

import sys

import bx.align as align
import bx.align.maf
import bx.seq.nib
from bx.cookbook import doc_optparse

tree_tx = str.maketrans("(),", "   ")


def main():
    options, args = doc_optparse.parse(__doc__)
    try:
        sources = args[0].translate(tree_tx).split()
        seq_db = load_seq_db(args[1])
        index = bx.align.maf.MultiIndexed(args[2:])

        out = bx.align.maf.Writer(sys.stdout)
        missing_data = bool(options.missingData)
    except Exception:
        doc_optparse.exception()

    for line in sys.stdin:
        ref_src, start, end = line.split()[0:3]
        do_interval(sources, index, out, ref_src, int(start), int(end), seq_db, missing_data)

    out.close()


def load_seq_db(fname):
    db = {}
    for line in open(fname):
        fields = line.split(",")
        src = fields[1] + "." + fields[2]
        seq = fields[4]
        db[src] = seq.strip()
    return db


def do_interval(sources, index, out, ref_src, start, end, seq_db, missing_data):
    assert sources[0].split(".")[0] == ref_src.split(".")[0], "{} != {}".format(
        sources[0].split(".")[0], ref_src.split(".")[0]
    )

    base_len = end - start

    blocks = index.get(ref_src, start, end)
    # From low to high score
    blocks.sort(key=lambda _: _.score)

    mask = [-1] * base_len

    ref_src_size = None
    for i, block in enumerate(blocks):
        ref = block.get_component_by_src_start(ref_src)
        ref_src_size = ref.src_size
        assert ref.strand == "+"
        slice_start = max(start, ref.start)
        slice_end = min(end, ref.end)
        for j in range(slice_start, slice_end):
            mask[j - start] = i

    tiled = []
    for i in range(len(sources)):
        tiled.append([])

    for ss, ee, index in intervals_from_mask(mask):
        if index < 0:
            tiled[0].append(bx.seq.nib.NibFile(open(seq_db[ref_src])).get(start + ss, ee - ss))
            for row in tiled[1:]:
                if missing_data:
                    row.append("*" * (ee - ss))
                else:
                    row.append("-" * (ee - ss))
        else:
            slice_start = start + ss
            slice_end = start + ee
            block = blocks[index]
            ref = block.get_component_by_src_start(ref_src)
            sliced = block.slice_by_component(ref, slice_start, slice_end)
            sliced = sliced.limit_to_species(sources)
            sliced.remove_all_gap_columns()
            for i, src in enumerate(sources):
                comp = sliced.get_component_by_src_start(src)
                if comp:
                    tiled[i].append(comp.text)
                else:
                    if missing_data:
                        tiled[i].append("*" * sliced.text_size)
                    else:
                        tiled[i].append("-" * sliced.text_size)

    a = align.Alignment()
    for i, name in enumerate(sources):
        text = "".join(tiled[i])
        size = len(text) - text.count("-")
        if i == 0:
            if ref_src_size is None:
                ref_src_size = bx.seq.nib.NibFile(open(seq_db[ref_src])).length
            c = align.Component(ref_src, start, end - start, "+", ref_src_size, text)
        else:
            c = align.Component(name + ".fake", 0, size, "?", size, text)
        a.add_component(c)

    out.write(a)


def intervals_from_mask(mask):
    start = 0
    last = mask[0]
    for i in range(1, len(mask)):
        if mask[i] != last:
            yield start, i, last
            start = i
            last = mask[i]
    yield start, len(mask), last


main()