# Tests for csvkit's fixed-width conversion (csvkit.convert.fixed and `in2csv -f fixed`).
import io
from csvkit.convert import fixed
from csvkit.utilities.in2csv import In2CSV
from tests.utils import CSVKitTestCase, stdin_as_string
class TestFixed(CSVKitTestCase):
    """Tests for fixed-width conversion.

    Covers ``fixed.fixed2csv`` (returned and streamed output), the ``in2csv``
    command line with ``-f fixed``, ``fixed.SchemaDecoder`` and
    ``fixed.FixedWidthRowParser``.
    """

    # Command-line utility driven by the CSVKitTestCase helpers (e.g. assertLines).
    Utility = In2CSV

    def test_fixed(self):
        # Without an ``output`` argument the converted CSV is the return value.
        with open('examples/testfixed') as f, open('examples/testfixed_schema.csv') as schema:
            output = fixed.fixed2csv(f, schema)

        with open('examples/testfixed_converted.csv') as f:
            self.assertEqual(f.read(), output)

    def test_fixed_skip_lines(self):
        # Same expected CSV as test_fixed; the input fixture differs and
        # skip_lines=3 is passed to the converter.
        with open('examples/testfixed_skip_lines') as f, open('examples/testfixed_schema.csv') as schema:
            output = fixed.fixed2csv(f, schema, skip_lines=3)

        with open('examples/testfixed_converted.csv') as f:
            self.assertEqual(f.read(), output)

    def test_fixed_no_inference(self):
        # NOTE(review): the offsets of this record must line up with the columns
        # declared in examples/testfixed_schema_no_inference.csv (not visible
        # here); the padding in this literal looks collapsed - confirm.
        #
        # The context manager closes the buffer even when the assertion fails.
        with io.BytesIO(b' 1 2 3') as input_file:
            with stdin_as_string(input_file):
                self.assertLines(['--no-inference', '-f', 'fixed', '--schema',
                                  'examples/testfixed_schema_no_inference.csv'], [
                    'a,b,c',
                    '1,2,3',
                ])

    def test_fixed_streaming(self):
        # With ``output`` given, the CSV is written to that file object; read
        # it back before the buffer is closed.
        with io.StringIO() as output_file:
            with open('examples/testfixed') as f, open('examples/testfixed_schema.csv') as schema:
                fixed.fixed2csv(f, schema, output=output_file)
            output = output_file.getvalue()

        with open('examples/testfixed_converted.csv') as f:
            self.assertEqual(f.read(), output)

    def test_schema_decoder_init(self):
        # The decoder exposes the position of each header in the header row.
        rd = fixed.SchemaDecoder(['column', 'start', 'length'])
        self.assertEqual(1, rd.start)
        self.assertEqual(2, rd.length)
        self.assertEqual(0, rd.column)

    def test_schema_decoder_in_action(self):
        # Headers are in a non-default order and include an extra "comment"
        # column. The first row starts at 0, and one_based is expected to be
        # False after every call.
        rd = fixed.SchemaDecoder(['comment', 'start', 'length', 'column'])

        cases = [
            (['This is a comment', '0', '1', 'column_name'], ('column_name', 0, 1)),
            (['This is another comment', '1', '5', 'column_name2'], ('column_name2', 1, 5)),
            (['yet another comment', '9', '14', 'column_name3'], ('column_name3', 9, 14)),
        ]

        for row, (expected_column, expected_start, expected_length) in cases:
            (column, start, length) = rd(row)
            self.assertEqual(False, rd.one_based)
            self.assertEqual(expected_column, column)
            self.assertEqual(expected_start, start)
            self.assertEqual(expected_length, length)

    def test_one_based_row_decoder(self):
        # The first row starts at 1: one_based becomes True and every decoded
        # start is shifted down by one.
        rd = fixed.SchemaDecoder(['column', 'start', 'length'])

        (column, start, length) = rd(['LABEL', '1', '5'])
        self.assertEqual(True, rd.one_based)
        self.assertEqual('LABEL', column)
        self.assertEqual(0, start)
        self.assertEqual(5, length)

        (column, start, length) = rd(['LABEL2', '6', '15'])
        self.assertEqual('LABEL2', column)
        self.assertEqual(5, start)
        self.assertEqual(15, length)

    def test_schematic_line_parser(self):
        # One-based schema describing a 12-character record: foo(5) bar(2) baz(5).
        schema = '\n'.join([
            'column,start,length',
            'foo,1,5',
            'bar,6,2',
            'baz,8,5',
        ])
        widths = (5, 2, 5)

        def fixed_row(*values):
            # Right-justify each value to its column width so every field sits
            # at the offset the schema declares, independent of how whitespace
            # in this source file is rendered.
            return ''.join(value.rjust(width) for value, width in zip(values, widths))

        # The assertions below run after the schema file object is closed.
        with io.StringIO(schema) as f:
            parser = fixed.FixedWidthRowParser(f)

        self.assertEqual('foo', parser.headers[0])
        self.assertEqual('bar', parser.headers[1])
        self.assertEqual('baz', parser.headers[2])

        # Every column completely filled.
        parsed = parser.parse("111112233333")
        self.assertEqual('11111', parsed[0])
        self.assertEqual('22', parsed[1])
        self.assertEqual('33333', parsed[2])

        # Short values: the padding around each field is expected to be dropped.
        parsed = parser.parse(fixed_row('1', '2', '3'))
        self.assertEqual('1', parsed[0])
        self.assertEqual('2', parsed[1])
        self.assertEqual('3', parsed[2])

        # Whitespace inside a field is expected to be kept.
        parsed = parser.parse(fixed_row('1 1', '2', '33 3'))
        self.assertEqual('1 1', parsed[0])
        self.assertEqual('2', parsed[1])
        self.assertEqual('33 3', parsed[2])
|