From: Martin Durant <martin.durant@alumni.utoronto.ca>
Date: Mon, 12 Sep 2022 16:31:11 -0400
Subject: [PATCH] CI fixes

---
 streamz/tests/py3_test_core.py | 10 +++++-----
 streamz/tests/test_core.py     | 28 +++-------------------------
 streamz/tests/test_dask.py     | 23 ++++++++++++-----------
 3 files changed, 20 insertions(+), 41 deletions(-)

diff --git a/streamz/tests/py3_test_core.py b/streamz/tests/py3_test_core.py
index 5ea1238..90412e4 100644
--- a/streamz/tests/py3_test_core.py
+++ b/streamz/tests/py3_test_core.py
@@ -1,16 +1,16 @@
 # flake8: noqa
+import asyncio
 from time import time
-from distributed.utils_test import loop, inc  # noqa
-from tornado import gen
+from distributed.utils_test import inc  # noqa
 
 from streamz import Stream
 
 
-def test_await_syntax(loop):  # noqa
+def test_await_syntax():  # noqa
     L = []
 
     async def write(x):
-        await gen.sleep(0.1)
+        await asyncio.sleep(0.1)
         L.append(x)
 
     async def f():
@@ -25,4 +25,4 @@ def test_await_syntax(loop):  # noqa
         assert 0.2 < stop - start < 0.4
         assert 2 <= len(L) <= 4
 
-    loop.run_sync(f)
+    asyncio.run(f())
diff --git a/streamz/tests/test_core.py b/streamz/tests/test_core.py
index 56a661d..02842a6 100644
--- a/streamz/tests/test_core.py
+++ b/streamz/tests/test_core.py
@@ -1,3 +1,4 @@
+import asyncio
 from datetime import timedelta
 from functools import partial
 import itertools
@@ -12,6 +13,7 @@ import pytest
 
 from tornado.queues import Queue
 from tornado.ioloop import IOLoop
+from tornado import gen
 
 import streamz as sz
 
@@ -19,7 +21,7 @@ from streamz import RefCounter
 from streamz.sources import sink_to_file
 from streamz.utils_test import (inc, double, gen_test, tmpfile, captured_logger,   # noqa: F401
         clean, await_for, metadata, wait_for)  # noqa: F401
-from distributed.utils_test import loop   # noqa: F401
+from distributed.utils_test import loop, loop_in_thread, cleanup   # noqa: F401
 
 
 def test_basic():
@@ -1485,20 +1487,6 @@ def dont_test_stream_kwargs(clean):  # noqa: F811
     sin.emit(1)
 
 
-@pytest.fixture
-def thread(loop):  # noqa: F811
-    from threading import Thread, Event
-    thread = Thread(target=loop.start)
-    thread.daemon = True
-    thread.start()
-
-    event = Event()
-    loop.add_callback(event.set)
-    event.wait()
-
-    return thread
-
-
 def test_percolate_loop_information(clean):  # noqa: F811
     source = Stream()
     assert not source.loop
@@ -1506,16 +1494,6 @@ def test_percolate_loop_information(clean):  # noqa: F811
     assert source.loop is s.loop
 
 
-def test_separate_thread_without_time(loop, thread):  # noqa: F811
-    assert thread.is_alive()
-    source = Stream(loop=loop)
-    L = source.map(inc).sink_to_list()
-
-    for i in range(10):
-        source.emit(i)
-        assert L[-1] == i + 1
-
-
 def test_separate_thread_with_time(clean):  # noqa: F811
     L = []
 
diff --git a/streamz/tests/test_dask.py b/streamz/tests/test_dask.py
index e99b172..c0e2528 100644
--- a/streamz/tests/test_dask.py
+++ b/streamz/tests/test_dask.py
@@ -72,10 +72,10 @@ async def test_partition_then_scatter_async(c, s, a, b):
     assert L == [1, 2, 3]
 
 
-def test_partition_then_scatter_sync(loop):
+def test_partition_then_scatter_sync():
     # Ensure partition w/ timeout before scatter works correctly for synchronous
     with cluster() as (s, [a, b]):
-        with Client(s['address'], loop=loop) as client:  # noqa: F841
+        with Client(s['address']) as client:  # noqa: F841
             start = time.monotonic()
             source = Stream()
             L = source.partition(2, timeout=.1).scatter().map(
@@ -164,9 +164,9 @@ async def test_accumulate(c, s, a, b):
     assert L[-1][1] == 3
 
 
-def test_sync(loop):  # noqa: F811
+def test_sync():  # noqa: F811
     with cluster() as (s, [a, b]):
-        with Client(s['address'], loop=loop) as client:  # noqa: F841
+        with Client(s['address']) as client:  # noqa: F841
             source = Stream()
             L = source.scatter().map(inc).gather().sink_to_list()
 
@@ -174,14 +174,14 @@ def test_sync(loop):  # noqa: F811
                 for i in range(10):
                     await source.emit(i, asynchronous=True)
 
-            sync(loop, f)
+            sync(client.loop, f)
 
             assert L == list(map(inc, range(10)))
 
 
-def test_sync_2(loop):  # noqa: F811
+def test_sync_2():  # noqa: F811
     with cluster() as (s, [a, b]):
-        with Client(s['address'], loop=loop):  # noqa: F841
+        with Client(s['address']):  # noqa: F841
             source = Stream()
             L = source.scatter().map(inc).gather().sink_to_list()
 
@@ -218,9 +218,9 @@ async def test_buffer(c, s, a, b):
     assert source.loop == c.loop
 
 
-def test_buffer_sync(loop):  # noqa: F811
+def test_buffer_sync():  # noqa: F811
     with cluster() as (s, [a, b]):
-        with Client(s['address'], loop=loop) as c:  # noqa: F841
+        with Client(s['address']) as c:  # noqa: F841
             source = Stream()
             buff = source.scatter().map(slowinc, delay=0.5).buffer(5)
             L = buff.gather().sink_to_list()
@@ -241,10 +241,11 @@ def test_buffer_sync(loop):  # noqa: F811
             assert L == list(map(inc, range(10)))
 
 
+@pytest.mark.asyncio
 @pytest.mark.xfail(reason='')
-async def test_stream_shares_client_loop(loop):  # noqa: F811
+async def test_stream_shares_client_loop():  # noqa: F811
     with cluster() as (s, [a, b]):
-        with Client(s['address'], loop=loop) as client:  # noqa: F841
+        with Client(s['address']) as client:  # noqa: F841
             source = Stream()
             d = source.timed_window('20ms').scatter()  # noqa: F841
             assert source.loop is client.loop
