1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304
|
#!/usr/bin/env python3
"""
Modify FASTA and FASTQ files by picking subsets and modifying individual entries.
Possible modifications:
- Pick a subset of records (given by name). With lots of names, this is faster
than 'grep -A 3 --no-group-separator -f readnames.txt file.fastq' magic, which
may be used with FASTQ files.
If the record name ends in '/1' or '/2', these two charecter are ignored when
comparing to the names in the file.
- Trim low-quality ends
- Trim reads to a given length
- Discard reads shorter than a given length
- Discard reads in which the expected number of errors exceeds a threshold
- Discard reads that contain characters other than those in a given set.
- Reverse-complement each read
- Make sequence characters upper- or lowercase
- Convert from FASTA to FASTQ by assigning a fixed quality value to all bases
- Convert from FASTQ to FASTA by dropping all quality values
- Make read names unique
- Pick only the first N sequences.
Modifications are done in the order in which they are listed above.
The result is written to standard output.
The algorithm for quality trimming is the same as the one used by BWA:
- Subtract the cutoff value from all qualities.
- Compute partial sums from all indices to the end of the sequence.
- Trim sequence at the index at which the sum is minimal.
"""
import sys
import errno
from collections import defaultdict
from itertools import islice
import random
from sqt import HelpfulArgumentParser, SequenceReader, FastaWriter, FastqWriter
from sqt.dna import reverse_complement, mutate
from sqt.qualtrim import quality_trim_index as trim_index, expected_errors
__author__ = "Marcel Martin"
def add_arguments(parser):
arg = parser.add_argument
arg('--names', metavar='FILE', default=None,
help='Keep only records whose name occurs in FILE (one per line)')
arg('--not-names', metavar='FILE', default=None,
help='Discard records whose name occurs in FILE (one per line)')
arg("-q", "--cutoff", type=int, default=None,
help="Quality cutoff. Only when input format is FASTQ")
arg('--substitute', type=float, default=0, metavar='PROB',
help='Randomly substitute bases at probability PROB. Default: %(default)s')
arg("--length", "-l", type=int, default=None,
help="Shorten records to LENGTH (default: do not shorten)")
arg('-m', '--minimum-length', type=int, default=0, metavar='LENGTH',
help='Discard reads shorter than LENGTH')
arg('--max-errors', type=float, default=None, metavar='ERRORS',
help='Discard reads whose expected number of errors (computed '
'from quality values) exceeds ERRORS.')
arg('--allowed-characters', default=None, metavar='CHARS', help='Discard '
'reads that contain characters other than those in CHARS. CHARS is '
'case-sensitive. Example: -c ACGTacgt.')
arg('--reverse-complement', '-r', action='store_true',
default=False, help='Reverse-complement each sequence')
group = parser.add_mutually_exclusive_group()
group.add_argument('--upper', dest='character_case', action='store_const',
default=None, const='upper', help='Convert sequence characters to uppercase')
group.add_argument('--lower', dest='character_case', action='store_const',
default=None, const='lower', help='Convert sequence characters to lowercase')
arg('--constant-quality', '-c', metavar='QUALITY', type=int, default=None,
help='Set all quality values to QUALITY. Use this to convert from '
'FASTA to FASTQ.')
arg('--fasta', default=False, action='store_true',
help='Always output FASTA (drop qualities if input is FASTQ)')
arg('--unique-names', action='store_true', default=False,
help="Make record names unique by appending _1, _2 etc. when necessary")
arg('--limit', '-n', type=int, metavar='N', default=None,
help="Pick only the first N sequences (default: all)")
arg("--width", "-w", type=int, default=80,
help="Characters per line in output FASTA (default: %(default)s). "
"Set to 0 to disallow line breaks entirely. This is ignored for FASTQ files.")
arg('--seed', type=int, default=None,
help='Set random seed for reproducible runs. Relevant when --substitution-rate is used.'
'(default: use different seed each run)')
arg('path', metavar='FASTA/FASTQ',
help='input FASTA or FASTQ file')
class ReadPicker:
def __init__(self, file_with_names, keep=True):
"""
keep -- If True, reads occurring in the file are kept. If False, they
are discarded.
"""
read_names = []
with open(file_with_names) as f:
read_names = f.read().split('\n')
self.read_names = { rn for rn in read_names if rn != '' }
self.keep = keep
def __call__(self, read):
rname = read.name.split(' ', maxsplit=1)[0]
if rname.endswith('/1'):
rname = rname[:-2]
elif rname.endswith('/2'):
rname = rname[:-2]
if rname in self.read_names:
return read if self.keep else None
else:
return None if self.keep else read
class QualityTrimmer:
def __init__(self, cutoff):
self.cutoff = cutoff
def __call__(self, read):
index = trim_index(read.qualities, self.cutoff)
return read[:index]
class Mutater:
def __init__(self, substitution_rate):
self.substitution_rate = substitution_rate
def __call__(self, read):
read.sequence = mutate(read.sequence, substitution_rate=self.substitution_rate, indel_rate=0)
return read
class Shortener:
def __init__(self, length):
self.length = length
def __call__(self, read):
return read[:self.length]
class MinimumLengthFilter:
def __init__(self, length):
self.minimum_length = length
def __call__(self, read):
if len(read) < self.minimum_length:
return None
else:
return read
class MaxExpectedErrorFilter:
"""
Discard reads whose expected number of errors, according to the quality
values, exceeds the given threshold.
The idea comes from usearch's -fastq_maxee parameter
(http://drive5.com/usearch/).
"""
def __init__(self, max_errors):
self.max_errors = max_errors
def __call__(self, read):
if expected_errors(read.qualities) > self.max_errors:
return None
else:
return read
class AllowedCharacterFilter:
"""
Discard reads that contain characters other than those in the given set.
"""
def __init__(self, allowed_characters):
self.allowed = set(allowed_characters)
def __call__(self, read):
if set(read.sequence) <= self.allowed:
return read
else:
return None
def reverse_complementer(read):
read.sequence = reverse_complement(read.sequence)
if read.qualities:
read.qualities = read.qualities[::-1]
return read
def lower_caser(read):
read.sequence = read.sequence.lower()
return read
def upper_caser(read):
read.sequence = read.sequence.upper()
return read
class QualitySetter:
def __init__(self, value):
self.quality_char = chr(33 + value)
def __call__(self, read):
read.qualities = self.quality_char * len(read)
return read
def quality_dropper(read):
read.qualities = None
return read
class UniqueNamer:
def __init__(self):
# Counter for occurrences of a name
self.names = defaultdict(int)
def __call__(self, read):
if ' ' in read.name:
name, description = read.name.split(' ', maxsplit=1)
else:
name = read.name
description = None
self.names[name] += 1
if self.names[name] == 1:
# Read not previously seen
return read
name = '{}_{}'.format(name, self.names[name] - 1)
read.name = name
if description is not None:
read.name += ' ' + description
return read
def main(args=None):
if args is None:
parser = HelpfulArgumentParser(description=__doc__)
add_arguments(parser)
args = parser.parse_args()
if args.width == 0:
args.width = None
if args.seed is not None:
random.seed(args.seed)
modifiers = []
if args.names:
modifiers.append(ReadPicker(args.names, keep=True))
if args.not_names:
modifiers.append(ReadPicker(args.not_names, keep=False))
if args.cutoff is not None:
modifiers.append(QualityTrimmer(args.cutoff))
if args.substitute > 0:
modifiers.append(Mutater(args.substitute))
if args.length:
modifiers.append(Shortener(args.length))
if args.minimum_length != 0:
modifiers.append(MinimumLengthFilter(args.minimum_length))
if args.max_errors is not None:
modifiers.append(MaxExpectedErrorFilter(args.max_errors))
if args.allowed_characters is not None:
modifiers.append(AllowedCharacterFilter(args.allowed_characters))
if args.reverse_complement:
modifiers.append(reverse_complementer)
if args.character_case == 'lower':
modifiers.append(lower_caser)
if args.character_case == 'upper':
modifiers.append(upper_caser)
if args.constant_quality is not None:
modifiers.append(QualitySetter(args.constant_quality))
if args.fasta:
modifiers.append(quality_dropper)
if args.unique_names:
modifiers.append(UniqueNamer())
with SequenceReader(args.path) as fr:
format = fr.format
outformat = format
if args.constant_quality is not None:
outformat = 'fastq'
if args.fasta:
outformat = 'fasta'
if outformat == 'fastq':
writer = FastqWriter(sys.stdout)
else:
writer = FastaWriter(sys.stdout, line_length=args.width)
try:
for record in islice(fr, 0, args.limit):
for modifier in modifiers:
record = modifier(record)
if record is None:
break
else:
# only executed if loop did not terminate via break
writer.write(record)
except IOError as e:
if e.errno != errno.EPIPE:
raise
if __name__ == '__main__':
main()
|