File: VcfIO.py

package info (click to toggle)
python-pbcore 1.6.5%2Bdfsg-1
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 19,168 kB
  • sloc: python: 25,497; xml: 2,846; makefile: 251; sh: 24
file content (150 lines) | stat: -rw-r--r-- 4,678 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
# Author: Lance Hepler

from __future__ import absolute_import
from __future__ import print_function

"""
I/O support for VCF4 files.

The specification for the VCF4 format is available at:
    https://samtools.github.io/hts-specs
"""

from collections import OrderedDict
from functools import total_ordering

__all__ = ["Vcf4Record"]


def _int_float_or_string(s):
    try:
        return int(s)
    except ValueError:
        pass
    try:
        return float(s)
    except ValueError:
        pass
    return str(s)


def _fmt(x):
    if isinstance(x, float):
        return "{0:.3g}".format(x)
    return x


def _empty_then(xs):
    if not xs:
        return
    yield ""
    for x in xs:
        yield x


@total_ordering
class Vcf4Record(object):
    """
    Class for VCF record, providing uniform access to standard
    VCF fields and attributes.

    .. doctest::

        >>> from pbcore.io import Vcf4Record
        >>> rec1 = Vcf4Record("ecoliK12_pbi_March2013", \
                              84, ".", "TG", "T", 48, "PASS", [("DP", 53)])
        >>> str(rec1)
        'ecoliK12_pbi_March2013\\t84\\t.\\tTG\\tT\\t48\\tPASS\\tDP=53'
        >>> rec2 = Vcf4Record.fromString(str(rec1))
        >>> rec1 == rec2
        True
        >>> rec1.pos
        84
        >>> rec1.filter
        'PASS'

    """

    __slots__ = ("chrom", "pos", "id", "ref", "alt", "qual", "filter", "info", "fields")

    def __init__(self, chrom, pos, id, ref, alt, qual, filt, info, *fields):
        self.chrom = chrom
        self.pos = pos
        self.id = id
        self.ref = ref
        self.alt = alt
        self.qual = qual
        self.filter = filt
        self.info = OrderedDict(info)
        self.fields = fields

    @staticmethod
    def fromString(s):
        try:
            fields = s.strip().split('\t')
            fields[1] = int(fields[1])
            fields[5] = _int_float_or_string(fields[5])
            fields[7] = OrderedDict((k.strip(), _int_float_or_string(v.strip()))
                                    for k, v in (x.strip().split('=')
                                                for x in fields[7].split(';')))
            return Vcf4Record(*fields)
        except TypeError:
            raise ValueError("Could not interpret string as a Vcf4Record: '{0}'".format(s))

    def __lt__(self, other):
        return (self.chrom, self.pos) < (other.chrom, other.pos)

    def __eq__(self, other):
        return (self.chrom == other.chrom and self.pos == other.pos and self.id == other.id and \
                self.ref == other.ref and self.alt == other.alt and self.qual == other.qual and \
                self.filter == other.filter and self.info == other.info and \
                self.fields == other.fields)

    def __str__(self):
        return "{chrom}\t{pos}\t{id}\t{ref}\t{alt}\t{qual}\t{filter}\t{info}{fields}".format(
                chrom=self.chrom, pos=self.pos, id=self.id,
                ref=self.ref, alt=self.alt, qual=_fmt(self.qual),
                filter=self.filter,
                info=";".join("{0}={1}".format(k, _fmt(v)) for k, v in self.info.iteritems()),
                fields="\t".join(_empty_then(self.fields)))


def merge_vcfs_sorted(vcf_files, output_file_name):
    """
    Utility function to combine N (>=1) VCF files, with records
    sorted by contig then contig position. This assumes each file is already
    sorted, and forms a contiguous set of records.
    """
    # get the headers and the first record of each file, so we can compare
    meta = OrderedDict([])
    hdr = OrderedDict([])
    fst_recs = []
    for f in vcf_files:
        with open(f) as h:
            for line in h:
                line = line.strip()
                if not line:
                    continue
                elif line[:2] == "##":
                    meta[line] = ()
                elif line[:6] == "#CHROM":
                    hdr.update((h, ()) for h in line.lstrip("#").split("\t"))
                else:
                    fst_recs.append((Vcf4Record.fromString(line), f))
                    break
    sorted_files = sorted(fst_recs, key=lambda x: x[0])
    nrec = 0
    with open(output_file_name, "w") as oh:
        for m, _ in meta.iteritems():
            print(m, file=oh)
        print("#{0}".format("\t".join(h for h, _ in hdr.iteritems())), file=oh)
        for _, f in sorted_files:
            with open(f) as h:
                for line in h:
                    line = line.strip()
                    if not line or line[0] == "#":
                        continue
                    else:
                        print(line, file=oh)
                        nrec += 1
    return nrec