File: io_utils.py

package info (click to toggle)
dnapi 1.1-3
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, bullseye, forky, sid, trixie
  • size: 42,284 kB
  • sloc: python: 831; sh: 62; makefile: 13
file content (66 lines) | stat: -rw-r--r-- 1,571 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
"""Functions for IO processes.

"""

import io
import bz2
import gzip
import zipfile
import tarfile
import os.path
import fileinput


def get_file_obj(in_file):
    """Return a text-mode, line-iterable file object for an input file.

    The reader is selected from the suffix of ``in_file``:

    * tar archives (``.tar``, ``.tar.gz``, ``.tgz``, ``.tar.bz2``,
      ``.tbz2``, ``.tar.xz``, ``.txz``): the first regular file stored in
      the archive, decoded as text;
    * ``.gz``: the gzip-decompressed content;
    * ``.zip``: the first non-directory entry of the archive;
    * ``.bz`` / ``.bz2``: the bzip2-decompressed content;
    * anything else, including ``"-"``: ``fileinput.input(in_file)``.

    Args:
        in_file: Path of the file to read, or ``"-"``.

    Returns:
        An object that yields text lines when iterated.

    Raises:
        Exception: If ``in_file`` does not exist (and is not ``"-"``), or
            if an archive contains no file to read.

    """
    if not os.path.exists(in_file) and in_file != "-":
        raise Exception("can't open {}".format(in_file))

    # Match on the suffix only: searching for ".tar" anywhere in the path
    # also matches directory names and unrelated file names.
    tar_suffixes = (".tar", ".tar.gz", ".tgz", ".tar.bz2", ".tbz2",
                    ".tar.xz", ".txz")

    if in_file.endswith(tar_suffixes):
        # "r:*" lets tarfile detect the compression transparently.
        archive = tarfile.open(in_file, "r:*")
        # A TarFile is not a byte stream itself; a member has to be
        # extracted before it can be wrapped for text decoding.
        for member in archive:
            if member.isfile():
                return io.TextIOWrapper(archive.extractfile(member))
        archive.close()
        raise Exception("no file found in {}".format(in_file))
    elif in_file.endswith(".gz"):
        return gzip.open(in_file, "rt")
    elif in_file.endswith(".zip"):
        archive = zipfile.ZipFile(in_file)
        # Directory entries end with "/" and carry no data; skip them.
        for entry in archive.namelist():
            if not entry.endswith("/"):
                return io.TextIOWrapper(archive.open(entry, "r"))
        archive.close()
        raise Exception("no file found in {}".format(in_file))
    elif in_file.endswith((".bz", ".bz2")):
        # bz2.BZ2File only accepts binary modes; bz2.open supports "rt".
        return bz2.open(in_file, "rt")
    else:
        return fileinput.input(in_file)


def fastq_sequence(fobj):
    """Yield the sequence line of each FASTQ record.

    ``fobj`` is iterated line by line and treated as consecutive groups of
    four lines; the second line of every group is yielded with trailing
    whitespace removed.

    """
    for line_index, line in enumerate(fobj):
        # Only the second line of each four-line group is of interest.
        if line_index % 4 != 1:
            continue
        yield line.rstrip()


def fastq_quality(fobj):
    """Yield the quality score line of each FASTQ record.

    ``fobj`` is iterated line by line and treated as consecutive groups of
    four lines; the fourth line of every group is yielded with trailing
    whitespace removed.

    """
    # Position of the current line inside its four-line group (0..3).
    offset_in_record = 0
    for line in fobj:
        if offset_in_record == 3:
            yield line.rstrip()
            offset_in_record = 0
        else:
            offset_in_record += 1


def fastq_record(fobj):
    """Yield complete FASTQ records, one string per record.

    Lines read from ``fobj`` are concatenated unchanged, and the
    accumulated text is yielded after every fourth line. Lines left over
    at the end that do not fill a group of four are not yielded.

    """
    chunk = ""
    for line_count, line in enumerate(fobj, start=1):
        chunk = chunk + line
        if line_count % 4:
            # Still inside a record; keep accumulating.
            continue
        yield chunk
        chunk = ""