1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112
|
#!/usr/bin/python3
# encoding: utf-8
from __future__ import (absolute_import, division,
print_function, unicode_literals)
import os, sys, re
import logging
import argparse
import collections
import numpy
import time
import hashlib
logger = logging.getLogger(__name__)
class Trinity_fasta_parser:
"""
Parses a Trinity.fasta file and stores the transcript name, sequence, and node path info.
Instance member:
trinity_gene_to_isoform_seqs : (defaultdict(list)) stores key,val of transcript_name,path_struct
where path_struct has structure:
{
'transcript_name' : accession,
'path' : path_str,
'seq' : sequence
}
"""
def __init__(self, trinity_fasta_filename):
self.trinity_gene_to_isoform_seqs = collections.defaultdict(list) #instantiate member
seen = dict()
with open(trinity_fasta_filename) as fh:
header = ""
sequence = ""
for line in fh:
line = line.rstrip()
if line[0] == '>':
# got header line
# process earlier entry
if header != "" and sequence != "":
shaval = hashlib.sha224(sequence.encode('utf-8')).hexdigest()
if shaval in seen:
sys.stderr.write("warning, ignoring duplicate sequence entry {}".format(header))
else:
self.add_trinity_seq_entry(header, sequence)
seen[shaval] = True
# init new entry
header = line
sequence = ""
else:
# sequence line
sequence += line
# get last one
if sequence != "":
shaval = hashlib.sha224(sequence.encode('utf-8')).hexdigest()
if shaval in seen:
sys.stderr.write("warning, ignoring duplicate sequence entry {}".format(header))
else:
self.add_trinity_seq_entry(header, sequence)
seen[shaval] = True
def add_trinity_seq_entry(self, header, sequence):
"""
entry looks like so:
>TRINITY_DN16_c0_g1_i2 len=266 path=[1:0-48 27:49-49 28:50-50 27:51-51 28:52-52 27:53-53 28:54-54 27:55-55 28:56-56 27:57-57 28:58-58 27:59-59 28:60-60 27:61-61 29:62-265] [-1, 1, 27, 28, 27, 28, 27, 28, 27, 28, 27, 28, 27, 28, 27, 29, -2]
CTGTTGTGTGGGGGGTGCGCTTGTTTTCTCTCTCTCTCTCTCTCTCTCTCTCTCTCTCTC
TCAAGTTGATTCCTCCATGTTGCTTTACAGAGACCTGCCAACTACCCAGGAATGTAAAAG
CATTCATAGTATTTGTCTAGTAGAGATGCTGTATGAAAAATGCCAAAACCAAAAAGAGAA
AGAAGGAAAGAGAGATAGATAGATGACATAGATGACGGATGGATGGGTGGGTGGGTGGAT
GGATGGATGGATGGATGGAGGGGGGC
"""
m = re.search("^>(\S+)", header)
if not m:
raise RuntimeError("Error, cannot parse accession from header: {}".format(header))
accession = m.group(1)
m = re.search("path=\[([^\]]+)\]", header)
if not m:
raise RuntimeError("Error, cannot parse path info from header of line: {}".format(header))
path_str = m.group(1)
# get gene ID
gene_id = re.sub("_i\d+$", "", accession)
if gene_id == accession:
raise RuntimeError("Error, couldn't remove isoform ID from Trinity accession: {}".format(accession))
isoform_list = self.trinity_gene_to_isoform_seqs[ gene_id ]
iso_struct = { 'transcript_name' : accession,
'path' : path_str,
'seq' : sequence }
isoform_list.append(iso_struct)
def get_trinity_gene_to_isoform_info(self):
return self.trinity_gene_to_isoform_seqs
|