File: clean.pyx

package info (click to toggle)
obitools 3.0.1~b26%2Bdfsg-4
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 26,756 kB
  • sloc: ansic: 24,299; python: 657; sh: 27; makefile: 21
file content (146 lines) | stat: -rwxr-xr-x 6,024 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
#cython: language_level=3

from obitools3.apps.progress cimport ProgressBar  # @UnresolvedImport
from obitools3.dms.dms cimport DMS
from obitools3.dms.view import RollbackException
from obitools3.dms.capi.obiclean cimport obi_clean
from obitools3.apps.optiongroups import addMinimalInputOption, addMinimalOutputOption, addNoProgressBarOption
from obitools3.uri.decode import open_uri
from obitools3.apps.config import logger
from obitools3.utils cimport tobytes, str2bytes
from obitools3.dms.view.view cimport View
from obitools3.dms.view.typed_view.view_NUC_SEQS cimport View_NUC_SEQS

from io import BufferedWriter
import sys


__title__="Tag a set of sequences for PCR and sequencing errors identification"


def addOptions(parser):

    addMinimalInputOption(parser)
    addMinimalOutputOption(parser)
    addNoProgressBarOption(parser)
    
    group = parser.add_argument_group('obi clean specific options')

    group.add_argument('--distance', '-d',
                       action="store", dest="clean:distance",
                       metavar='<DISTANCE>',
                       default=1.0,
                       type=float,
                       help="Maximum numbers of errors between two variant sequences. Default: 1.")

    group.add_argument('--sample-tag', '-s',
                       action="store", 
                       dest="clean:sample-tag-name",
                       metavar="<SAMPLE TAG NAME>",
                       type=str,
                       help="Name of the tag where merged sample count informations are kept (typically generated by obi uniq, usually MERGED_sample, default: None).")
    
    group.add_argument('--ratio', '-r',
                       action="store", dest="clean:ratio",
                       metavar='<RATIO>',
                       default=0.5,
                       type=float,
                       help="Maximum ratio between the counts of two sequences so that the less abundant one can be considered"
                            " a variant of the more abundant one. Default: 0.5.")
 
    group.add_argument('--heads-only', '-H',
                       action="store_true", 
                       dest="clean:heads-only",
                       default=False,
                       help="Only sequences labeled as heads are kept in the output. Default: False")

#    group.add_argument('--cluster-tags', '-C',
#                       action="store_true", 
#                       dest="clean:cluster-tags",
#                       default=False,
#                       help="Adds tags for each sequence giving its cluster's head and weight for each sample.")

    group.add_argument('--thread-count','-p',   # TODO should probably be in a specific option group
                       action="store", dest="clean:thread-count",
                       metavar='<THREAD COUNT>',
                       default=-1,
                       type=int,
                       help="Number of threads to use for the computation. Default: the maximum available.")


def run(config):
        
    DMS.obi_atexit()
    
    logger("info", "obi clean")

    # Open the input: only the DMS
    input = open_uri(config['obi']['inputURI'],
                     dms_only=True)
    if input is None:
        raise Exception("Could not read input")
    i_dms = input[0]
    i_dms_name = input[0].name
    i_view_name = input[1]

    # Open the output: only the DMS
    output = open_uri(config['obi']['outputURI'],
                      input=False,
                      dms_only=True)
    if output is None:
        raise Exception("Could not create output")
    o_dms = output[0]
    output_0 = output[0]
    final_o_view_name = output[1]
    
    # If stdout output or the input and output DMS are not the same, create a temporary view that will be exported to 
    # the right DMS and deleted in the other afterwards.
    if i_dms != o_dms or type(output_0)==BufferedWriter:
        temporary_view_name = b"temp"
        i=0
        while temporary_view_name in i_dms:  # Making sure view name is unique in input DMS
            temporary_view_name = temporary_view_name+b"_"+str2bytes(str(i))
            i+=1
        o_view_name = temporary_view_name
        if type(output_0)==BufferedWriter:
            o_dms = i_dms
    else:
        o_view_name = final_o_view_name
        
    # Save command config in View comments
    command_line = " ".join(sys.argv[1:])
    comments = View.print_config(config, "clean", command_line, input_dms_name=[i_dms_name], input_view_name=[i_view_name])

    if 'sample-tag-name' not in config['clean']:
        config['clean']['sample-tag-name'] = ""
        
    if obi_clean(i_dms.name_with_full_path, tobytes(i_view_name), tobytes(config['clean']['sample-tag-name']), tobytes(o_view_name), comments, \
              config['clean']['distance'], config['clean']['ratio'], config['clean']['heads-only'], config['clean']['thread-count']) < 0:
        raise Exception("Error running obiclean")
    
    # If the input and output DMS are not the same, export result view to output DMS
    if i_dms != o_dms:
        View.import_view(i_dms.full_path[:-7], o_dms.full_path[:-7], o_view_name, final_o_view_name)

    # stdout output: write to buffer
    if type(output_0)==BufferedWriter:
        logger("info", "Printing to output...")
        o_view = o_dms[o_view_name]
        o_view.print_to_output(output_0, noprogressbar=config['obi']['noprogressbar'])
        o_view.close()

    # Save command config in DMS comments
    o_dms.record_command_line(command_line)

    #print("\n\nOutput view:\n````````````", file=sys.stderr)
    #print(repr(o_dms[final_o_view_name]), file=sys.stderr)

    # If the input and the output DMS are different or if stdout output, delete the temporary imported view used to create the final view
    if i_dms != o_dms or type(output_0)==BufferedWriter:
        View.delete_view(i_dms, o_view_name)
        o_dms.close(force=True)
    
    i_dms.close(force=True)

    logger("info", "Done.")