1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149
|
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
'''
Creation of a FA file from a compacted fact int file.
@author pierre peterlongo pierre.peterlongo@inria.fr
@deprecated
'''
__author__ = "Pierre Peterlongo"
__email__ = "pierre.peterlongo@inria.fr"
import sys
import K3000_common as kc
import argparse
from collections import defaultdict
rev_sign = lambda s: "+" if s == "-" else "-"
def store_variant_overlap_length(gfa_file_name):
'''
Given a graph_final format where overlaps are stored as:
L 0 + 400 + 43M OFL:i:1
Returns a dictionary:
key = source_fact_id, value = dictionary:
key = target_fact_id, value = overlap length (nucleotides)
'''
overlap_lengths = defaultdict(dict)
with open(gfa_file_name) as gfa_file:
while True:
line = gfa_file.readline()
if not line: break
if line[0] != 'L': continue
sline = line.strip().split()
OL = int(sline[5][:-1])
if OL<1: continue
# add the Overlap Length (OL)
overlap_lengths[sline[2]+sline[1]][sline[4]+sline[3]] = OL
# add it for the reverse
overlap_lengths[rev_sign(sline[4])+sline[3]][rev_sign(sline[2])+sline[1]] = OL
return overlap_lengths
def store_fact_sequence(gfa_file_name):
'''
Given a graph_final format where facts are stored as:
S 1 gtGCAATAAGAATTGTCTTTCTTATAATAATTGTCCAACTTAGgGTCAATTTCTGTACaaacaaCACCATCCAAt AS:-577h;-977l;1354l; SP:0_44;11_64;32_75; BP:0_41;-26_41;-25_41; EV:0 FC:i:52 min:17 max:410 mean:180.0 AC:410;17;113;
Returns a dictionary:
key = fact_id, value = sequence:
'''
fact_sequence = defaultdict(dict)
with open(gfa_file_name) as gfa_file:
while True:
line = gfa_file.readline()
if not line: break
if line[0] != 'S': continue
sline = line.strip().split()
fact_sequence[sline[1]] = sline[2]
return fact_sequence
def get_sequence(facts_sequence, fact_id):
'''
fact id is given by \'+12\' or \'-12\'
fact sequences are indexed by \'12\' only.
if sign is \'+\' returns the corresponding sequence
else returns its reverse complement
'''
forward_sequence = facts_sequence[fact_id[1:]]
if fact_id[0] == '+': return forward_sequence
else: return kc.get_reverse_complement(forward_sequence)
def formatpathid (pathid: str) -> str:
'''
given a path id with format mp (eg. m2093)
transformt it to format +- (eg. -2093)
'''
if pathid[0] == 'm': return '-'+pathid[1:]
if pathid[0] == 'p': return '+'+pathid[1:]
raise ValueError
def generate_fa(gfa_file_name, path_facts_file_name):
'''
paths file contains lines as: haplotypeID ccID fact1;fact2;fact3 abundance
eg.
1 19 m463;p3596;m2093;p2782 583.0
we transforme them into a fasta file:
>CC_id|Path_id|abundance|Optional stuffs
ACGT...
'''
overlap_lengths = store_variant_overlap_length(gfa_file_name)
facts_sequence = store_fact_sequence(gfa_file_name)
with open(path_facts_file_name) as path_facts_file:
while True:
line = path_facts_file.readline()
if not line: break
sline = line.strip().split()
haplotype_id = sline[0]
cc_id = sline[1]
facts_path = sline[2]
abundance = sline[3]
facts_path = facts_path.strip(";") # just in case
previous_fact = formatpathid(facts_path.split(";")[0])
sequence = get_sequence(facts_sequence, previous_fact)
for current_fact in facts_path.split(";")[1:]:
current_fact = formatpathid(current_fact)
# TODO: remove those asserts when clearly tested.
assert (previous_fact in overlap_lengths)
assert (current_fact in overlap_lengths[previous_fact])
OL = overlap_lengths[previous_fact][current_fact]
assert(kc.hamming_near_perfect (get_sequence(facts_sequence, previous_fact)[-OL:], get_sequence(facts_sequence, current_fact)[:OL]))
# assert(get_sequence(facts_sequence, previous_fact)[-OL:].upper() == get_sequence(facts_sequence, current_fact)[:OL].upper())
sequence += get_sequence(facts_sequence, current_fact)[OL:]
previous_fact = current_fact
print(f">{cc_id}|{haplotype_id}|{facts_path}|{abundance}\n{sequence}")
def main():
'''
Creation of a FA file from a compacted fact int file.
'''
parser = argparse.ArgumentParser(description='From phased facts to fasta file.')
parser.add_argument("--gfa", type=str,
help="File containing the gfa generated by k3000/run.sh (usually named graph_final_X.gfa, with X the read set id)",
required=True)
parser.add_argument("--paths", type=str,
help="File containing phased facts (format: \"haplotypeID ccID fact1;fact2;fact3\"",
required=True)
args = parser.parse_args()
gfa_file_name = str(args.gfa)
path_facts_file_name = str(args.paths)
generate_fa(gfa_file_name, path_facts_file_name)
if __name__ == "__main__":
main()
|