1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
|
import os
import numpy as np
from pandas import DataFrame
import pytest
from fastparquet import ParquetFile
from fastparquet import write
from .util import tempdir
def _generate_random_dataframe(n_rows=1000):
# borrowing approach from test/test_output.py, test_roundtrip_s3 function
data_dict = dict(
i32=np.arange(n_rows, dtype=np.int32),
i64=np.arange(n_rows, dtype=np.int64),
f32=np.arange(n_rows, dtype=np.float32),
f64=np.arange(n_rows, dtype=np.float64),
obj=np.random.choice(["some", "random", "words"],
size=n_rows).astype("O")
)
data = DataFrame(data_dict)
return data
def _convert_to_parquet(dfs, tempdir, path_prefix):
parquet_files = {}
for name, df in dfs.items():
path_base = "{}_{}.parquet".format(path_prefix, name)
path = os.path.join(tempdir, path_base)
write(path, df)
parquet_files[name] = ParquetFile(path)
return parquet_files
def test_schema_eq(tempdir):
    """Schemas of two identically structured frames compare equal."""
    frames = {
        "A": _generate_random_dataframe(),
        "B": _generate_random_dataframe(),
    }
    written = _convert_to_parquet(frames, tempdir, "test_scheme_eq")
    first, second = written["A"], written["B"]
    assert first.schema == second.schema
def test_schema_ne_subset(tempdir):
    """Schemas differ when one frame's columns are a subset of the other's."""
    frames = {
        "A": _generate_random_dataframe(),
        "B": _generate_random_dataframe(),
    }
    # Remove one column from B so its columns are a strict subset of A's.
    frames["B"].drop(columns="i32", inplace=True)
    written = _convert_to_parquet(frames, tempdir, "test_scheme_ne_subset")
    first, second = written["A"], written["B"]
    assert first.schema != second.schema
def test_schema_ne_renamed(tempdir):
    """Schemas differ when a column name differs but the dtypes match."""
    frames = {
        "A": _generate_random_dataframe(),
        "B": _generate_random_dataframe(),
    }
    # Same data and dtypes in B; only the label of one column changes.
    frames["B"].rename(columns={"i32": "new_name"}, inplace=True)
    written = _convert_to_parquet(frames, tempdir, "test_scheme_ne_renamed")
    first, second = written["A"], written["B"]
    assert first.schema != second.schema
def test_schema_ne_converted(tempdir):
    """Schemas differ when a column dtype differs but the names match."""
    frames = {
        "A": _generate_random_dataframe(),
        "B": _generate_random_dataframe(),
    }
    # Widen B's 32-bit integer column to 64 bits; the column name is kept.
    widened = frames["B"]["i32"].astype(np.int64)
    frames["B"]["i32"] = widened
    written = _convert_to_parquet(frames, tempdir, "test_scheme_ne_convert")
    first, second = written["A"], written["B"]
    assert first.schema != second.schema
def test_schema_ne_different_order(tempdir):
    """Schemas differ when the same columns appear in a different order."""
    frames = {
        "A": _generate_random_dataframe(),
        "B": _generate_random_dataframe(),
    }
    # Replace B with a copy whose columns are in reverse order.
    original = frames["B"]
    reversed_columns = original.columns[::-1]
    frames["B"] = original[reversed_columns]
    written = _convert_to_parquet(frames, tempdir, "test_scheme_ne_order")
    first, second = written["A"], written["B"]
    assert first.schema != second.schema
|