1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77
|
import csv
import os
from tempfile import NamedTemporaryFile
import aiofiles
import pytest
from aiocsv import AsyncDictReader, AsyncDictWriter
from aiocsv.protocols import CsvDialectKwargs
FILENAME = "tests/metro_systems.tsv"
DIALECT_PARAMS: CsvDialectKwargs = {"delimiter": "\t", "quotechar": "'", "quoting": csv.QUOTE_ALL}
HEADER = ["City", "Stations", "System Length"]
VALUES = [
dict(zip(HEADER, i))
for i in [
["New York", "424", "380"],
["Shanghai", "345", "676"],
["Seoul", "331", "353"],
["Beijing", "326", "690"],
["Paris", "302", "214"],
["London", "270", "402"],
]
]
@pytest.mark.asyncio
async def test_dict_read():
async with aiofiles.open(FILENAME, mode="r", encoding="ascii", newline="") as afp:
read_rows = [i async for i in AsyncDictReader(afp, **DIALECT_PARAMS)]
assert read_rows == VALUES
@pytest.mark.asyncio
async def test_dict_read_line_nums():
async with aiofiles.open(FILENAME, mode="r", encoding="ascii", newline="") as afp:
r = AsyncDictReader(afp, **DIALECT_PARAMS)
read_rows = [(row, r.line_num) async for row in r]
assert read_rows == [(row, i) for i, row in enumerate(VALUES, start=2)]
@pytest.mark.asyncio
async def test_dict_read_get_fieldnames():
async with aiofiles.open(FILENAME, mode="r", encoding="ascii", newline="") as afp:
reader = AsyncDictReader(afp, **DIALECT_PARAMS)
assert reader.fieldnames is None
assert await reader.get_fieldnames() == ["City", "Stations", "System Length"]
assert reader.fieldnames == ["City", "Stations", "System Length"]
@pytest.mark.asyncio
async def test_dict_write():
# Create a TempFile to direct writer to
with NamedTemporaryFile(mode="w+", suffix=".csv", delete=False) as tf:
target_name = tf.name
try:
# Write rows
async with aiofiles.open(target_name, mode="w", encoding="ascii", newline="") as afp:
writer = AsyncDictWriter(afp, HEADER, **DIALECT_PARAMS)
await writer.writeheader()
await writer.writerow(VALUES[0])
await writer.writerows(VALUES[1:])
# Read original and created files
with open(target_name, mode="r", encoding="ascii") as created_f:
created = created_f.read()
with open(FILENAME, mode="r", encoding="ascii") as original_f:
original = original_f.read()
# Check if content matches
assert created == original
finally:
os.remove(target_name)
|