from __future__ import absolute_import, division, print_function

from operator import getitem

from tornado import gen

from dask.utils import apply
from distributed.client import default_client

from .core import Stream
from . import core, sources


class DaskStream(Stream):
    """ A parallel stream using Dask

    This object is fully compliant with the ``streamz.core.Stream`` object,
    but it uses a Dask client for execution.  Operations like ``map`` and
    ``accumulate`` submit functions to run on the Dask instance using
    ``dask.distributed.Client.submit`` and pass around Dask futures.
    Time-based operations like ``timed_window``, ``buffer``, and so on
    operate as normal.

    Typically one transfers between normal Stream and DaskStream objects
    using the ``Stream.scatter()`` and ``DaskStream.gather()`` methods.

    Examples
    --------
    >>> from dask.distributed import Client
    >>> client = Client()

    >>> from streamz import Stream
    >>> source = Stream()
    >>> source.scatter().map(func).accumulate(binop).gather().sink(...)

    See Also
    --------
    dask.distributed.Client
    """
    def __init__(self, *args, **kwargs):
        kwargs["ensure_io_loop"] = True
        super().__init__(*args, **kwargs)
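

# A minimal end-to-end sketch of the scatter/map/gather pattern described in
# the docstring above.  This is illustrative only, not part of the streamz
# API: it assumes a ``dask.distributed`` scheduler is reachable, here via an
# in-process ``Client``, which registers itself as the default client that
# ``default_client()`` returns.  Wrapped in a helper function so importing
# this module has no side effects.
def _example_dask_pipeline():
    from dask.distributed import Client

    client = Client(processes=False)  # in-process cluster, for illustration

    results = []
    source = Stream()
    (source.scatter()               # local elements -> Dask futures
           .map(lambda x: x + 1)    # runs on the cluster via client.submit
           .gather()                # Dask futures -> concrete local values
           .sink(results.append))

    for i in range(3):
        source.emit(i)
    return results                  # expected: [1, 2, 3]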


@DaskStream.register_api()
class map(DaskStream):
    """ Apply a function to every element, executing on the Dask cluster

    Each element is submitted to the Dask client with ``client.submit``,
    and the resulting Dask future is passed downstream.
    """
    def __init__(self, upstream, func, *args, **kwargs):
        self.func = func
        self.kwargs = kwargs
        self.args = args
        DaskStream.__init__(self, upstream)

    def update(self, x, who=None, metadata=None):
        client = default_client()
        result = client.submit(self.func, x, *self.args, **self.kwargs)
        return self._emit(result, metadata=metadata)
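

# A small sketch of map's semantics (illustrative, not part of the streamz
# API; assumes a ``dask.distributed`` client as above): without a ``gather``
# node downstream, what flows through the stream is a ``distributed.Future``,
# not the computed value.
def _example_map_emits_futures():
    from dask.distributed import Client

    client = Client(processes=False)

    futures = []
    source = Stream()
    source.scatter().map(lambda x: x ** 2).sink(futures.append)

    source.emit(3)
    return futures[0].result()   # 9, fetched back from the cluster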


@DaskStream.register_api()
class accumulate(DaskStream):
    """ Accumulate results with previous state, executing on the Dask cluster

    The reduction itself runs on the cluster via ``client.submit``, so both
    the internal state and the emitted results are Dask futures.  With
    ``returns_state=True`` the function is expected to return a tuple of
    ``(new_state, result)``.
    """
    def __init__(self, upstream, func, start=core.no_default,
                 returns_state=False, **kwargs):
        self.func = func
        self.state = start
        self.returns_state = returns_state
        self.with_state = kwargs.pop('with_state', False)
        self.kwargs = kwargs
        DaskStream.__init__(self, upstream)

    def update(self, x, who=None, metadata=None):
        if self.state is core.no_default:
            self.state = x
            if self.with_state:
                return self._emit((self.state, x), metadata=metadata)
            else:
                return self._emit(x, metadata=metadata)
        else:
            client = default_client()
            result = client.submit(self.func, self.state, x, **self.kwargs)
            if self.returns_state:
                state = client.submit(getitem, result, 0)
                result = client.submit(getitem, result, 1)
            else:
                state = result
            self.state = state

            if self.with_state:
                return self._emit((self.state, result), metadata=metadata)
            else:
                return self._emit(result, metadata=metadata)
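

# A sketch of stateful reduction on the cluster (illustrative, not part of
# the streamz API; assumes a ``dask.distributed`` client as above).  With
# ``returns_state=True`` the reducer returns ``(new_state, result)``; both
# halves stay on the cluster as futures until gathered.
def _example_accumulate_returns_state():
    from dask.distributed import Client

    client = Client(processes=False)

    def running_mean(acc, x):
        total, count = acc
        total, count = total + x, count + 1
        return (total, count), total / count   # (new_state, emitted result)

    results = []
    source = Stream()
    (source.scatter()
           .accumulate(running_mean, returns_state=True, start=(0, 0))
           .gather()
           .sink(results.append))

    for x in [1, 2, 3]:
        source.emit(x)
    return results   # expected: [1.0, 1.5, 2.0]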


@core.Stream.register_api()
@DaskStream.register_api()
class scatter(DaskStream):
    """ Convert local stream to Dask Stream

    All elements flowing through the input will be scattered out to the
    cluster.
    """
    @gen.coroutine
    def update(self, x, who=None, metadata=None):
        client = default_client()

        self._retain_refs(metadata)
        # We need to make sure that dask treats x as a single element.
        # However, client.scatter works differently for lists and dicts
        # internally, so we always wrap x in a list to know the format
        # exactly.  We do not pass a key, to avoid issues like
        # https://github.com/python-streamz/streamz/issues/397.
        future_as_list = yield client.scatter([x], asynchronous=True, hash=False)
        future = future_as_list[0]
        f = yield self._emit(future, metadata=metadata)
        self._release_refs(metadata)
        raise gen.Return(f)
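

# A sketch of what scatter emits (illustrative, not part of the streamz API;
# assumes a ``dask.distributed`` client as above): downstream nodes receive
# ``distributed.Future`` objects pointing at data on the cluster, not the
# original values.
def _example_scatter_emits_futures():
    from dask.distributed import Client

    client = Client(processes=False)

    futures = []
    source = Stream()
    source.scatter().sink(futures.append)

    source.emit(42)
    return futures[0].result()   # 42, pulled back from the cluster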


@DaskStream.register_api()
class gather(core.Stream):
    """ Wait on and gather results from DaskStream to local Stream

    This waits on every result in the stream and then gathers that result
    back to the local stream.  Warning: this can restrict parallelism.  It is
    common to combine a ``gather()`` node with a ``buffer()`` to allow
    unfinished futures to pile up.

    Examples
    --------
    >>> local_stream = dask_stream.buffer(20).gather()

    See Also
    --------
    buffer
    scatter
    """
    @gen.coroutine
    def update(self, x, who=None, metadata=None):
        client = default_client()

        self._retain_refs(metadata)
        result = yield client.gather(x, asynchronous=True)
        result2 = yield self._emit(result, metadata=metadata)
        self._release_refs(metadata)
        raise gen.Return(result2)
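

# A sketch of the buffer-then-gather pattern recommended in the docstring
# above (illustrative, not part of the streamz API; assumes a
# ``dask.distributed`` client as above).  The buffer lets up to ten submitted
# futures stay in flight before gather waits on them, preserving parallelism.
def _example_buffer_before_gather():
    from dask.distributed import Client

    client = Client(processes=False)

    results = []
    source = Stream()
    (source.scatter()
           .map(lambda x: x * 2)
           .buffer(10)              # up to 10 unfinished futures in flight
           .gather()
           .sink(results.append))

    for i in range(5):
        source.emit(i)
    return results                  # expected: [0, 2, 4, 6, 8]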


@DaskStream.register_api()
class starmap(DaskStream):
    """ Apply a function to every element, with tuple unpacking

    Each element ``x`` is expected to be an argument tuple; ``func(*x)``
    is submitted to the Dask cluster and the resulting future is passed
    downstream.
    """
    def __init__(self, upstream, func, **kwargs):
        self.func = func
        stream_name = kwargs.pop('stream_name', None)
        self.kwargs = kwargs
        DaskStream.__init__(self, upstream, stream_name=stream_name)

    def update(self, x, who=None, metadata=None):
        client = default_client()
        result = client.submit(apply, self.func, x, self.kwargs)
        return self._emit(result, metadata=metadata)
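

# A sketch of starmap's tuple unpacking (illustrative, not part of the
# streamz API; assumes a ``dask.distributed`` client as above): each emitted
# element must be an argument tuple, which is unpacked into the function on
# the cluster via ``dask.utils.apply``.
def _example_starmap_unpacks_tuples():
    from dask.distributed import Client

    client = Client(processes=False)

    results = []
    source = Stream()
    source.scatter().starmap(lambda a, b: a + b).gather().sink(results.append)

    source.emit((1, 2))   # func(1, 2) runs on the cluster
    return results        # expected: [3]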


@DaskStream.register_api()
class buffer(DaskStream, core.buffer):
    pass


@DaskStream.register_api()
class combine_latest(DaskStream, core.combine_latest):
    pass


@DaskStream.register_api()
class delay(DaskStream, core.delay):
    pass


@DaskStream.register_api()
class latest(DaskStream, core.latest):
    pass


@DaskStream.register_api()
class partition(DaskStream, core.partition):
    pass


@DaskStream.register_api()
class rate_limit(DaskStream, core.rate_limit):
    pass


@DaskStream.register_api()
class sliding_window(DaskStream, core.sliding_window):
    pass


@DaskStream.register_api()
class timed_window(DaskStream, core.timed_window):
    pass


@DaskStream.register_api()
class union(DaskStream, core.union):
    pass


@DaskStream.register_api()
class zip(DaskStream, core.zip):
    pass


@DaskStream.register_api(staticmethod)
class filenames(DaskStream, sources.filenames):
    pass


@DaskStream.register_api(staticmethod)
class from_textfile(DaskStream, sources.from_textfile):
    pass