File: test_streaming.py

package info (click to toggle)
python-screed 1.1.3-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 820 kB
  • sloc: python: 3,356; makefile: 169; sh: 32; javascript: 16
file content (93 lines) | stat: -rw-r--r-- 2,087 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
# Copyright (c) 2008-2015, Michigan State University

from __future__ import print_function
from __future__ import absolute_import

import tempfile
import os
import sys
import io
import threading
import subprocess

import pytest

import screed
from . import screed_tst_utils as utils
from . import test_fasta
from . import test_fastq
from screed.DBConstants import fileExtension


def streamer_reader(ifilename, exception):
    """Iterate over every record screed yields from *ifilename*.

    Meant to be used as a thread target: rather than letting an error
    escape the thread, any ``Exception`` raised while opening or iterating
    is appended to the *exception* list so the caller can inspect it.
    """
    try:
        records = screed.open(ifilename)
        for _record in records:
            # The records themselves are irrelevant; we only drain them.
            continue
    except Exception as err:
        exception.append(err)


def streamer(ifilename):
    """Feed the raw bytes of *ifilename* to screed through a FIFO.

    A named pipe is created inside a fresh temporary directory. A
    background thread runs ``streamer_reader`` on the pipe while this
    thread copies *ifilename* into it in 8192-byte chunks, which simulates
    reading from a stream.

    The input file and the pipe are always closed, the reader thread is
    always joined once the pipe has been opened for writing, and the
    temporary pipe and directory are removed before returning.

    Raises:
        The first exception recorded by the reader thread, if any.
    """
    # Get temp filenames, etc.
    in_dir = tempfile.mkdtemp(prefix="screedtest_")
    fifo = os.path.join(in_dir, 'fifo')

    # The reader thread appends any exception it hits to this list.
    exception = []

    try:
        # read binary to handle compressed files
        with io.open(ifilename, 'rb') as ifile:
            # make a fifo to simulate streaming
            os.mkfifo(fifo)

            # FIFOs MUST BE OPENED FOR READING BEFORE THEY ARE WRITTEN TO
            # If this isn't done, they will BLOCK and things will hang.
            thread = threading.Thread(target=streamer_reader,
                                      args=[fifo, exception])
            thread.start()

            fifofile = io.open(fifo, 'wb')
            try:
                # Closing the write end is what signals EOF to the reader,
                # so it must happen before the join below, even on error.
                with fifofile:
                    for chunk in iter(lambda: ifile.read(8192), b''):
                        fifofile.write(chunk)
            finally:
                thread.join()
    finally:
        # mkdtemp leaves cleanup to the caller; don't litter the temp dir.
        if os.path.exists(fifo):
            os.remove(fifo)
        os.rmdir(in_dir)

    if exception:
        raise exception[0]


def test_stream_fa():
    """Stream the plain FASTA test file through a FIFO."""
    fasta_path = utils.get_test_data('test.fa')
    streamer(fasta_path)


def test_stream_fq():
    """Stream the plain FASTQ test file through a FIFO."""
    fastq_path = utils.get_test_data('test.fastq')
    streamer(fastq_path)


@pytest.mark.xfail()
def test_stream_fa_gz():
    """Stream the gzipped FASTA test file; marked as an expected failure."""
    gz_path = utils.get_test_data('test.fa.gz')
    streamer(gz_path)


def test_stream_gz_fail():
    """Streaming the gzipped FASTQ test file must raise ``ValueError``.

    This should not work yet: the test fails if ``streamer`` returns
    without raising. ``pytest.raises`` is used instead of an ``assert 0``
    guard so the check is not stripped when Python runs with ``-O``.
    """
    with pytest.raises(ValueError) as excinfo:
        streamer(utils.get_test_data('test.fastq.gz'))
    # Keep the error text visible in pytest's captured output.
    print(str(excinfo.value))


@pytest.mark.xfail()
def test_stream_fq_gz():
    """Stream the gzipped FASTQ test file; marked as an expected failure."""
    gz_path = utils.get_test_data('test.fastq.gz')
    streamer(gz_path)


def test_stream_fa_bz2():
    """Stream the bzip2-compressed FASTA test file through a FIFO."""
    bz2_path = utils.get_test_data('test.fa.bz2')
    streamer(bz2_path)


def test_stream_fq_bz2():
    """Stream the bzip2-compressed FASTQ test file through a FIFO."""
    bz2_path = utils.get_test_data('test.fastq.bz2')
    streamer(bz2_path)