1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442
|
.. get_started
Get Started
============
An :class:`Observable <reactivex.Observable>` is the core type in ReactiveX. It
serially pushes items, known as *emissions*, through a series of operators until
it finally arrives at an Observer, where they are
consumed.
Push-based (rather than pull-based) iteration opens up powerful new
possibilities to express code and concurrency much more quickly. Because an
:class:`Observable <reactivex.Observable>` treats events as data and data as events,
composing the two together becomes trivial.
There are many ways to create an :class:`Observable <reactivex.Observable>` that hands
items to an Observer. You can use a :func:`create()
<reactivex.create>` factory and pass it functions that handle items:
* The *on_next* function is called each time the Observable emits an item.
* The *on_completed* function is called when the Observable completes.
* The *on_error* function is called when an error occurs on the Observable.
You do not have to specify all three event types. You can pick and choose which
events you want to observe by providing only some of the callbacks, or simply by
providing a single lambda for *on_next*. Typically in production, you will want
to provide an *on_error* handler so that errors are explicitly handled by the
subscriber.
Let's consider the following example:
.. code:: python
from reactivex import create
def push_five_strings(observer, scheduler):
observer.on_next("Alpha")
observer.on_next("Beta")
observer.on_next("Gamma")
observer.on_next("Delta")
observer.on_next("Epsilon")
observer.on_completed()
source = create(push_five_strings)
source.subscribe(
on_next = lambda i: print("Received {0}".format(i)),
on_error = lambda e: print("Error Occurred: {0}".format(e)),
on_completed = lambda: print("Done!"),
)
An Observable is created with create. On subscription, the *push_five_strings*
function is called. This function emits five items. The three callbacks provided
to the *subscribe* function simply print the received items and completion
states. Note that the use of lambdas simplify the code in this basic example.
Output:
.. code:: console
Received Alpha
Received Beta
Received Gamma
Received Delta
Received Epsilon
Done!
However, there are many :ref:`Observable factories
<reference_observable_factory>` for common sources of emissions. To simply push
five items, we can rid the :func:`create() <reactivex.create>` and its backing
function, and use :func:`of() <reactivex.of>`. This factory accepts an argument list,
iterates on each argument to emit them as items, and the completes. Therefore,
we can simply pass these five Strings as arguments to it:
.. code:: python
from reactivex import of
source = of("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
source.subscribe(
on_next = lambda i: print("Received {0}".format(i)),
on_error = lambda e: print("Error Occurred: {0}".format(e)),
on_completed = lambda: print("Done!"),
)
And a single parameter can be provided to the subscribe function if completion
and error are ignored:
.. code:: python
from reactivex import of
source = of("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
source.subscribe(lambda value: print("Received {0}".format(value)))
Output:
.. code:: console
Received Alpha
Received Beta
Received Gamma
Received Delta
Received Epsilon
Operators and Chaining
--------------------------
You can also derive new Observables using over 130 operators available in RxPY.
Each operator will yield a new :class:`Observable <reactivex.Observable>` that
transforms emissions from the source in some way. For example, we can
:func:`map() <reactivex.operators.map>` each `String` to its length, then
:func:`filter() <reactivex.operators.filter>` for lengths being at least 5. These will
yield two separate Observables built off each other.
.. code:: python
from reactivex import of, operators as op
source = of("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
composed = source.pipe(
op.map(lambda s: len(s)),
op.filter(lambda i: i >= 5)
)
composed.subscribe(lambda value: print("Received {0}".format(value)))
Output:
.. code:: console
Received 5
Received 5
Received 5
Received 7
Typically, you do not want to save Observables into intermediary variables for
each operator, unless you want to have multiple subscribers at that point.
Instead, you want to strive to inline and create an "Observable pipeline" of
operations. That way your code is readable and tells a story much more easily.
.. code:: python
from reactivex import of, operators as op
of("Alpha", "Beta", "Gamma", "Delta", "Epsilon").pipe(
op.map(lambda s: len(s)),
op.filter(lambda i: i >= 5)
).subscribe(lambda value: print("Received {0}".format(value)))
Custom operator
---------------
As operators chains grow up, the chains must be split to make the code more
readable. New operators are implemented as functions, and can be directly used
in the *pipe* operator. When an operator is implemented as a composition of
other operators, then the implementation is straightforward, thanks to the
*pipe* function:
.. code:: python
import reactivex
from reactivex import operators as ops
def length_more_than_5():
return rx.pipe(
ops.map(lambda s: len(s)),
ops.filter(lambda i: i >= 5),
)
reactivex.of("Alpha", "Beta", "Gamma", "Delta", "Epsilon").pipe(
length_more_than_5()
).subscribe(lambda value: print("Received {0}".format(value)))
In this example, the *map* and *filter* operators are grouped in a new
*length_more_than_5* operator.
It is also possible to create an operator that is not a composition of other
operators. This allows to fully control the subscription logic and items
emissions:
.. code:: python
import reactivex
def lowercase():
def _lowercase(source):
def subscribe(observer, scheduler = None):
def on_next(value):
observer.on_next(value.lower())
return source.subscribe(
on_next,
observer.on_error,
observer.on_completed,
scheduler)
return reactivex.create(subscribe)
return _lowercase
reactivex.of("Alpha", "Beta", "Gamma", "Delta", "Epsilon").pipe(
lowercase()
).subscribe(lambda value: print("Received {0}".format(value)))
In this example, the *lowercase* operator converts all received items to
lowercase. The structure of the *_lowercase* function is a very common way to
implement custom operators: It takes a source Observable as input, and returns a
custom Observable. The source observable is subscribed only when the output
Observable is subscribed. This allows to chain subscription calls when building
a pipeline.
Output:
.. code:: console
Received alpha
Received beta
Received gamma
Received delta
Received epsilon
Concurrency
-----------
CPU Concurrency
................
To achieve concurrency, you use two operators: :func:`subscribe_on()
<reactivex.operators.subscribe_on>` and :func:`observe_on() <reactivex.operators.observe_on>`.
Both need a :ref:`Scheduler <reference_scheduler>` which provides a thread for
each subscription to do work (see section on Schedulers below). The
:class:`ThreadPoolScheduler <reactivex.scheduler.ThreadPoolScheduler>` is a good
choice to create a pool of reusable worker threads.
.. attention::
`GIL <https://wiki.python.org/moin/GlobalInterpreterLock>`_ has the potential to
undermine your concurrency performance, as it prevents multiple threads from
accessing the same line of code simultaneously. Libraries like
`NumPy <http://www.numpy.org/>`_ can mitigate this for parallel intensive
computations as they free the GIL. RxPy may also minimize thread overlap to some
degree. Just be sure to test your application with concurrency and ensure there
is a performance gain.
The :func:`subscribe_on() <reactivex.operators.subscribe_on>` instructs the source
:class:`Observable <reactivex.Observable>` at the start of the chain which scheduler to
use (and it does not matter where you put this operator). The
:func:`observe_on() <reactivex.operators.observe_on>`, however, will switch to a
different *Scheduler* **at that point** in the *Observable* chain, effectively
moving an emission from one thread to another. Some :ref:`Observable factories
<reference_observable_factory>` and :ref:`operators
<reference_operators>`, like :func:`interval() <reactivex.interval>` and
:func:`delay() <reactivex.operators.delay>`, already have a default *Scheduler* and
thus will ignore any :func:`subscribe_on() <reactivex.operators.subscribe_on>` you
specify (although you can pass a *Scheduler* usually as an argument).
Below, we run three different processes concurrently rather than sequentially
using :func:`subscribe_on() <reactivex.operators.subscribe_on>` as well as an
:func:`observe_on() <reactivex.operators.observe_on>`.
.. code:: python
import multiprocessing
import random
import time
from threading import current_thread
import reactivex
from reactivex.scheduler import ThreadPoolScheduler
from reactivex import operators as ops
def intense_calculation(value):
# sleep for a random short duration between 0.5 to 2.0 seconds to simulate a long-running calculation
time.sleep(random.randint(5, 20) * 0.1)
return value
# calculate number of CPUs, then create a ThreadPoolScheduler with that number of threads
optimal_thread_count = multiprocessing.cpu_count()
pool_scheduler = ThreadPoolScheduler(optimal_thread_count)
# Create Process 1
reactivex.of("Alpha", "Beta", "Gamma", "Delta", "Epsilon").pipe(
ops.map(lambda s: intense_calculation(s)), ops.subscribe_on(pool_scheduler)
).subscribe(
on_next=lambda s: print("PROCESS 1: {0} {1}".format(current_thread().name, s)),
on_error=lambda e: print(e),
on_completed=lambda: print("PROCESS 1 done!"),
)
# Create Process 2
reactivex.range(1, 10).pipe(
ops.map(lambda s: intense_calculation(s)), ops.subscribe_on(pool_scheduler)
).subscribe(
on_next=lambda i: print("PROCESS 2: {0} {1}".format(current_thread().name, i)),
on_error=lambda e: print(e),
on_completed=lambda: print("PROCESS 2 done!"),
)
# Create Process 3, which is infinite
reactivex.interval(1).pipe(
ops.map(lambda i: i * 100),
ops.observe_on(pool_scheduler),
ops.map(lambda s: intense_calculation(s)),
).subscribe(
on_next=lambda i: print("PROCESS 3: {0} {1}".format(current_thread().name, i)),
on_error=lambda e: print(e),
)
input("Press Enter key to exit\n")
**OUTPUT:**
.. code:: console
Press Enter key to exit
PROCESS 1: Thread-1 Alpha
PROCESS 2: Thread-2 1
PROCESS 3: Thread-4 0
PROCESS 2: Thread-2 2
PROCESS 1: Thread-1 Beta
PROCESS 3: Thread-7 100
PROCESS 3: Thread-7 200
PROCESS 2: Thread-2 3
PROCESS 1: Thread-1 Gamma
PROCESS 1: Thread-1 Delta
PROCESS 2: Thread-2 4
PROCESS 3: Thread-7 300
IO Concurrency
................
IO concurrency is also supported for several asynchronous frameworks, in
combination with associated RxPY schedulers. The following example implements
a simple echo TCP server that delays its answers by 5 seconds. It uses AsyncIO
as an event loop.
The TCP server is implemented in AsyncIO, and the echo logic is implemented as
an RxPY operator chain. Futures allow the operator chain to drive the loop of
the coroutine.
.. code:: python
from collections import namedtuple
import asyncio
import reactivex
import reactivex.operators as ops
from reactivex.subject import Subject
from reactivex.scheduler.eventloop import AsyncIOScheduler
EchoItem = namedtuple('EchoItem', ['future', 'data'])
def tcp_server(sink, loop):
def on_subscribe(observer, scheduler):
async def handle_echo(reader, writer):
print("new client connected")
while True:
data = await reader.readline()
data = data.decode("utf-8")
if not data:
break
future = asyncio.Future()
observer.on_next(EchoItem(
future=future,
data=data
))
await future
writer.write(future.result().encode("utf-8"))
print("Close the client socket")
writer.close()
def on_next(i):
i.future.set_result(i.data)
print("starting server")
server = asyncio.start_server(handle_echo, '127.0.0.1', 8888, loop=loop)
loop.create_task(server)
sink.subscribe(
on_next=on_next,
on_error=observer.on_error,
on_completed=observer.on_completed)
return reactivex.create(on_subscribe)
loop = asyncio.get_event_loop()
proxy = Subject()
source = tcp_server(proxy, loop)
aio_scheduler = AsyncIOScheduler(loop=loop)
source.pipe(
ops.map(lambda i: i._replace(data="echo: {}".format(i.data))),
ops.delay(5.0)
).subscribe(proxy, scheduler=aio_scheduler)
loop.run_forever()
print("done")
loop.close()
Execute this code from a shell, and connect to it via telnet. Then each line
that you type is echoed 5 seconds later.
.. code:: console
telnet localhost 8888
Connected to localhost.
Escape character is '^]'.
foo
echo: foo
If you connect simultaneously from several clients, you can see that requests
are correctly served, multiplexed on the AsyncIO event loop.
Default Scheduler
..................
There are several ways to choose a scheduler. The first one is to provide it
explicitly to each operator that supports a scheduler. However this can be
annoying when a lot of operators are used. So there is a second way to indicate
what scheduler will be used as the default scheduler for the whole chain: The
scheduler provided in the subscribe call is the default scheduler for all
operators in a pipe.
.. code:: python
source.pipe(
...
).subscribe(proxy, scheduler=my_default_scheduler)
Operators that accept a scheduler select the scheduler to use in the following way:
- If a scheduler is provided for the operator, then use it.
- If a default scheduler is provided in subscribe, then use it.
- Otherwise use the default scheduler of the operator.
|