File: sequence.py

package info (click to toggle)
python-cogent 1.5.3-2
  • links: PTS, VCS
  • area: main
  • in suites: jessie, jessie-kfreebsd
  • size: 16,424 kB
  • ctags: 24,343
  • sloc: python: 134,200; makefile: 100; ansic: 17; sh: 10
file content (153 lines) | stat: -rw-r--r-- 5,541 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
import sqlalchemy as sql

from cogent import DNA
from cogent.core.location import Map

from cogent.db.ensembl.species import Species
from cogent.db.ensembl.util import NoItemError, asserted_one
from cogent.db.ensembl.assembly import CoordSystem, Coordinate, \
                                    get_coord_conversion

__author__ = "Gavin Huttley"
__copyright__ = "Copyright 2007-2012, The Cogent Project"
__credits__ = ["Gavin Huttley"]
__license__ = "GPL"
__version__ = "1.5.3"
__maintainer__ = "Gavin Huttley"
__email__ = "Gavin.Huttley@anu.edu.au"
__status__ = "alpha"

# local reference to the sqlalchemy substring function
substr = sql.sql.func.substr

def _assemble_seq(frags, start, end, frag_positions):
    """returns a single string in which missing sequence is replaced by 'N'"""
    prev_end = start
    assert len(frag_positions) == len(frags), "Mismatched number of "\
                                                    "fragments and positions"
    assembled = []
    for index, (frag_start, frag_end) in enumerate(frag_positions):
        diff = frag_start - prev_end
        assert diff >= 0, 'fragment position start < previous end: %s, %s' %\
                                                (frag_start, prev_end)
        assembled += ['N'*diff, frags[index]]
        prev_end = frag_end
    diff = end - frag_end
    assert diff >= 0, 'end[%s] < previous frag_end[%s]' % (end, frag_end)
    assembled += ['N' * diff]
    return DNA.makeSequence(''.join(assembled))

def _make_coord(genome, coord_name, start, end, strand):
    """returns a Coordinate"""
    return Coordinate(CoordName=coord_name, Start=start, End=end,
                Strand=strand, genome=genome)


def get_lower_coord_conversion(coord, species, core_db):
    coord_system = CoordSystem(species=species, core_db=core_db)
    seq_level_coord_type = CoordSystem(species=species,core_db=core_db,
                             seq_level=True)
    query_rank = coord_system[coord.CoordType].rank
    seq_level_rank = coord_system[seq_level_coord_type].rank
    assemblies = None
    for rank in range(query_rank+1, seq_level_rank):
        coord_type = None
        for key in coord_system.keys():
            if coord_system[key].rank == rank:
                coord_type = coord_system[key].name
                break
        
        if coord_type is None:
            continue
        
        assemblies = get_coord_conversion(coord, coord_type, core_db)
        
        if assemblies: 
            break
        
    
    return assemblies

def _get_sequence_from_direct_assembly(coord=None, DEBUG=False):
    # TODO clean up use of a coord
    genome = coord.genome
    # no matter what strand user provide, we get the + sequence first
    coord.Strand = 1
    species = genome.Species
    coord_type = CoordSystem(species=species,core_db=genome.CoreDb,
                             seq_level=True)
    
    if DEBUG:
        print 'Created Coordinate:',coord,coord.EnsemblStart,coord.EnsemblEnd
        print coord.CoordType, coord_type
    
    assemblies = get_coord_conversion(coord, coord_type, genome.CoreDb)
    
    if not assemblies:
        raise NoItemError, 'no assembly for %s' % coord
    
    dna = genome.CoreDb.getTable('dna')
    seqs, positions = [], []
    for q_loc, t_loc in assemblies:
        assert q_loc.Strand == 1
        length = len(t_loc)
        # get MySQL to do the string slicing via substr function
        query = sql.select([substr(dna.c.sequence,
                                  t_loc.EnsemblStart,
                                  length).label('sequence')],
                            dna.c.seq_region_id == t_loc.seq_region_id)
        record = asserted_one(query.execute().fetchall())
        seq = record['sequence']
        seq = DNA.makeSequence(seq)
        if t_loc.Strand == -1:
            seq = seq.rc()
        seqs.append(str(seq))
        positions.append((q_loc.Start, q_loc.End))
    sequence = _assemble_seq(seqs, coord.Start, coord.End, positions)
    return sequence

def _get_sequence_from_lower_assembly(coord, DEBUG):
    assemblies = get_lower_coord_conversion(coord, coord.genome.Species,
                                            coord.genome.CoreDb)
    if not assemblies:
        raise NoItemError, 'no assembly for %s' % coord
    
    if DEBUG:
        print '\nMedium_level_assemblies = ', assemblies
    
    seqs, positions = [], []
    for q_loc, t_loc in assemblies:
        t_strand = t_loc.Strand
        temp_seq = _get_sequence_from_direct_assembly(t_loc, DEBUG)
        if t_strand == -1:
            temp_seq = temp_seq.rc()
        
        if DEBUG:
            print q_loc
            print t_loc
            print 'temp_seq = ', temp_seq[:10], '\n'
        
        seqs.append(str(temp_seq))
        positions.append((q_loc.Start, q_loc.End))
    
    sequence = _assemble_seq(seqs, coord.Start, coord.End, positions)
    return sequence

def get_sequence(coord=None, genome=None, coord_name=None, start=None, end=None, strand=1, DEBUG=False):
    if coord is None:
        coord = _make_coord(genome, coord_name, start, end, 1)
    else:
        coord = coord.copy()
    
    strand = coord.Strand
    
    try: 
        sequence = _get_sequence_from_direct_assembly(coord, DEBUG)
    except NoItemError:
        ## means there is no assembly, so we do a thorough assembly by converting according to the "rank"
        sequence = _get_sequence_from_lower_assembly(coord, DEBUG)
    
    if strand == -1:
        sequence = sequence.rc()
    return sequence