import multiprocessing
import multiprocessing.pool
import os
from glob import glob
from math import ceil
from pathlib import Path
from typing import List, Optional, Set, Tuple, Iterator
from progressbar import RotatingMarker, ProgressBar, SimpleProgress, Bar, Percentage, ETA
from logging import Logger
from ont_fast5_api.fast5_interface import get_fast5_file
from ont_fast5_api.multi_fast5 import MultiFast5File
from ont_fast5_api.fast5_read import Fast5Read
READS_PER_FILE = 4000
FILENAME_BASE = "batch"


def batcher(iterable, n=1):
    """Yield successive chunks of at most n items from a sequence."""
    length = len(iterable)
    for ndx in range(0, length, n):
        yield iterable[ndx:min(ndx + n, length)]
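
# Example of batcher (illustrative values): splitting a list into chunks of 2.
# list(batcher(["a", "b", "c", "d", "e"], n=2)) -> [["a", "b"], ["c", "d"], ["e"]]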


def yield_fast5_files(input_path, recursive, follow_symlinks=True):
    """
    Yield fast5 file paths within a given directory.
    Optionally search recursively and follow symbolic links.
    :param input_path: Path
    :param recursive: bool
    :param follow_symlinks: bool
    :return: yields fast5 file paths
    """
    if os.path.isfile(input_path):
        yield input_path
        return

    if recursive:
        for root, _, filenames in os.walk(input_path, followlinks=follow_symlinks):
            for filename in filenames:
                if not filename.endswith('.fast5'):
                    continue
                yield os.path.join(root, filename)
    else:
        for filename in glob(os.path.join(input_path, '*.fast5')):
            yield filename
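
# Usage sketch (the "./fast5_dir" path is illustrative): iterate lazily over all
# fast5 files below a directory, following symlinks.
# for fast5_path in yield_fast5_files("./fast5_dir", recursive=True):
#     print(fast5_path)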


def yield_fast5_reads(input_path, recursive, follow_symlinks=True, read_ids=None):
    """
    Iterate over reads in fast5 files and yield read_ids and fast5 read objects.
    If read_ids is defined, skip reads which are not in this set/list. An empty set/list returns all reads.
    :param input_path: Path
    :param recursive: bool
    :param follow_symlinks: bool
    :param read_ids: set or list of read_ids to filter on, or None for all reads
    :raise TypeError: if read_ids is not of type set, list or None
    :return: yielded tuple (read_id, fast5_read_object)
    """
    if read_ids is not None and not isinstance(read_ids, (list, set)):
        raise TypeError('read_ids must be of type set, list or None')

    if read_ids and isinstance(read_ids, list):
        read_ids = set(read_ids)

    for fast5_path in yield_fast5_files(input_path=input_path, recursive=recursive,
                                        follow_symlinks=follow_symlinks):
        fast5_file = get_fast5_file(fast5_path)
        if read_ids:
            selected_reads = read_ids.intersection(fast5_file.get_read_ids())
        else:
            selected_reads = fast5_file.get_read_ids()

        for read_id in selected_reads:
            yield read_id, fast5_file.get_read(read_id)
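
# Usage sketch (path and read_ids are illustrative): pull the raw signal for two
# specific reads from any fast5 file under "./fast5_dir".
# wanted = {"read-id-1", "read-id-2"}
# for read_id, read in yield_fast5_reads("./fast5_dir", recursive=False, read_ids=wanted):
#     signal = read.get_raw_data()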


def get_fast5_file_list(input_path, recursive, follow_symlinks=True):
    # NB this method is provided for compatibility with use cases where
    # generator behaviour is not appropriate,
    # e.g. where we need to know the file_list length or be able to sample from it
    return list(yield_fast5_files(input_path, recursive, follow_symlinks))
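
# Usage sketch (illustrative): materialise the list so it can be counted or sampled.
# import random
# file_list = get_fast5_file_list("./fast5_dir", recursive=True)
# subset = random.sample(file_list, k=min(10, len(file_list)))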


def get_progress_bar(num_reads):
    bar_format = [RotatingMarker(), " ", SimpleProgress(), Bar(), Percentage(), " ", ETA()]
    progress_bar = ProgressBar(maxval=num_reads, widgets=bar_format)
    bad_progressbar_version = False
    try:
        progress_bar.currval
    except AttributeError:
        bad_progressbar_version = True
    if bad_progressbar_version:
        raise RuntimeError('Wrong progressbar package detected, likely '
                           '"progressbar2". Please uninstall that package and '
                           'install "progressbar" instead.')
    return progress_bar.start()
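
# Usage sketch: the returned bar is already started; callers advance it with
# pbar.update(...) and should call pbar.finish() when done.
# pbar = get_progress_bar(num_reads=1000)
# pbar.update(250)
# pbar.finish()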


class Fast5FilterWorker:
    """
    Common worker for fast5_subset and demux_fast5.
    Extracts reads in [read_set] from fast5 files in [input_file_list] to [output_dir].
    Supports single-process and multi-process execution (a multiprocessing.Pool object must be
    passed to run_batch to run in multi-process mode).
    The maximum number of output files is the length of read_set divided by batch_size, rounded up.
    Every task receives a single input and a single output file (and the full set of read_ids to extract).
    If the input file is not exhausted before the output file reaches the batch_size limit, the
    input file is handed back for the next task.
    """
    def __init__(
        self,
        input_file_list: List[Path],
        output_dir: Path,
        read_set: Set[str],
        progressbar: ProgressBar,
        logger: Logger,
        filename_base: str = FILENAME_BASE,
        batch_size: int = READS_PER_FILE,
        target_compression: Optional[str] = None,
    ):
        self.input_f5s = input_file_list.copy()  # this list will be modified to track progress
        self.read_set = read_set
        self.target_compression = target_compression
        self.batch_size = batch_size
        self.filename_base = filename_base
        self.output_dir = output_dir
        self.pbar = progressbar
        self.logger = logger

        self.filename_mapping_file = self.output_dir / "filename_mapping.txt"
        if self.filename_mapping_file.exists():
            self.logger.info("overwriting filename mapping file {}".format(self.filename_mapping_file))
            self.filename_mapping_file.unlink()

        self.out_files = {}  # dict where key=filename, value=set of read_ids written to that file
        self.available_out_files = []
        self.populate_out_files()

        self.tasks = []
        self.pool = None

    def populate_out_files(self) -> None:
        """
        Calculate the number of output files based on batch size.
        Delete existing files and initialise the dict that keeps track of reads added to each file.
        :return:
        """
        num_outputs = int(ceil(len(self.read_set) / float(self.batch_size)))
        for i in range(num_outputs):
            filename = self.filename_base + str(i) + ".fast5"
            output_file_name = self.output_dir / filename
            if output_file_name.exists():
                self.logger.info("overwriting multiread file {}".format(output_file_name))
                output_file_name.unlink()
            self.out_files[output_file_name] = set()
        # reversing so that the first item to be popped has the lowest index
        self.available_out_files = sorted(self.out_files.keys(), reverse=True)

    def run_batch(self, pool: Optional[multiprocessing.pool.Pool] = None) -> None:
        """
        Choose sync or async running mode (async if a multiprocessing.Pool is provided) and launch tasks.
        :param pool:
        :return:
        """
        if pool is None:
            self._launch_sync_tasks()
        else:
            self.pool = pool
            self._launch_async_tasks()

    def _launch_sync_tasks(self) -> None:
        """
        Run tasks sequentially.
        :return:
        """
        for args_tuple in self._args_generator():
            reads, out_file, in_file = extract_selected_reads(*args_tuple)
            self._update_file_lists(reads=reads, out_file=out_file, in_file=in_file)

    def _launch_async_tasks(self) -> None:
        """
        Launch an async task for every available input-output pair.
        self.tasks only keeps track of the number of tasks still running.
        :return:
        """
        for args_tuple in self._args_generator():
            self.pool.apply_async(func=extract_selected_reads, args=args_tuple,
                                  callback=self._callback, error_callback=self._error_callback)
            self.tasks.append(0)

    def _callback(self, result) -> None:
        """
        Once a pool task finishes: update available files and reads, trigger a scan for new tasks,
        then decrement self.tasks.
        :param result: tuple
        :return:
        """
        self._update_file_lists(*result)
        self._launch_async_tasks()
        self.tasks.pop()

    def _error_callback(self, result) -> None:
        self.logger.error(result.original_exception)
        self._update_file_lists(set(), result.output_file, None)
        self._launch_async_tasks()
        self.tasks.pop()

    def _update_file_lists(self, reads, out_file, in_file) -> None:
        """
        Update read sets and files available for processing.
        Update progressbar.
        :param reads:
        :param out_file:
        :param in_file:
        :return:
        """
        in_file_exhausted = 1
        if in_file is not None:
            # in_file was not exhausted, so put it back in the queue
            in_file_exhausted = 0
            self.input_f5s.append(in_file)

        self.out_files[out_file].update(reads)
        self.read_set.difference_update(reads)
        if len(self.out_files[out_file]) < self.batch_size:
            # out_file has not reached its batch limit, so it can accept more reads
            self.available_out_files.append(out_file)

        # append to the filename - read_id mapping table
        with open(str(self.filename_mapping_file), 'a') as output_table:
            for read in reads:
                output_table.write("{}\t{}\n".format(read, out_file.name))

        # increment progressbar by the number of reads found, plus one if the input file was exhausted
        self.pbar.update(self.pbar.currval + len(reads) + in_file_exhausted)

    def _args_generator(self) -> Iterator[Tuple[Path, Path, Set[str], int, Optional[str]]]:
        """
        While there are available pairs of input and output files, yield tuples that are suitable
        inputs to extract_selected_reads.
        :return: a generator of tuples (in_file, out_file, read_set, count, compression)
        """
        while self.available_out_files and self.input_f5s:
            out_file = self.available_out_files.pop()
            in_file = self.input_f5s.pop()
            count = self.batch_size - len(self.out_files[out_file])
            yield in_file, out_file, self.read_set, count, self.target_compression
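
# Usage sketch of Fast5FilterWorker (paths, read_ids and the pool-draining loop are
# illustrative, not part of this module):
# import logging, time
# read_set = {"read-id-1", "read-id-2"}
# f5s = [Path(p) for p in get_fast5_file_list("./fast5_dir", recursive=False)]
# worker = Fast5FilterWorker(f5s, Path("./out"), read_set,
#                            get_progress_bar(len(read_set)), logging.getLogger(__name__))
# with multiprocessing.Pool(4) as pool:
#     worker.run_batch(pool)     # async mode; new tasks are re-queued via callbacks
#     while worker.tasks:        # crude wait; callbacks pop tasks as they complete
#         time.sleep(0.1)
# worker.run_batch()             # or: synchronous, single-process mode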


def extract_selected_reads(
    input_file: Path,
    output_file: Path,
    read_set: Set[str],
    count: int,
    target_compression: Optional[str] = None,
) -> Tuple[Set[str], Path, Optional[Path]]:
    """
    Take reads from the input file if their read_id is in read_set.
    Write at most count reads to the output file.
    Return tuple (found_reads, output_file, input_file).
    If the input file was exhausted, the third item of the return value is None.
    :param input_file: Path to input Fast5 file
    :param output_file: Path to output Fast5 file (will be appended to if it already exists)
    :param read_set: set of read_ids to extract
    :param count: int max number of reads to be added to output_file
    :param target_compression: str type of compression for the output fast5 file
    :return: tuple of: found read set, path to output file, path to input file (None if exhausted)
    """
    try:
        found_reads = set()
        with MultiFast5File(str(output_file), 'a') as output_f5:
            reads_present = set(output_f5.get_read_ids())
            for read_id, read in read_generator(input_file, read_set):
                found_reads.add(read_id)
                if read_id in reads_present:
                    continue
                output_f5.add_existing_read(read, target_compression=target_compression)
                reads_present.add(read_id)
                if len(found_reads) >= count:
                    return found_reads, output_file, input_file
    except Exception as e:
        exception = type(e)("Error processing file {}: {}".format(input_file, e.args))
        raise ExtractionException(exception, output_file)
    return found_reads, output_file, None
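
# Usage sketch (illustrative paths): copy up to 100 matching reads from one file to another.
# found, out_path, remaining = extract_selected_reads(
#     Path("in.fast5"), Path("out.fast5"), {"read-id-1"}, count=100)
# if remaining is None:
#     print("input file fully scanned")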


class ExtractionException(Exception):
    """Carries the original exception plus the output file of the failed task."""
    def __init__(self, original_exception, output_file):
        self.original_exception = original_exception
        self.output_file = output_file


def read_generator(input_file: Path, read_set: Set[str]) -> Iterator[Tuple[str, Fast5Read]]:
    """
    Open input_file as Fast5, yield tuples (read_id, fast5_read) for every read_id that is present in read_set.
    :param input_file: Path to input Fast5 file
    :param read_set: set of read_ids to look for
    :return: tuples of (read_id, read object)
    """
    with get_fast5_file(str(input_file)) as input_f5:
        read_ids = input_f5.get_read_ids()
        for read_id in read_set.intersection(read_ids):
            read = input_f5.get_read(read_id)
            yield read_id, read
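
# Usage sketch (illustrative): the input file stays open only while iterating.
# for read_id, read in read_generator(Path("in.fast5"), {"read-id-1"}):
#     print(read_id, len(read.get_raw_data()))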