1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93
|
from __future__ import annotations
import pytest
pd = pytest.importorskip("pandas")
np = pytest.importorskip("numpy")
from dask.dataframe.utils import assert_eq
from dask.utils import ensure_bytes
from distributed.protocol import (
decompress,
deserialize,
dumps,
loads,
serialize,
to_serialize,
)
dfs = [
pd.DataFrame({}),
pd.DataFrame({"x": [1, 2, 3]}),
pd.DataFrame({"x": [1.0, 2.0, 3.0]}),
pd.DataFrame({0: [1, 2, 3]}),
pd.DataFrame({"x": [1.0, 2.0, 3.0], "y": [4.0, 5.0, 6.0]}),
pd.DataFrame({"x": [1.0, 2.0, 3.0]}, index=pd.Index([4, 5, 6], name="bar")),
pd.Series([1.0, 2.0, 3.0]),
pd.Series([1.0, 2.0, 3.0], name="foo"),
pd.Series([1.0, 2.0, 3.0], name="foo", index=[4, 5, 6]),
pd.Series([1.0, 2.0, 3.0], name="foo", index=pd.Index([4, 5, 6], name="bar")),
pd.DataFrame({"x": ["a", "b", "c"]}),
pd.DataFrame({"x": [b"a", b"b", b"c"]}),
pd.DataFrame({"x": pd.Categorical(["a", "b", "a"], ordered=True)}),
pd.DataFrame({"x": pd.Categorical(["a", "b", "a"], ordered=False)}),
pd.Index(pd.Categorical(["a"], categories=["a", "b"], ordered=True)),
pd.date_range("2000", periods=12, freq="B"),
pd.RangeIndex(10),
pd.DataFrame(
"a",
index=pd.Index(["a", "b", "c", "d"], name="a"),
columns=pd.Index(["A", "B", "C", "D"], name="columns"),
),
pd.DataFrame(
np.random.randn(10, 5), columns=list("ABCDE"), index=list("abcdefghij")
),
pd.DataFrame(
np.random.randn(10, 5), columns=list("ABCDE"), index=list("abcdefghij")
).where(lambda x: x > 0),
pd.DataFrame(
{
"a": [0.0, 0.1],
"B": [0.0, 1.0],
"C": ["a", "b"],
"D": pd.to_datetime(["2000", "2001"]),
}
),
pd.Series(["a", "b", "c"], index=["a", "b", "c"]),
pd.DataFrame(
np.random.randn(10, 5),
columns=list("ABCDE"),
index=pd.period_range("2000", periods=10, freq="B"),
),
pd.DataFrame(
np.random.randn(10, 5),
columns=list("ABCDE"),
index=pd.date_range("2000", periods=10, freq="B"),
),
pd.Series(
np.random.randn(10), name="a", index=pd.date_range("2000", periods=10, freq="B")
),
pd.Index(["סשםקה7ךשץא", "8טלכז6לרפל"]),
]
@pytest.mark.parametrize("df", dfs)
def test_dumps_serialize_pandas(df):
header, frames = serialize(df)
if "compression" in header:
frames = decompress(header, frames)
df2 = deserialize(header, frames)
assert_eq(df, df2)
def test_dumps_pandas_writable():
a1 = np.arange(1000)
s1 = pd.Series(a1)
fs = dumps([to_serialize(s1)])
# Make all frames read-only
fs = list(map(ensure_bytes, fs))
(s2,) = loads(fs)
assert (s1 == s2).all()
s2[...] = 0
|