File: K3000_facts_to_fa.py

package info (click to toggle)
discosnp 1%3A2.6.2-5
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 3,656 kB
  • sloc: python: 5,893; sh: 2,966; cpp: 2,692; makefile: 14
file content (149 lines) | stat: -rw-r--r-- 5,378 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
'''
Creation of a FA file from a compacted fact int file. 
@author  pierre peterlongo pierre.peterlongo@inria.fr
@deprecated
'''


__author__ = "Pierre Peterlongo"
__email__ = "pierre.peterlongo@inria.fr"

import sys
import K3000_common as kc
import argparse
from collections import defaultdict

    
rev_sign = lambda s: "+" if s == "-" else "-"

def store_variant_overlap_length(gfa_file_name):
    '''
    Given a graph_final format where overlaps are stored as: 
    L	0	+	400	+	43M	OFL:i:1
    
    Returns a dictionary: 
    key = source_fact_id, value = dictionary:
        key = target_fact_id, value = overlap length (nucleotides) 
    '''
    overlap_lengths = defaultdict(dict) 
    with open(gfa_file_name) as gfa_file:
        while True:
            line = gfa_file.readline()
            if not line: break
            if line[0] != 'L': continue
            sline = line.strip().split()
            OL = int(sline[5][:-1])
            if OL<1: continue
            # add the Overlap Length (OL)
            overlap_lengths[sline[2]+sline[1]][sline[4]+sline[3]] = OL
            # add it for the reverse 
            overlap_lengths[rev_sign(sline[4])+sline[3]][rev_sign(sline[2])+sline[1]] = OL
    return overlap_lengths

def store_fact_sequence(gfa_file_name):
    '''
    Given a graph_final format where facts are stored as: 
    S       1       gtGCAATAAGAATTGTCTTTCTTATAATAATTGTCCAACTTAGgGTCAATTTCTGTACaaacaaCACCATCCAAt     AS:-577h;-977l;1354l;   SP:0_44;11_64;32_75;    BP:0_41;-26_41;-25_41;  EV:0    FC:i:52 min:17  max:410 mean:180.0      AC:410;17;113;

    
    Returns a dictionary: 
    key = fact_id, value = sequence:
    '''
    fact_sequence = defaultdict(dict) 
    with open(gfa_file_name) as gfa_file:
        while True:
            line = gfa_file.readline()
            if not line: break
            if line[0] != 'S': continue
            sline = line.strip().split()
            fact_sequence[sline[1]] = sline[2]
    return fact_sequence

def get_sequence(facts_sequence, fact_id):
    '''
    fact id is given by \'+12\' or \'-12\'
    fact sequences are indexed by \'12\' only.
    if sign is \'+\' returns the corresponding sequence
    else returns its reverse complement
    '''
    forward_sequence  = facts_sequence[fact_id[1:]]
    if fact_id[0] == '+':   return forward_sequence
    else:                   return kc.get_reverse_complement(forward_sequence)
        

def formatpathid (pathid: str) -> str:
    '''
    given a path id with format mp (eg. m2093)
    transformt it to format +- (eg. -2093)
    '''
    if pathid[0] == 'm': return '-'+pathid[1:]
    if pathid[0] == 'p': return '+'+pathid[1:]
    raise ValueError

def generate_fa(gfa_file_name, path_facts_file_name):
    '''
    paths file contains lines as: haplotypeID	ccID	fact1;fact2;fact3   abundance
    eg.
    1       19      m463;p3596;m2093;p2782  583.0
    we transforme them into a fasta file: 
    >CC_id|Path_id|abundance|Optional stuffs
    ACGT...
    '''

    overlap_lengths = store_variant_overlap_length(gfa_file_name)
    facts_sequence = store_fact_sequence(gfa_file_name)

    with open(path_facts_file_name) as path_facts_file:
        while True:
            line = path_facts_file.readline()
            if not line: break
            sline = line.strip().split()
            haplotype_id = sline[0]
            cc_id = sline[1]
            facts_path = sline[2]
            abundance  = sline[3]
            facts_path = facts_path.strip(";") # just in case
            previous_fact = formatpathid(facts_path.split(";")[0])
            sequence =  get_sequence(facts_sequence, previous_fact)
            for current_fact in facts_path.split(";")[1:]:
                current_fact = formatpathid(current_fact)
                # TODO: remove those asserts when clearly tested.
                assert (previous_fact in overlap_lengths)
                assert (current_fact in overlap_lengths[previous_fact])
                OL = overlap_lengths[previous_fact][current_fact]
                assert(kc.hamming_near_perfect (get_sequence(facts_sequence, previous_fact)[-OL:], get_sequence(facts_sequence, current_fact)[:OL]))
                # assert(get_sequence(facts_sequence, previous_fact)[-OL:].upper() == get_sequence(facts_sequence, current_fact)[:OL].upper())
                sequence += get_sequence(facts_sequence, current_fact)[OL:]
                previous_fact = current_fact
            print(f">{cc_id}|{haplotype_id}|{facts_path}|{abundance}\n{sequence}")
    

def main():
    '''
    Creation of a FA file from a compacted fact int file. 
    '''

    parser = argparse.ArgumentParser(description='From phased facts to fasta file.')
    parser.add_argument("--gfa", type=str,
                        help="File containing the gfa generated by k3000/run.sh (usually named graph_final_X.gfa, with X the read set id)",
                        required=True)
    
    parser.add_argument("--paths", type=str,
                        help="File containing phased facts (format: \"haplotypeID	ccID	fact1;fact2;fact3\"",
                        required=True)


    args = parser.parse_args()
    gfa_file_name = str(args.gfa)
    path_facts_file_name = str(args.paths)
    generate_fa(gfa_file_name, path_facts_file_name)
    
    



if __name__ == "__main__":
     main()