File: parallel_identify_chimeric_seqs.py

package info (click to toggle)
qiime 1.4.0-2
  • links: PTS, VCS
  • area: main
  • in suites: wheezy
  • size: 29,704 kB
  • sloc: python: 77,837; haskell: 379; sh: 113; makefile: 103
file content (353 lines) | stat: -rwxr-xr-x 17,517 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
#!/usr/bin/env python
# File created on 09 Feb 2010
from __future__ import division

__author__ = "Jens Reeder"
__copyright__ = "Copyright 2011, The QIIME Project"
__credits__ = ["Greg Caporaso", "Jens Reeder"]
__license__ = "GPL"
__version__ = "1.4.0"
__maintainer__ = "Greg Caporaso"
__email__ = "gregcaporaso@gmail.com"
__status__ = "Release"
 

from qiime.util import make_option
from os import popen, system, makedirs, mkdir
from os.path import split, splitext, join
from subprocess import check_call, CalledProcessError
from shutil import copy

from qiime.util import get_tmp_filename
from cogent.app.formatdb import build_blast_db_from_fasta_path
from cogent.parse.fasta import MinimalFastaParser

from qiime.util import parse_command_line_parameters,\
    load_qiime_config, get_options_lookup, get_qiime_scripts_dir,\
    write_degapped_fasta_to_file
from qiime.parallel.identify_chimeric_seqs import get_job_commands,\
    get_poller_command
from qiime.parallel.util import split_fasta, get_random_job_prefix,\
    write_jobs_file,submit_jobs, compute_seqs_per_file,\
    build_filepaths_from_filepaths, write_filepaths_to_file,\
    write_merge_map_file_identify_chimeric_seqs
from qiime.identify_chimeric_seqs import make_cidx_file

qiime_config = load_qiime_config()
options_lookup = get_options_lookup()

script_info={}
script_info['brief_description']="""Parallel chimera detection"""
script_info['script_description']="""This script works like the identify_chimeric_seqs.py script, but is intended to make use of multicore/multiprocessor environments to perform analyses in parallel."""
script_info['script_usage']=[]
#copied from identify_chimeric_seqs.py
script_info['script_usage'].append(("""blast_fragments example""","""For each sequence provided as input, the blast_fragments method splits the input sequence into n roughly-equal-sized, non-overlapping fragments, and assigns taxonomy to each fragment against a reference database. The BlastTaxonAssigner (implemented in assign_taxonomy.py) is used for this. The taxonomies of the fragments are compared with one another (at a default depth of 4), and if contradictory assignments are returned the sequence is identified as chimeric. For example, if an input sequence was split into 3 fragments, and the following taxon assignments were returned:

==========  ==========================================================
fragment1:  Archaea;Euryarchaeota;Methanobacteriales;Methanobacterium
fragment2:  Archaea;Euryarchaeota;Halobacteriales;uncultured
fragment3:  Archaea;Euryarchaeota;Methanobacteriales;Methanobacterium
==========  ==========================================================

The sequence would be considered chimeric at a depth of 3 (Methanobacteriales vs. Halobacteriales), but non-chimeric at a depth of 2 (all Euryarchaeota).

blast_fragments begins with the assumption that a sequence is non-chimeric, and looks for evidence to the contrary. This is important when, for example, no taxonomy assignment can be made because no blast result is returned. If a sequence is split into three fragments, and only one returns a blast hit, that sequence would be considered non-chimeric. This is because there is no evidence (i.e., contradictory blast assignments) for the sequence being chimeric. This script can be run by the following command, where the resulting data is written to the directory "identify_chimeras/" and using default parameters (e.g. chimera detection method ("-m blast_fragments"), number of fragments ("-n 3"), taxonomy depth ("-d 4") and maximum E-value ("-e 1e-30")):""","""%prog -i repr_set_seqs.fasta -t taxonomy_assignment.txt -r ref_seq_set.fna -o chimeric_seqs.txt"""))

script_info['script_usage'].append(("""ChimeraSlayer Example:""",
                                    """Identify chimeric sequences using the ChimeraSlayer algorithm against a user provided reference database. The input sequences need to be provided in aligned (Py)Nast format and the reference database needs to be provided as aligned FASTA (-a). Note that the reference database needs to be the same that was used to build the alignment of the input sequences!""",
                                    """%prog -m ChimeraSlayer -i repr_set_seqs_aligned.fasta -a ref_seq_set_aligned.fasta -o chimeric_seqs.txt"""))
                           
script_info['output_description']="""The result of parallel_identify_chimeric_seqs.py is a text file that identifies which sequences are chimeric."""

script_info['required_options']=[
    options_lookup['fasta_as_primary_input'],
]

chimera_detection_method_choices = ['blast_fragments','ChimeraSlayer']

script_info['optional_options']=[\
    make_option('-a', '--aligned_reference_seqs_fp',
        help='Path to (Py)Nast aligned reference sequences. '
        'REQUIRED when method ChimeraSlayer [default: %default]'), 

    make_option('-t', '--id_to_taxonomy_fp',
        help='Path to tab-delimited file mapping sequences to assigned '
         'taxonomy. Each assigned taxonomy is provided as a comma-separated '
         'list. [default: %default; REQUIRED when method is blast_fragments]'),

    make_option('-r', '--reference_seqs_fp',
        help='Path to reference sequences (used to build a blast db when method blast_fragments). '
        '[default: %default; REQUIRED when method blast_fragments'+\
         ' if no blast_db is provided;]'),

    make_option('-b', '--blast_db',
        help='Database to blast against. Must provide either --blast_db or '
        '--reference_seqs_fp when method is blast_fragments [default: %default]'),
        
    make_option('-m','--chimera_detection_method',\
          type='choice',help='Chimera detection method. Choices: '+\
                    " or ".join(chimera_detection_method_choices) +\
                    '. [default:%default]',\
          choices=chimera_detection_method_choices, default='ChimeraSlayer'),
          
    make_option('-n','--num_fragments',\
          type='int',help='Number of fragments to split sequences into' +\
          ' (i.e., number of expected breakpoints + 1) [default: %default]',\
          default=3),
          
    make_option('-d','--taxonomy_depth',\
          type='int',help='Number of taxonomic divisions to consider' +\
          ' when comparing taxonomy assignments [default: %default]',\
          default=4),
          
    make_option('-e','--max_e_value',\
                    type='float',help='Max e-value to assign taxonomy' +\
                    ' [default: %default]', default=1e-30),

    make_option('--min_div_ratio',\
                    type='float',help='min divergence ratio '+\
                    '(passed to ChimeraSlayer). If set to None uses ' +\
                    'ChimeraSlayer default value. '+\
                    ' [default: %default]', default=None),       

    make_option('-o', '--output_fp',
        help='Path to store output [default: derived from input_seqs_fp]'),
          
    #Define parallel-script-specific parameters
    make_option('-N','--identify_chimeric_seqs_fp',action='store',\
           type='string',help='full path to '+\
           'scripts/identify_chimeric_seqs.py [default: %default]',\
           default=join(get_qiime_scripts_dir(),'identify_chimeric_seqs.py')),\
        
 options_lookup['jobs_to_start'],\
 options_lookup['poller_fp'],\
 options_lookup['retain_temp_files'],\
 options_lookup['suppress_submit_jobs'],\
 options_lookup['poll_directly'],\
 options_lookup['cluster_jobs_fp'],\
 options_lookup['suppress_polling'],\
 options_lookup['job_prefix'],\
 options_lookup['python_exe_fp'],\
 options_lookup['seconds_to_sleep']\
]

script_info['version'] = __version__

def main():

    option_parser, opts, args = parse_command_line_parameters(**script_info)

   # create local copies of command-line options
    python_exe_fp = opts.python_exe_fp
    identify_chimeric_seqs_fp = opts.identify_chimeric_seqs_fp
    reference_seqs_fp = opts.reference_seqs_fp
    cluster_jobs_fp = opts.cluster_jobs_fp
    input_fasta_fp = opts.input_fasta_fp 
    jobs_to_start = opts.jobs_to_start
    poller_fp = opts.poller_fp
    retain_temp_files = opts.retain_temp_files
    suppress_polling = opts.suppress_polling
    seconds_to_sleep = opts.seconds_to_sleep
    poll_directly = opts.poll_directly
    
    id_to_taxonomy_fp = opts.id_to_taxonomy_fp
    aligned_reference_seqs_fp = opts.aligned_reference_seqs_fp
    blast_db = opts.blast_db
    chimera_detection_method = opts.chimera_detection_method
    num_fragments = opts.num_fragments
    taxonomy_depth = opts.taxonomy_depth
    max_e_value = opts.max_e_value
    min_div_ratio = opts.min_div_ratio
    output_fp = opts.output_fp

    created_temp_paths = []
    
    #additional option checks (copied from scripts/identify_chimeric_seqs.py)
    if opts.chimera_detection_method == 'blast_fragments':
        if not (opts.blast_db or opts.reference_seqs_fp):
            option_parser.error('Must provide either --blast_db or'+\
                ' --reference_seqs_fp and --id_to_taxonomy_fp when'+\
                ' method is blast_fragments.')
        if not opts.id_to_taxonomy_fp:
            option_parser.error('Must provide --id_to_taxonomy_fp when method'+\
                ' is blast_fragments.')

        if opts.num_fragments < 2:
            option_parser.error('Invalid number of fragments (-n %d) Must be >= 2.' \
                                    % opts.num_fragments)

    elif opts.chimera_detection_method == 'ChimeraSlayer':
        if not opts.aligned_reference_seqs_fp:
            option_parser.error("Must provide --aligned_reference_seqs_fp "+\
                                    "when using method ChimeraSlayer")
            
    #set the output_fp if not set
    if not output_fp:
        input_basename = splitext(split(input_fasta_fp)[1])[0]
        output_fp = '%s_chimeric.txt' % input_basename

    #get the dir path to the output file
    output_dir, _ = split(output_fp)
    if output_dir=="":
        output_dir ="./"
    
    # split the input filepath into directory and filename, base filename and
    # extension
    input_dir, input_fasta_fn = split(input_fasta_fp)
    input_file_basename, input_fasta_ext = splitext(input_fasta_fn)
    
    # set the job_prefix either based on what the user passed in,
    # or a random string beginning with CHIM
    job_prefix = opts.job_prefix or get_random_job_prefix('CHIM')
    
    # A temporary output directory is created in output_dir named
    # job_prefix. Output files are then moved from the temporary 
    # directory to the output directory when they are complete, allowing
    # a poller to detect when runs complete by the presence of their
    # output files.
    working_dir = '%s/%s' % (output_dir,job_prefix)
    try:
        makedirs(working_dir)
        created_temp_paths.append(working_dir)
    except OSError:
        # working dir already exists
        pass

    if chimera_detection_method=="blast_fragments":
        #pre-compute the blast db and pass to parallel jobs
        blast_db, db_files_to_remove = \
             build_blast_db_from_fasta_path(reference_seqs_fp,
                                            output_dir=working_dir)
        #unset option because blast_db shall be used instead of ref_set
        reference_seqs_fp = None
        created_temp_paths.extend(db_files_to_remove)

    if chimera_detection_method=="ChimeraSlayer":
        #copy the reference files to working dir
        #ChimeraSlayer creates an index file of the ref and
        #will crash without write permission in the ref seqs dir
        _, new_ref_filename = split(aligned_reference_seqs_fp)
        copy(aligned_reference_seqs_fp, working_dir)
        aligned_reference_seqs_fp = working_dir + "/"+new_ref_filename
        created_temp_paths.append(aligned_reference_seqs_fp)
 
        #if given, also copy the unaligned ref db
        if reference_seqs_fp:
            _, new_ref_filename = split(reference_seqs_fp)
            copy(reference_seqs_fp, working_dir)
            reference_seqs_fp = working_dir + "/" + new_ref_filename

        #otherwise create it
        else:
            reference_seqs_fp = write_degapped_fasta_to_file(MinimalFastaParser( \
                    open(aligned_reference_seqs_fp)), tmp_dir=working_dir)
        #delete it afterwards
        created_temp_paths.append(reference_seqs_fp)

        #build blast db of reference, otherwise ChimeraSlayer will do it
        #and parallel jobs clash
        _, db_files_to_remove = \
             build_blast_db_from_fasta_path(reference_seqs_fp)
        created_temp_paths.extend(db_files_to_remove)

        #make the index file globally
        #Reason: ChimeraSlayer first checks to see if the index file is there.
        #If not it tries to create it. This can lead to race condition if several
        # parallel jobs try to create it at the same time.
        make_cidx_file(aligned_reference_seqs_fp)
        created_temp_paths.append(aligned_reference_seqs_fp+".cidx")

    # compute the number of sequences that should be included in
    # each file after splitting the input fasta file   
    num_seqs_per_file = compute_seqs_per_file(input_fasta_fp,jobs_to_start)
     
    # split the fasta files and get the list of resulting files
    tmp_fasta_fps =\
      split_fasta(open(input_fasta_fp),num_seqs_per_file,\
      job_prefix,working_dir=output_dir)
    created_temp_paths += tmp_fasta_fps
    
    # build the filepath for the 'jobs script'
    jobs_fp = '%s/%sjobs.txt' % (output_dir, job_prefix)
    created_temp_paths.append(jobs_fp)
    
    # generate the list of commands to be pushed out to nodes and the list of
    # output files generated by each job
    commands, job_result_filepaths = \
     get_job_commands(python_exe_fp, identify_chimeric_seqs_fp, tmp_fasta_fps,
                      output_dir, reference_seqs_fp, job_prefix, working_dir,
                      aligned_reference_seqs_fp, blast_db, chimera_detection_method,
                      min_div_ratio, num_fragments, taxonomy_depth, max_e_value,
                      id_to_taxonomy_fp)

    created_temp_paths += job_result_filepaths

    # Set up poller apparatus if the user does not suppress polling
    if not suppress_polling:
        # Write the list of files which must exist for the jobs to be 
        # considered complete
        expected_files_filepath = '%s/expected_out_files.txt' % working_dir
        write_filepaths_to_file(job_result_filepaths,expected_files_filepath)
        created_temp_paths.append(expected_files_filepath)
        
        # Write the mapping file which described how the output files from
        # each job should be merged into the final output files
        merge_map_filepath = '%s/merge_map.txt' % working_dir
        process_run_results_f =\
         'qiime.parallel.poller.basic_process_run_results_f'
        write_merge_map_file_identify_chimeric_seqs(job_result_filepaths,
                                                    output_dir,
                                                    merge_map_filepath,
                                                    input_file_basename,
                                                    out_fp=output_fp)
        created_temp_paths.append(merge_map_filepath)
        
        # Create the filepath listing the temporary files to be deleted,
        # but don't write it yet
        deletion_list_filepath = '%s/deletion_list.txt' % working_dir
        created_temp_paths.append(deletion_list_filepath)
        
        # Generate the command to run the poller, and the list of temp files
        # created by the poller
        if not poll_directly:
            poller_command, poller_result_filepaths =\
             get_poller_command(python_exe_fp,poller_fp,expected_files_filepath,\
             merge_map_filepath,deletion_list_filepath,process_run_results_f,\
             seconds_to_sleep=seconds_to_sleep)
            created_temp_paths += poller_result_filepaths
            # append the poller command to the list of job commands
            commands.append(poller_command)
        else:
            poller_command, poller_result_filepaths =\
             get_poller_command(python_exe_fp,poller_fp,expected_files_filepath,\
             merge_map_filepath,deletion_list_filepath,process_run_results_f,\
             seconds_to_sleep=seconds_to_sleep,command_prefix='',command_suffix='')
            created_temp_paths += poller_result_filepaths
        
        if not retain_temp_files:
            # If the user wants temp files deleted, now write the list of 
            # temp files to be deleted
            write_filepaths_to_file(created_temp_paths,deletion_list_filepath)
        else:
            # Otherwise just write an empty file
            write_filepaths_to_file([],deletion_list_filepath)
     
    # write the commands to the 'jobs files'
    write_jobs_file(commands,job_prefix=job_prefix,jobs_fp=jobs_fp)
    
    # submit the jobs file using cluster_jobs, if not suppressed by the
    # user
    if not opts.suppress_submit_jobs:
        submit_jobs(cluster_jobs_fp,jobs_fp,job_prefix)
        
    if poll_directly:
        try:
            check_call(poller_command.split())
        except CalledProcessError, e:
            print '**Error occuring when calling the poller directly. '+\
            'Jobs may have been submitted, but are not being polled.'
            print str(e)
            exit(-1)

if __name__ == "__main__":
    main()