1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150
|
# Author: Lance Hepler
from __future__ import absolute_import
from __future__ import print_function
"""
I/O support for VCF4 files.
The specification for the VCF4 format is available at:
https://samtools.github.io/hts-specs
"""
from collections import OrderedDict
from functools import total_ordering
__all__ = ["Vcf4Record"]
def _int_float_or_string(s):
try:
return int(s)
except ValueError:
pass
try:
return float(s)
except ValueError:
pass
return str(s)
def _fmt(x):
if isinstance(x, float):
return "{0:.3g}".format(x)
return x
def _empty_then(xs):
if not xs:
return
yield ""
for x in xs:
yield x
@total_ordering
class Vcf4Record(object):
"""
Class for VCF record, providing uniform access to standard
VCF fields and attributes.
.. doctest::
>>> from pbcore.io import Vcf4Record
>>> rec1 = Vcf4Record("ecoliK12_pbi_March2013", \
84, ".", "TG", "T", 48, "PASS", [("DP", 53)])
>>> str(rec1)
'ecoliK12_pbi_March2013\\t84\\t.\\tTG\\tT\\t48\\tPASS\\tDP=53'
>>> rec2 = Vcf4Record.fromString(str(rec1))
>>> rec1 == rec2
True
>>> rec1.pos
84
>>> rec1.filter
'PASS'
"""
__slots__ = ("chrom", "pos", "id", "ref", "alt", "qual", "filter", "info", "fields")
def __init__(self, chrom, pos, id, ref, alt, qual, filt, info, *fields):
self.chrom = chrom
self.pos = pos
self.id = id
self.ref = ref
self.alt = alt
self.qual = qual
self.filter = filt
self.info = OrderedDict(info)
self.fields = fields
@staticmethod
def fromString(s):
try:
fields = s.strip().split('\t')
fields[1] = int(fields[1])
fields[5] = _int_float_or_string(fields[5])
fields[7] = OrderedDict((k.strip(), _int_float_or_string(v.strip()))
for k, v in (x.strip().split('=')
for x in fields[7].split(';')))
return Vcf4Record(*fields)
except TypeError:
raise ValueError("Could not interpret string as a Vcf4Record: '{0}'".format(s))
def __lt__(self, other):
return (self.chrom, self.pos) < (other.chrom, other.pos)
def __eq__(self, other):
return (self.chrom == other.chrom and self.pos == other.pos and self.id == other.id and \
self.ref == other.ref and self.alt == other.alt and self.qual == other.qual and \
self.filter == other.filter and self.info == other.info and \
self.fields == other.fields)
def __str__(self):
return "{chrom}\t{pos}\t{id}\t{ref}\t{alt}\t{qual}\t{filter}\t{info}{fields}".format(
chrom=self.chrom, pos=self.pos, id=self.id,
ref=self.ref, alt=self.alt, qual=_fmt(self.qual),
filter=self.filter,
info=";".join("{0}={1}".format(k, _fmt(v)) for k, v in self.info.iteritems()),
fields="\t".join(_empty_then(self.fields)))
def merge_vcfs_sorted(vcf_files, output_file_name):
"""
Utility function to combine N (>=1) VCF files, with records
sorted by contig then contig position. This assumes each file is already
sorted, and forms a contiguous set of records.
"""
# get the headers and the first record of each file, so we can compare
meta = OrderedDict([])
hdr = OrderedDict([])
fst_recs = []
for f in vcf_files:
with open(f) as h:
for line in h:
line = line.strip()
if not line:
continue
elif line[:2] == "##":
meta[line] = ()
elif line[:6] == "#CHROM":
hdr.update((h, ()) for h in line.lstrip("#").split("\t"))
else:
fst_recs.append((Vcf4Record.fromString(line), f))
break
sorted_files = sorted(fst_recs, key=lambda x: x[0])
nrec = 0
with open(output_file_name, "w") as oh:
for m, _ in meta.iteritems():
print(m, file=oh)
print("#{0}".format("\t".join(h for h, _ in hdr.iteritems())), file=oh)
for _, f in sorted_files:
with open(f) as h:
for line in h:
line = line.strip()
if not line or line[0] == "#":
continue
else:
print(line, file=oh)
nrec += 1
return nrec
|