File: test_fixed.py

package info (click to toggle)
csvkit 2.2.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 40,664 kB
  • sloc: python: 4,924; perl: 1,000; makefile: 131; sql: 4
file content (115 lines) | stat: -rw-r--r-- 4,081 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
import io

from csvkit.convert import fixed
from csvkit.utilities.in2csv import In2CSV
from tests.utils import CSVKitTestCase, stdin_as_string


class TestFixed(CSVKitTestCase):
    Utility = In2CSV

    def test_fixed(self):
        with open('examples/testfixed') as f, open('examples/testfixed_schema.csv') as schema:
            output = fixed.fixed2csv(f, schema)

        with open('examples/testfixed_converted.csv') as f:
            self.assertEqual(f.read(), output)

    def test_fixed_skip_lines(self):
        with open('examples/testfixed_skip_lines') as f, open('examples/testfixed_schema.csv') as schema:
            output = fixed.fixed2csv(f, schema, skip_lines=3)

        with open('examples/testfixed_converted.csv') as f:
            self.assertEqual(f.read(), output)

    def test_fixed_no_inference(self):
        input_file = io.BytesIO(b'     1   2 3')

        with stdin_as_string(input_file):
            self.assertLines(['--no-inference', '-f', 'fixed', '--schema',
                              'examples/testfixed_schema_no_inference.csv'], [
                'a,b,c',
                '1,2,3',
            ])

        input_file.close()

    def test_fixed_streaming(self):
        with open('examples/testfixed') as f, open('examples/testfixed_schema.csv') as schema:
            output_file = io.StringIO()
            fixed.fixed2csv(f, schema, output=output_file)
            output = output_file.getvalue()
            output_file.close()

        with open('examples/testfixed_converted.csv') as f:
            self.assertEqual(f.read(), output)

    def test_schema_decoder_init(self):
        rd = fixed.SchemaDecoder(['column', 'start', 'length'])
        self.assertEqual(1, rd.start)
        self.assertEqual(2, rd.length)
        self.assertEqual(0, rd.column)

    def test_schema_decoder_in_action(self):
        rd = fixed.SchemaDecoder(['comment', 'start', 'length', 'column'])

        (column, start, length) = rd(['This is a comment', '0', '1', 'column_name'])
        self.assertEqual(False, rd.one_based)
        self.assertEqual('column_name', column)
        self.assertEqual(0, start)
        self.assertEqual(1, length)

        (column, start, length) = rd(['This is another comment', '1', '5', 'column_name2'])
        self.assertEqual(False, rd.one_based)
        self.assertEqual('column_name2', column)
        self.assertEqual(1, start)
        self.assertEqual(5, length)

        (column, start, length) = rd(['yet another comment', '9', '14', 'column_name3'])
        self.assertEqual(False, rd.one_based)
        self.assertEqual('column_name3', column)
        self.assertEqual(9, start)
        self.assertEqual(14, length)

    def test_one_based_row_decoder(self):
        rd = fixed.SchemaDecoder(['column', 'start', 'length'])

        (column, start, length) = rd(['LABEL', '1', '5'])
        self.assertEqual(True, rd.one_based)
        self.assertEqual('LABEL', column)
        self.assertEqual(0, start)
        self.assertEqual(5, length)

        (column, start, length) = rd(['LABEL2', '6', '15'])
        self.assertEqual('LABEL2', column)
        self.assertEqual(5, start)
        self.assertEqual(15, length)

    def test_schematic_line_parser(self):
        schema = """column,start,length
foo,1,5
bar,6,2
baz,8,5"""

        f = io.StringIO(schema)
        parser = fixed.FixedWidthRowParser(f)
        f.close()

        self.assertEqual('foo', parser.headers[0])
        self.assertEqual('bar', parser.headers[1])
        self.assertEqual('baz', parser.headers[2])

        parsed = parser.parse("111112233333")
        self.assertEqual('11111', parsed[0])
        self.assertEqual('22', parsed[1])
        self.assertEqual('33333', parsed[2])

        parsed = parser.parse("    1 2    3")
        self.assertEqual('1', parsed[0])
        self.assertEqual('2', parsed[1])
        self.assertEqual('3', parsed[2])

        parsed = parser.parse("1  1  233  3")
        self.assertEqual('1  1', parsed[0])
        self.assertEqual('2', parsed[1])
        self.assertEqual('33  3', parsed[2])