1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173
|
#!/usr/bin/python3
# encoding: utf-8
from __future__ import (absolute_import, division,
print_function, unicode_literals)
import os, sys, re
import logging
import argparse
import collections
import numpy
import time
import TNode
import Trinity_util
# module-level logger, named after this module
logger = logging.getLogger(__name__)
class Node_path:
    """
    Object representation of the connected set of Node objects that represent
    the reconstructed isoform's graph traversal.

    Instance members:
        transcript_name : (str) name of the isoform
        node_obj_list   : (list) of Node objects, in path order
    """

    def __init__(self, tgraph, transcript_name, path_string, sequence):
        """
        Constructor: instantiates the Node_path and builds its vertices in the graph.

        tgraph          : graph object providing the get_node() factory
        transcript_name : (str) name of the isoform
        path_string     : (str) text containing node descriptors of the form
                          "<node_id>:<lend>-<rend>"; any other text is ignored
        sequence        : (str) sequence that the path coordinates index into

        The first node descriptor found gets 'fst' appended to its node id.
        """
        self.transcript_name = transcript_name
        self.node_obj_list = list()

        node_descriptors = re.finditer(r"(\d+):(\d+)-(\d+)", path_string)
        for position, descriptor in enumerate(node_descriptors):
            loc_node_id = descriptor.group(1)
            lend = int(descriptor.group(2))
            rend = int(descriptor.group(3))

            if position == 0:
                # 1st node is special (full kmer prefix included)
                loc_node_id += 'fst'

            # use factory call to instantiate node objects;
            # coords in path were already zero-based, rend is inclusive
            node_obj = tgraph.get_node(transcript_name,
                                       loc_node_id,
                                       sequence[lend:rend + 1])
            self.node_obj_list.append(node_obj)

    def get_transcript_name(self):
        """Return the name of the isoform this path belongs to."""
        return self.transcript_name

    def get_path(self):
        """Return the list of node objects (the list itself, not a copy)."""
        return self.node_obj_list

    def __repr__(self):
        """Return the str() of every node in the path, joined by '--'."""
        return "--".join(str(node) for node in self.node_obj_list)

    @staticmethod
    def adjust_for_fst_nodes(tgraph, node_path_list):
        """
        fst nodes will have an extra 5' sequence as compared to the corresponding non-fst nodes.
        If both the fst and non-fst version of the node exist, must modify the fst nodes so that
        they are separated from their 5' extension, and the core of the node (suffix) is shared.

        input: TGraph obj, list of node_path objects.

        The node_path objects are modified in-place as needed:
          - if the fst node has an extra prefix, the fst node is truncated to that
            prefix and the non-fst (core) node is inserted right after it in the path.
          - if the fst node and the core node have the same sequence, the fst node
            is replaced by the core node in the path and pruned from the graph.
        In both cases the transcripts of the fst node are added to the core node.
        Paths without any node are left untouched.

        returns the node_path_list with any required adjustments

        raises RuntimeError if the sequence of a core node is not a suffix of the
        sequence of its fst node.
        """
        # pair every fst node with its core (non-fst) counterpart, when one exists
        fst_nodes_require_adj = list()
        for node in tgraph.get_all_nodes():
            node_id = node.get_loc_id()
            if "fst" not in node_id:
                continue
            core_node = tgraph.retrieve_node(node_id.replace("fst", ""))
            if core_node is not None:
                fst_nodes_require_adj.append((node, core_node))

        if not fst_nodes_require_adj:
            # nothing to do
            logger.debug("no FST nodes to adjust")
            return node_path_list

        logger.debug("Adjusting FST nodes: %s", fst_nodes_require_adj)

        old_fst_node_to_new_fst_nodes = dict()
        fst_nodes_to_delete = list()

        # perform node modifications:
        for (fst_node, core_node) in fst_nodes_require_adj:
            fst_node_seq = fst_node.get_seq()
            core_node_seq = core_node.get_seq()

            # the core sequence must be shared as the 3' end of the fst sequence,
            # otherwise the prefix computed below would be meaningless
            if not fst_node_seq.endswith(core_node_seq):
                raise RuntimeError("Error, core_node_seq:\n{}\nis not a suffix of fst seq:\n{}\n".format(core_node_seq, fst_node_seq))

            prefix_endpt = len(fst_node_seq) - len(core_node_seq)
            core_node.add_transcripts(fst_node.get_transcripts())

            if prefix_endpt == 0:
                # same length + suffix check above => identical sequences:
                # the fst node is redundant with the core node
                old_fst_node_to_new_fst_nodes[fst_node] = [core_node]
                fst_nodes_to_delete.append(fst_node)
            else:
                prefix_string = fst_node_seq[0:prefix_endpt]
                logger.debug("FST-SEQ-EXTRACTION\n\nFSTseq:\n%s\n\nCOREseq:\n%s\n\nPREFIXseq:\n%s\n\n",
                             fst_node_seq, core_node_seq, prefix_string)
                old_fst_node_to_new_fst_nodes[fst_node] = [fst_node, core_node]
                fst_node.set_seq(prefix_string)

        # now perform node path updates
        for node_path in node_path_list:
            nodes = node_path.get_path()
            if not nodes:
                # empty path, there is no first node to replace
                continue
            replacement_node_list = old_fst_node_to_new_fst_nodes.get(nodes[0])
            if replacement_node_list is not None:
                # in-place swap of the first node for its one or two replacements
                nodes[0:1] = replacement_node_list

        # purge the nodes targeted for deletion
        for node in fst_nodes_to_delete:
            tgraph.prune_node(node)

        return node_path_list
|