1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145
|
from __future__ import annotations
import asyncio
import time
import pytest
from distributed.diagnostics import MemorySampler
from distributed.utils_test import gen_cluster
@gen_cluster(client=True)
async def test_async(c, s, a, b):
ms = MemorySampler()
async with ms.sample("foo", measure="managed", interval=0.1):
f = c.submit(lambda: 1)
await f
await asyncio.sleep(0.5)
assert ms.samples["foo"][0][1] == 0
assert ms.samples["foo"][-1][1] > 0
# Test that there is no server-side memory leak
assert not s.extensions["memory_sampler"].samples
def test_sync(client):
ms = MemorySampler()
with ms.sample("foo", measure="managed", interval=0.1):
f = client.submit(lambda: 1)
f.result()
time.sleep(0.5)
assert ms.samples["foo"][0][1] == 0
assert ms.samples["foo"][-1][1] > 0
@gen_cluster(client=True) # MemorySampler internally fetches the client
async def test_at_least_one_sample(c, s, a, b):
"""The first sample is taken immediately
Also test omitting the label
"""
ms = MemorySampler()
async with ms.sample():
pass
assert len(next(iter(ms.samples.values()))) == 1
@pytest.mark.slow
@gen_cluster(client=True)
async def test_multi_sample(c, s, a, b):
ms = MemorySampler()
s1 = ms.sample("managed", measure="managed", interval=0.15)
s2 = ms.sample("process", interval=0.2)
async with s1, s2:
idle_mem = s.memory.process
f = c.submit(lambda: "x" * 100 * 2**20) # 100 MiB
await f
while s.memory.process < idle_mem + 80 * 2**20:
# Wait for heartbeat
await asyncio.sleep(0.01)
await asyncio.sleep(0.6)
m = ms.samples["managed"]
p = ms.samples["process"]
assert len(m) >= 2
assert m[0][1] == 0
assert m[-1][1] >= 100 * 2**20
assert len(p) >= 2
assert p[0][1] > 2**20 # Assume > 1 MiB for idle process
assert p[-1][1] > p[0][1] + 80 * 2**20
assert m[-1][1] < p[-1][1]
@gen_cluster(client=True)
@pytest.mark.parametrize("align", [False, True])
async def test_pandas(c, s, a, b, align):
pd = pytest.importorskip("pandas")
pytest.importorskip("matplotlib")
ms = MemorySampler()
async with ms.sample("foo", measure="managed", interval=0.15):
f = c.submit(lambda: 1)
await f
await asyncio.sleep(0.7)
assert ms.samples["foo"][0][1] == 0
assert ms.samples["foo"][-1][1] > 0
df = ms.to_pandas(align=align)
assert isinstance(df, pd.DataFrame)
if align:
assert isinstance(df.index, pd.TimedeltaIndex)
assert df["foo"].iloc[0] == 0
assert df["foo"].iloc[-1] > 0
assert df.index[0] == pd.Timedelta(0, unit="s")
assert pd.Timedelta(0, unit="s") < df.index[1]
assert df.index[1] < pd.Timedelta(1.5, unit="s")
else:
assert isinstance(df.index, pd.DatetimeIndex)
assert pd.Timedelta(0, unit="s") < df.index[1] - df.index[0]
assert df.index[1] - df.index[0] < pd.Timedelta(1.5, unit="s")
plt = ms.plot(align=align, grid=True)
assert plt
@pytest.mark.slow
@gen_cluster(client=True)
@pytest.mark.parametrize("align", [False, True])
async def test_pandas_multiseries(c, s, a, b, align):
"""Test that multiple series are upsampled and aligned to each other"""
pd = pytest.importorskip("pandas")
ms = MemorySampler()
for (label, interval, final_sleep) in (("foo", 0.15, 1.0), ("bar", 0.2, 0.6)):
async with ms.sample(label, measure="managed", interval=interval):
x = c.submit(lambda: 1, key="x")
await x
await asyncio.sleep(final_sleep)
del x
while "x" in s.tasks:
await asyncio.sleep(0.01)
for label in ("foo", "bar"):
assert ms.samples[label][0][1] == 0
assert ms.samples[label][-1][1] > 0
df = ms.to_pandas(align=align)
if align:
assert df.index[0] == pd.Timedelta(0, unit="s")
# ffill does not go beyond the end of the series
assert df["foo"].iloc[0] == 0
assert df["foo"].iloc[-1] > 0
assert df["bar"].iloc[0] == 0
assert pd.isna(df["bar"].iloc[-1])
assert df["bar"].ffill().iloc[-1] > 0
else:
assert df["foo"].iloc[0] == 0
assert pd.isna(df["foo"].iloc[-1])
assert pd.isna(df["bar"].iloc[0])
assert df["bar"].iloc[-1] > 0
|