File: StandardizeVCF.py

package info (click to toggle)
libgoby-java 3.3.1%2Bdfsg2-11
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 58,108 kB
  • sloc: java: 78,105; cpp: 5,011; xml: 3,170; python: 2,108; sh: 1,575; ansic: 277; makefile: 114
file content (83 lines) | stat: -rw-r--r-- 3,705 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
import argparse
import csv


def _generate_reduced_vcf(input_path, output_path, chromosome):
    if chromosome is not None:
        chromosomes_to_write = set(chromosome)
    else:
        chromosomes_to_write = set(["chr{}".format(n) for n in range(1, 23)] + ["chrX"])
    with open(input_path, "r") as vcf_file, open(output_path, "w") as vcf_out_file:
        vcf_out_fields = ["CHROM", "POS", "ID", "REF", "ALT", "QUAL", "INFO", "FORMAT", "vcf_reduced"]
        vcf_out_writer = csv.DictWriter(vcf_out_file, fieldnames=vcf_out_fields, delimiter="\t", lineterminator="\n")
        vcf_out_file.write("##fileFormat=VCFv4.1\n")
        vcf_out_file.write("##originalFile={}\n".format(input_path))
        vcf_out_file.write("##FORMAT=<ID=GT,Number=1,Type=String,Description=\"Genotype\">\n")
        vcf_out_file.write("#{}\n".format("\t".join(vcf_out_fields)))
        vcf_reader = None
        fields = None
        for line in vcf_file:
            if line.startswith("##"):
                continue
            elif line.startswith("#"):
                if vcf_reader is None:
                    fields = line[1:].split("\t")
                    vcf_reader = csv.DictReader(vcf_file, fieldnames=fields, delimiter="\t", lineterminator="\n")
                    break
                else:
                    raise Exception("Should have encountered header already")
            else:
                raise Exception("Should only be past header when using vcf_reader")
        if vcf_reader is not None:
            for row in vcf_reader:
                if row["CHROM"] in chromosomes_to_write:
                    format_fields = row["FORMAT"].split(":")
                    values = row[fields[-1]].split(":")
                    gt = dict(zip(format_fields, values))["GT"]
                    if "|" in gt:
                        gt = gt.split("|")
                    elif "/" in gt:
                        gt = gt.split("/")
                    else:
                        gt = [gt]
                    try:
                        gt = map(int, gt)
                    except ValueError:
                        raise Exception("Unknown GT format")
                    gt = sorted(gt)
                    if len(gt) == 1:
                        gt = [gt[0], gt[0]]
                    if row["ALT"] != ".":
                        alt_list = sorted(row["ALT"].split(","))
                        alts = ",".join(alt_list)
                    else:
                        alts = "."
                    out_row = {
                        "CHROM": row["CHROM"],
                        "POS": row["POS"],
                        "ID": ".",
                        "REF": row["REF"],
                        "ALT": alts,
                        "QUAL": ".",
                        "INFO": ".",
                        "FORMAT": "GT",
                    }
                    gt = "/".join(map(str, gt))
                    out_row["vcf_reduced"] = gt
                    vcf_out_writer.writerow(out_row)
        else:
            raise Exception("vcf_reader shouldn't be none")


def main(args):
    _generate_reduced_vcf(args.input, args.output, args.chromosome)


if __name__ == "__main__":
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument("-i", "--input", type=str, required=True, help="Input VCF file")
    arg_parser.add_argument("-o", "--output", type=str, required=True, help="Output reduced VCF file")
    arg_parser.add_argument("-c", "--chromosome", nargs="*", help="Chromosomes to generate VCFs for",
                            choices=["chr{}".format(n) for n in range(1, 23)] + ["chrX"])
    args_parsed = arg_parser.parse_args()
    main(args_parsed)