File: validate_input.py

import os
import csv
import yaml
import argparse
from glob import glob
import itertools

def read_sample_sheet(path):
    """Parse the sample sheet CSV into a list of per-sample dictionaries."""
    with open(path, "r") as fp:
        # skip rows that are entirely empty
        rows = [row for row in csv.reader(fp) if ''.join(row).strip()]
    header = rows[0]
    rows = rows[1:]
    sample_sheet = [dict(zip(header, row)) for row in rows]
    return sample_sheet
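# Illustrative sketch (not from the original file; column and file names below are
# hypothetical): the sample sheet is a plain CSV whose header row provides the
# dictionary keys, e.g.
#
#   name,reads,reads2,sample_type
#   sampleA,sampleA_R1.fastq.gz,sampleA_R2.fastq.gz,control
#   sampleB,sampleB_R1.fastq.gz,,treatment
#
# read_sample_sheet() would then return
#   [{'name': 'sampleA', 'reads': 'sampleA_R1.fastq.gz',
#     'reads2': 'sampleA_R2.fastq.gz', 'sample_type': 'control'}, ...]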

def read_config_file(path):
    """Load the YAML settings file into a dictionary."""
    with open(path, 'rt') as infile:
        # yaml.load() without an explicit Loader is deprecated (and an error in
        # PyYAML >= 6); safe_load is sufficient for the settings file.
        config = yaml.safe_load(infile)
    return config
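# Illustrative sketch (hypothetical values; rule names and paths are examples only):
# the settings file is YAML, and the keys this script touches are 'locations',
# 'execution' and, optionally, 'DEanalyses', e.g.
#
#   locations:
#     reads-dir: reads/
#     sample-sheet: sample_sheet.csv
#     output-dir: output/
#   execution:
#     rules:
#       some_rule:
#         memory: 4000
#   DEanalyses:
#     analysis1:
#       case_sample_groups: treatment
#       control_sample_groups: control
#       covariates: ''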

def validate_memory_restrictions(config):
    """Ensure per-rule memory limits are plain integer megabyte values."""
    for rule in config['execution']['rules']:
        if 'memory' in config['execution']['rules'][rule]:
            value = config['execution']['rules'][rule]['memory']
            if not isinstance(value, int) and not str(value).isdigit():
                raise Exception("ERROR: memory limits must be expressed as a plain number of megabytes. Got '{}' in '{}'.".format(value, rule))


def validate_config(config):
    # Check that all referenced locations exist (the 'output-dir' entry is exempt)
    for loc in config['locations']:
        location = config['locations'][loc]
        if loc == 'output-dir':
            continue
        if not (os.path.isdir(location) or os.path.isfile(location)):
            raise Exception("ERROR: The following necessary directory/file does not exist: '{}' ({})".format(location, loc))
        if location.endswith((".gz", ".bz2", ".xz")):
            raise Exception("ERROR: The {} file '{}' is referenced in its compressed form, as downloaded. However, the tools of this workflow expect the reference as plain FASTA/GTF files that are directly readable. Please unpack these files and update the settings with the new file names. This does _not_ apply to the FASTQ.gz files of the samples, which stay compressed.".format(loc, location))

    validate_memory_restrictions(config)

    sample_sheet = read_sample_sheet(config['locations']['sample-sheet'])
    
    # Check if the required fields are found in the sample sheet
    required_fields = ['name', 'reads', 'reads2', 'sample_type']
    
    if 'DEanalyses' in config:
        # also add the list of covariates from the DE analyses if available
        covariates = [config['DEanalyses'][x]['covariates'] for x in config['DEanalyses'].keys() if 'covariates' in config['DEanalyses'][x]]
        # cleanup and get the set of unique covariates
        covariates = [y.strip() for y in itertools.chain(*[x.split(',') for x in covariates])]
        # remove empty strings
        covariates = [x for x in covariates if x]
        required_fields = required_fields + covariates
    required_fields = set(required_fields)
    not_found = required_fields.difference(set(sample_sheet[0].keys()))
    if len(not_found) > 0:
        raise Exception("ERROR: Required field(s) {} could not be found in the sample sheet file '{}'".format(not_found, config['locations']['sample-sheet']))

    # Check that requested analyses make sense
    if 'DEanalyses' in config:
        for analysis in config['DEanalyses']:
            for group in config['DEanalyses'][analysis]['case_sample_groups'].split(',') + config['DEanalyses'][analysis]['control_sample_groups'].split(','):
                group = group.strip() #remove any leading/trailing whitespaces in the sample group names
                if not any(row['sample_type'] == group for row in sample_sheet):
                    raise Exception("ERROR: no samples in sample sheet have sample type '{}', specified in analysis {}.".format(group, analysis))

    # Check that read files exist and that sample names are unique to each row
    samples = {}
    row_index = 1
    for row in sample_sheet:
        sample = row['name']
        if sample in samples:
            raise Exception('ERROR: name "{}" is not unique. Replace it with a unique name in the sample_sheet.'.format(sample))
        else:
            samples[sample] = 1
        filenames = [row['reads'], row['reads2']] if row['reads2'] else [row['reads']]
        # don't allow paths in the sample sheet, only allow the base name of the file
        if any(os.path.basename(x) != x for x in filenames):
            raise Exception("ERROR: read file names in the sample sheet must be basenames, "
                            "not paths to the files. See sample '{}' in the sample sheet.".format(sample))
        for filename in filenames:
            fullpath = os.path.join(config['locations']['reads-dir'], filename)
            if not os.path.isfile(fullpath):
                filenameFlankedWithWhitespace = filename.startswith((' ', '\t')) or filename.endswith((' ', '\t'))
                if filenameFlankedWithWhitespace:
                    raise Exception("ERROR: missing reads file: '{}', likely caused by blanks flanking the filename, please correct.".format(fullpath))
                else:
                    raise Exception("ERROR: missing reads file: '{}'".format(fullpath))
        # check whether any of the required columns contain missing information
        fields = ['name', 'reads', 'sample_type'] # fields for which missing info is not allowed
        missing = [f for f in fields if not row[f].strip()]
        if len(missing) > 0:
            raise Exception("ERROR: Missing information in sample sheet at row #{}. "
                            "Missing info is not allowed for the name/reads/sample_type fields.".format(row_index))
        row_index = row_index + 1


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('-c', '--config-file', required=True, help='Path of configuration file [settings.yaml]')
    parser.add_argument('-s', '--sample-sheet-file', required=True, help='Path of sample sheet [sample_sheet.csv]')
    args = parser.parse_args()

    config = read_config_file(args.config_file)
    config['locations']['sample-sheet'] = args.sample_sheet_file
    validate_config(config)
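
# Illustrative invocation (file names taken from the argparse help texts above):
#   python validate_input.py -c settings.yaml -s sample_sheet.csv
# The script finishes silently when the configuration and sample sheet are valid
# and raises an exception describing the first problem it finds otherwise.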