File: annotate.pyx

package info (click to toggle)
obitools 3.0.1~b26%2Bdfsg-4
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 26,756 kB
  • sloc: ansic: 24,299; python: 657; sh: 27; makefile: 21
file content (405 lines) | stat: -rwxr-xr-x 15,682 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
#cython: language_level=3

from obitools3.apps.progress cimport ProgressBar  # @UnresolvedImport
from obitools3.dms import DMS
from obitools3.dms.view.view cimport View, Line_selection
from obitools3.uri.decode import open_uri
from obitools3.apps.optiongroups import addMinimalInputOption, addTaxonomyOption, addMinimalOutputOption, addNoProgressBarOption
from obitools3.dms.view import RollbackException
from functools import reduce
from obitools3.apps.config import logger
from obitools3.utils cimport tobytes, str2bytes
from io import BufferedWriter
from obitools3.dms.capi.obiview cimport NUC_SEQUENCE_COLUMN, \
                                        ID_COLUMN, \
                                        DEFINITION_COLUMN, \
                                        QUALITY_COLUMN, \
                                        COUNT_COLUMN, \
                                        TAXID_COLUMN
from obitools3.dms.capi.obitypes cimport OBI_STR
from obitools3.dms.column.column cimport Column

import time
import math 
import sys

from cpython.exc cimport PyErr_CheckSignals

 
__title__="Annotate views with new tags and edit existing annotations"


SPECIAL_COLUMNS = [NUC_SEQUENCE_COLUMN, ID_COLUMN, DEFINITION_COLUMN, QUALITY_COLUMN]

 
def addOptions(parser):
    
    addMinimalInputOption(parser)
    addTaxonomyOption(parser)
    addMinimalOutputOption(parser)
    addNoProgressBarOption(parser)

    group=parser.add_argument_group('obi annotate specific options')
 
    group.add_argument('--seq-rank',   # TODO seq/elt/line???
                       action="store_true", 
                       dest="annotate:add_rank",
                       default=False,
                       help="Add a rank attribute to the sequence "
                            "indicating the sequence position in the data.")
 
    group.add_argument('-R', '--rename-tag',
                       action="append", 
                       dest="annotate:rename_tags",
                       metavar="<OLD_NAME:NEW_NAME>",
                       type=str,
                       default=[],
                       help="Change tag name from OLD_NAME to NEW_NAME.")

    group.add_argument('-D', '--delete-tag',
                       action="append", 
                       dest="annotate:delete_tags",
                       metavar="<TAG_NAME>",
                       type=str,
                       default=[],
                       help="Delete tag TAG_NAME.")

    group.add_argument('-S', '--set-tag',
                       action="append", 
                       dest="annotate:set_tags",
                       metavar="<TAG_NAME:PYTHON_EXPRESSION>",
                       type=str,
                       default=[],
                       help="Add a new tag named TAG_NAME with "
                            "a value computed from PYTHON_EXPRESSION.")

    group.add_argument('--set-identifier',
                       action="store", 
                       dest="annotate:set_identifier",
                       metavar="<PYTHON_EXPRESSION>",
                       type=str,
                       default=None,
                       help="Set sequence identifier with "
                            "a value computed from PYTHON_EXPRESSION.")
    
    group.add_argument('--set-sequence',
                       action="store", 
                       dest="annotate:set_sequence",
                       metavar="<PYTHON_EXPRESSION>",
                       type=str,
                       default=None,
                       help="Change the sequence itself with "
                            "a value computed from PYTHON_EXPRESSION.")

    group.add_argument('--set-definition',
                       action="store", 
                       dest="annotate:set_definition",
                       metavar="<PYTHON_EXPRESSION>",
                       type=str,
                       default=None,
                       help="Set sequence definition with "
                            "a value computed from PYTHON_EXPRESSION.")

    group.add_argument('--run',
                       action="store", 
                       dest="annotate:run",
                       metavar="<PYTHON_EXPRESSION>",
                       type=str,
                       default=None,
                       help="Run a python expression on each element.")

    group.add_argument('-C', '--clear',
                       action="store_true", 
                       dest="annotate:clear",
                       default=False,
                       help="Clear all tags except the obligatory ones.")
    
    group.add_argument('-k','--keep',
                       action='append',
                       dest="annotate:keep",
                       metavar="<TAG>",
                       default=[],
                       type=str,
                       help="Only keep this tag. (Can be specified several times.)")
    
    group.add_argument('--length',
                       action="store_true", 
                       dest="annotate:length",
                       default=False,
                       help="Add 'seq_length' tag with sequence length.")
    
    group.add_argument('--with-taxon-at-rank',
                       action='append',
                       dest="annotate:taxon_at_rank",
                       metavar="<RANK_NAME>",
                       default=[],
                       type=str,
                       help="Add taxonomy annotation at the specified rank level RANK_NAME.")


def sequenceTaggerGenerator(config, taxo=None):
    
    toSet=None
    newId=None
    newDef=None
    newSeq=None
    length=None
    add_rank=None
    run=None
    
    if 'set_tags' in config['annotate']:   # TODO default option problem, to fix
        toSet = [x.split(':',1) for x in config['annotate']['set_tags'] if len(x.split(':',1))==2]
    if 'set_identifier' in config['annotate']:
        newId = config['annotate']['set_identifier']
    if 'set_definition' in config['annotate']:
        newDef = config['annotate']['set_definition']
    if 'set_sequence' in config['annotate']:
        newSeq = config['annotate']['set_sequence']
    if 'length' in config['annotate']:
        length = config['annotate']['length']
    if 'add_rank' in config["annotate"]:
        add_rank = config["annotate"]["add_rank"]
    if 'run' in config['annotate']:
        run = config['annotate']['run']
    counter = [0]
    
    for i in range(len(toSet)):
        for j in range(len(toSet[i])):
            toSet[i][j] = tobytes(toSet[i][j])

    annoteRank=[]
    if config['annotate']['taxon_at_rank']:
        if taxo is not None:
            annoteRank = config['annotate']['taxon_at_rank']
        else:
            raise Exception("A taxonomy must be provided to annotate taxon ranks")
    
    def sequenceTagger(seq):
                
        if counter[0]>=0:
            counter[0]+=1
                            
        for rank in annoteRank:
            if TAXID_COLUMN in seq:
                taxid = seq[TAXID_COLUMN]
                if taxid is not None:
                    rtaxid = taxo.get_taxon_at_rank(taxid, rank)
                    if rtaxid is not None:
                        scn = taxo.get_scientific_name(rtaxid)
                    else:
                        scn=None
                    seq[rank]=rtaxid
                    if "%s_name"%rank not in seq.view:
                        Column.new_column(seq.view, "%s_name"%rank, OBI_STR)
                    seq["%s_name"%rank]=scn
             
        if add_rank:
            seq['seq_rank']=counter[0]
    
        for i,v in toSet:
            try:
                if taxo is not None:
                    environ = {'taxonomy' : taxo, 'sequence':seq, 'counter':counter[0], 'math':math}
                else:
                    environ = {'sequence':seq, 'counter':counter[0], 'math':math}
                val = eval(v, environ, seq)
            except Exception:       # set string if not a valid expression
                val = v
            seq[i]=val

        if length:
            seq['seq_length']=len(seq)

        if newId is not None:
            try:
                if taxo is not None:
                    environ = {'taxonomy' : taxo, 'sequence':seq, 'counter':counter[0], 'math':math}
                else:
                    environ = {'sequence':seq, 'counter':counter[0], 'math':math}     
                val = eval(newId, environ, seq)
            except Exception:      # set string if not a valid expression
                val = newId
            seq.id=val

        if newDef is not None:
            try:
                if taxo is not None:
                    environ = {'taxonomy' : taxo, 'sequence':seq, 'counter':counter[0], 'math':math}
                else:
                    environ = {'sequence':seq, 'counter':counter[0], 'math':math}     
                val = eval(newDef, environ, seq)
            except Exception:    # set string if not a valid expression
                val = newDef
            seq.definition=val
             
        if newSeq is not None:
            try:
                if taxo is not None:
                    environ = {'taxonomy' : taxo, 'sequence':seq, 'counter':counter[0], 'math':math}
                else:
                    environ = {'sequence':seq, 'counter':counter[0], 'math':math}     
                val = eval(newSeq, environ, seq)
            except Exception:    # set string if not a valid expression
                val = newSeq
            seq.seq=val
            if 'seq_length' in seq:
                seq['seq_length']=len(seq)
            # Delete quality since it must match the sequence.
            # TODO discuss deleting for each sequence separately
            if QUALITY_COLUMN in seq:
                seq.view.delete_column(QUALITY_COLUMN)
                    
        if run is not None:
            try:
                if taxo is not None:
                    environ = {'taxonomy' : taxo, 'sequence':seq, 'counter':counter[0], 'math':math}
                else:
                    environ = {'sequence':seq, 'counter':counter[0], 'math':math}     
                eval(run, environ, seq)
            except Exception,e:
                raise e

    return sequenceTagger


def run(config):
     
    DMS.obi_atexit()
    
    logger("info", "obi annotate")

    # Open the input
    input = open_uri(config['obi']['inputURI'])
    if input is None:
        raise Exception("Could not read input view")
    i_dms = input[0]
    i_view = input[1]
    i_view_name = input[1].name

    # Open the output: only the DMS, as the output view is going to be created by cloning the input view
    # (could eventually be done via an open_uri() argument)
    output = open_uri(config['obi']['outputURI'],
                      input=False,
                      dms_only=True)
    if output is None:
        raise Exception("Could not create output view")
    o_dms = output[0]
    output_0 = output[0]
    o_view_name = output[1]

    # stdout output: create temporary view
    if type(output_0)==BufferedWriter:
        o_dms = i_dms
        i=0
        o_view_name = b"temp"
        while o_view_name in i_dms: # Making sure view name is unique in output DMS
            o_view_name = o_view_name+b"_"+str2bytes(str(i))
            i+=1
        imported_view_name = o_view_name

    # If the input and output DMS are not the same, import the input view in the output DMS before cloning it to modify it
    # (could be the other way around: clone and modify in the input DMS then import the new view in the output DMS)
    if i_dms != o_dms:
        imported_view_name = i_view_name
        i=0
        while imported_view_name in o_dms:  # Making sure view name is unique in output DMS
            imported_view_name = i_view_name+b"_"+str2bytes(str(i))
            i+=1
        View.import_view(i_dms.full_path[:-7], o_dms.full_path[:-7], i_view_name, imported_view_name)
        i_view = o_dms[imported_view_name]

    # Clone output view from input view
    o_view = i_view.clone(o_view_name)
    if o_view is None:
        raise Exception("Couldn't create output view")
    i_view.close()

    # Open taxonomy if there is one
    if 'taxoURI' in config['obi'] and config['obi']['taxoURI'] is not None:
        taxo_uri = open_uri(config['obi']['taxoURI'])
        if taxo_uri is None or taxo_uri[2] == bytes:
            raise Exception("Couldn't open taxonomy")
        taxo = taxo_uri[1]
    else :
        taxo = None

    # Initialize the progress bar
    if config['obi']['noprogressbar'] == False:
        pb = ProgressBar(len(o_view), config)
    else:
        pb = None

    try:

        # Apply editions
        # Editions at view level
        if 'delete_tags' in config['annotate']:
            toDelete = config['annotate']['delete_tags'][:]
        if 'rename_tags' in config['annotate']:
            toRename = [x.split(':',1) for x in config['annotate']['rename_tags'] if len(x.split(':',1))==2]
        if 'clear' in config['annotate']:
            clear = config['annotate']['clear']
        if 'keep' in config['annotate']:
            keep = config['annotate']['keep']
        for i in range(len(toDelete)):
            toDelete[i] = tobytes(toDelete[i])
        for i in range(len(toRename)):
            for j in range(len(toRename[i])):
                toRename[i][j] = tobytes(toRename[i][j])
        for i in range(len(keep)):
            keep[i] = tobytes(keep[i])
        keep = set(keep)
    
        if clear or keep:
            keys = [k for k in o_view.keys()]
            for k in keys:
                if k not in keep and k not in SPECIAL_COLUMNS:
                    o_view.delete_column(k)
        else:
            for k in toDelete:
                o_view.delete_column(k)
            for old_name, new_name in toRename:
                if old_name in o_view:
                    o_view.rename_column(old_name, new_name)
        
        # Editions at line level
        sequenceTagger = sequenceTaggerGenerator(config, taxo=taxo)
        for i in range(len(o_view)):
            PyErr_CheckSignals()
            if pb is not None:
                pb(i)
            sequenceTagger(o_view[i])

    except Exception, e:
        raise RollbackException("obi annotate error, rollbacking view: "+str(e), o_view)

    if pb is not None:
        pb(i, force=True)
        print("", file=sys.stderr)
    
    # Save command config in View and DMS comments
    command_line = " ".join(sys.argv[1:])
    input_dms_name=[input[0].name]
    input_view_name=[i_view_name]
    if 'taxoURI' in config['obi'] and config['obi']['taxoURI'] is not None:
        input_dms_name.append(config['obi']['taxoURI'].split("/")[-3])
        input_view_name.append("taxonomy/"+config['obi']['taxoURI'].split("/")[-1])
    o_view.write_config(config, "annotate", command_line, input_dms_name=input_dms_name, input_view_name=input_view_name)
    o_dms.record_command_line(command_line)

    #print("\n\nOutput view:\n````````````", file=sys.stderr)
    #print(repr(o_view), file=sys.stderr)

    # stdout output: write to buffer
    if type(output_0)==BufferedWriter:
        logger("info", "Printing to output...")
        o_view.print_to_output(output_0, noprogressbar=config['obi']['noprogressbar'])
        o_view.close()

    # If the input and the output DMS are different or if stdout output, delete the temporary imported view used to create the final view
    if i_dms != o_dms or type(output_0)==BufferedWriter:
        View.delete_view(o_dms, imported_view_name)
        o_dms.close(force=True)
    i_dms.close(force=True)

    logger("info", "Done.")