File: test_arrow.py

Package: dask.distributed 2022.12.1+ds.1-3 (Debian bookworm)
File content (48 lines):
from __future__ import annotations

import pytest

# Skip the entire module if pyarrow or pandas is not installed.
pa = pytest.importorskip("pyarrow")
pd = pytest.importorskip("pandas")

import distributed
from distributed.protocol import deserialize, serialize, to_serialize
from distributed.utils_test import gen_cluster

# Shared test data: the same small DataFrame as both an Arrow Table and a
# RecordBatch.
df = pd.DataFrame({"A": list("abc"), "B": [1, 2, 3]})
tbl = pa.Table.from_pandas(df, preserve_index=False)
batch = pa.RecordBatch.from_pandas(df, preserve_index=False)


@pytest.mark.parametrize("obj", [batch, tbl], ids=["RecordBatch", "Table"])
def test_roundtrip(obj):
    # Test that the serialize/deserialize functions actually
    # work independently of distributed
    header, frames = serialize(obj)
    new_obj = deserialize(header, frames)
    assert obj.equals(new_obj)


def echo(arg):
    # Identity task: lets test_scatter ship the scattered object through a
    # worker and back unchanged.
    return arg


@pytest.mark.parametrize("obj", [batch, tbl], ids=["RecordBatch", "Table"])
def test_scatter(obj):
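    # gen_cluster turns the async body into a synchronous callable that runs
    # against a temporary scheduler, two workers, and a client; calling
    # run_test() below executes it.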
    @gen_cluster(client=True)
    async def run_test(client, scheduler, worker1, worker2):
        obj_fut = await client.scatter(obj)
        fut = client.submit(echo, obj_fut)
        result = await fut
        assert obj.equals(result)

    run_test()


def test_dumps_compression():
    # https://github.com/dask/distributed/issues/2966
    # large enough to trigger compression
    t = pa.Table.from_pandas(pd.DataFrame({"A": [1] * 10000}))
    msg = {"op": "update", "data": to_serialize(t)}
    result = distributed.protocol.loads(distributed.protocol.dumps(msg))
    assert result["data"].equals(t)
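
For reference, the round trip exercised by test_roundtrip above can also be run
standalone. The sketch below is illustrative only (it is not part of the
packaged file) and assumes pyarrow, pandas, and distributed are importable; it
uses only the serialize/deserialize calls already imported in the module.

import pandas as pd
import pyarrow as pa

from distributed.protocol import deserialize, serialize

df = pd.DataFrame({"A": list("abc"), "B": [1, 2, 3]})
tbl = pa.Table.from_pandas(df, preserve_index=False)

# serialize() produces a header dict plus a list of byte frames suitable for
# sending over the wire; deserialize() rebuilds an equal Table from them.
header, frames = serialize(tbl)
restored = deserialize(header, frames)
assert tbl.equals(restored)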