1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143
|
"""
To test standard input (without piped data), run each of:
* csvclean
* csvcut -c 1
* csvformat
* csvgrep -c 1 -m d
* csvjson --no-inference --stream --snifflimit 0
* csvstack
* in2csv --format csv --no-inference --snifflimit 0
And paste:
"a","b","c"
"g","h","i"
"d","e","f"
"""
import io
import sys
import unittest
import warnings
from contextlib import contextmanager, redirect_stderr
from unittest.mock import patch
import agate
from csvkit.exceptions import ColumnIdentifierError, RequiredHeaderError
@contextmanager
def stderr_as_stdout():
temp = sys.stderr
sys.stderr = sys.stdout
yield
sys.stderr = temp
@contextmanager
def stdin_as_string(content):
temp = sys.stdin
sys.stdin = io.TextIOWrapper(io.BufferedReader(content))
yield
sys.stdin = temp
class CSVKitTestCase(unittest.TestCase):
warnings.filterwarnings(action='ignore', module='agate')
def get_output(self, args):
output_file = io.TextIOWrapper(io.BytesIO(), encoding='utf-8', newline='', write_through=True)
utility = self.Utility(args, output_file)
utility.run()
output = output_file.buffer.getvalue().decode('utf-8')
output_file.close()
return output
def get_output_as_io(self, args):
return io.StringIO(self.get_output(args))
def get_output_as_list(self, args):
return self.get_output(args).split('\n')
def get_output_as_reader(self, args):
return agate.csv.reader(self.get_output_as_io(args))
def assertError(self, launch_new_instance, options, message, args=None):
command = self.Utility.__name__.lower()
if args is None:
args = ['examples/dummy.csv']
f = io.StringIO()
with redirect_stderr(f):
with patch.object(sys, 'argv', [command] + options + args):
with self.assertRaises(SystemExit) as e:
launch_new_instance()
self.assertEqual(e.exception.code, 2)
self.assertEqual(f.getvalue().splitlines()[-1], f'{command}: error: {message}')
def assertRows(self, args, rows):
reader = self.get_output_as_reader(args)
for row in rows:
self.assertEqual(next(reader), row)
self.assertRaises(StopIteration, next, reader)
def assertLines(self, args, rows, newline_at_eof=True):
lines = self.get_output_as_list(args)
if newline_at_eof:
rows.append('')
for i, row in enumerate(rows):
self.assertEqual(lines[i], row)
self.assertEqual(len(lines), len(rows))
class EmptyFileTests:
def test_empty(self):
with open('examples/empty.csv', 'rb') as f, stdin_as_string(f):
utility = self.Utility(getattr(self, 'default_args', []))
utility.run()
class NamesTests:
def test_names(self):
output = self.get_output_as_io(['-n', 'examples/dummy.csv'])
self.assertEqual(next(output), ' 1: a\n')
self.assertEqual(next(output), ' 2: b\n')
self.assertEqual(next(output), ' 3: c\n')
def test_invalid_options(self):
args = ['-n', '--no-header-row', 'examples/dummy.csv']
output_file = io.StringIO()
utility = self.Utility(args, output_file)
with self.assertRaises(RequiredHeaderError):
utility.run()
output_file.close()
class ColumnsTests:
def test_invalid_column(self):
args = getattr(self, 'columns_args', []) + ['-c', '0', 'examples/dummy.csv']
output_file = io.StringIO()
utility = self.Utility(args, output_file)
with self.assertRaises(ColumnIdentifierError):
utility.run()
output_file.close()
|