File: test_sources.py

package info (click to toggle)
python-petl 1.7.17-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 2,224 kB
  • sloc: python: 22,617; makefile: 109; xml: 9
file content (161 lines) | stat: -rw-r--r-- 4,025 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
# -*- coding: utf-8 -*-
from __future__ import absolute_import, print_function, division


import gzip
import bz2
import zipfile
from tempfile import NamedTemporaryFile
from petl.compat import PY2


from petl.test.helpers import ieq, eq_
import petl as etl
from petl.io.sources import MemorySource, PopenSource, ZipSource, \
    StdoutSource, GzipSource, BZ2Source


def test_memorysource():
    tbl1 = (('foo', 'bar'),
            ('a', '1'),
            ('b', '2'),
            ('c', '2'))

    # test writing to a string buffer
    ss = MemorySource()
    etl.tocsv(tbl1, ss)
    expect = "foo,bar\r\na,1\r\nb,2\r\nc,2\r\n"
    if not PY2:
        expect = expect.encode('ascii')
    actual = ss.getvalue()
    eq_(expect, actual)

    # test reading from a string buffer
    tbl2 = etl.fromcsv(MemorySource(actual))
    ieq(tbl1, tbl2)
    ieq(tbl1, tbl2)

    # test appending
    etl.appendcsv(tbl1, ss)
    actual = ss.getvalue()
    expect = "foo,bar\r\na,1\r\nb,2\r\nc,2\r\na,1\r\nb,2\r\nc,2\r\n"
    if not PY2:
        expect = expect.encode('ascii')
    eq_(expect, actual)


def test_memorysource_2():

    data = 'foo,bar\r\na,1\r\nb,2\r\nc,2\r\n'
    if not PY2:
        data = data.encode('ascii')
    actual = etl.fromcsv(MemorySource(data))
    expect = (('foo', 'bar'),
              ('a', '1'),
              ('b', '2'),
              ('c', '2'))
    ieq(expect, actual)
    ieq(expect, actual)


def test_popensource():

    expect = (('foo', 'bar'),)
    delimiter = ' '
    actual = etl.fromcsv(PopenSource(r'echo foo bar',
                                     shell=True),
                         delimiter=delimiter)
    ieq(expect, actual)


def test_zipsource():

    # setup
    tbl = [('foo', 'bar'), ('a', '1'), ('b', '2')]
    fn_tsv = NamedTemporaryFile().name
    etl.totsv(tbl, fn_tsv)
    fn_zip = NamedTemporaryFile().name
    z = zipfile.ZipFile(fn_zip, mode='w')
    z.write(fn_tsv, 'data.tsv')
    z.close()

    # test
    actual = etl.fromtsv(ZipSource(fn_zip, 'data.tsv'))
    ieq(tbl, actual)


def test_stdoutsource():

    tbl = [('foo', 'bar'), ('a', 1), ('b', 2)]
    etl.tocsv(tbl, StdoutSource(), encoding='ascii')
    etl.tohtml(tbl, StdoutSource(), encoding='ascii')
    etl.topickle(tbl, StdoutSource())


def test_stdoutsource_none(capfd):

    tbl = [('foo', 'bar'), ('a', 1), ('b', 2)]
    etl.tocsv(tbl, encoding='ascii')
    captured = capfd.readouterr()
    outp = captured.out
    # TODO: capfd works on vscode but not in console/tox
    if outp:
        assert outp in ( 'foo,bar\r\na,1\r\nb,2\r\n' , 'foo,bar\na,1\nb,2\n' )


def test_stdoutsource_unicode():

    tbl = [('foo', 'bar'),
           (u'Արամ Խաչատրյան', 1),
           (u'Johann Strauß', 2)]
    etl.tocsv(tbl, StdoutSource(), encoding='utf-8')
    etl.tohtml(tbl, StdoutSource(), encoding='utf-8')
    etl.topickle(tbl, StdoutSource())


def test_gzipsource():

    # setup
    tbl = [('foo', 'bar'), ('a', '1'), ('b', '2')]
    fn = NamedTemporaryFile().name + '.gz'
    expect = b"foo,bar\na,1\nb,2\n"

    # write explicit
    etl.tocsv(tbl, GzipSource(fn), lineterminator='\n')
    actual = gzip.open(fn).read()
    eq_(expect, actual)
    # write implicit
    etl.tocsv(tbl, fn, lineterminator='\n')
    actual = gzip.open(fn).read()
    eq_(expect, actual)

    # read explicit
    tbl2 = etl.fromcsv(GzipSource(fn))
    ieq(tbl, tbl2)
    # read implicit
    tbl2 = etl.fromcsv(fn)
    ieq(tbl, tbl2)


def test_bzip2source():

    # setup
    tbl = [('foo', 'bar'), ('a', '1'), ('b', '2')]
    fn = NamedTemporaryFile().name + '.bz2'
    expect = b"foo,bar\na,1\nb,2\n"

    # write explicit
    etl.tocsv(tbl, BZ2Source(fn), lineterminator='\n')
    actual = bz2.BZ2File(fn).read()
    eq_(expect, actual)
    # write implicit
    etl.tocsv(tbl, fn, lineterminator='\n')
    actual = bz2.BZ2File(fn).read()
    eq_(expect, actual)

    # read explicit
    tbl2 = etl.fromcsv(BZ2Source(fn))
    ieq(tbl, tbl2)
    # read implicit
    tbl2 = etl.fromcsv(fn)
    ieq(tbl, tbl2)