1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397
|
Consumers
=========
Channels is built around a basic low-level spec called :doc:`ASGI </asgi>`.
ASGI is more designed for interoperability than for writing complex
applications in. So, Channels provides you with Consumers, a rich abstraction
that allows you to create ASGI applications easily.
Consumers do a couple of things in particular:
* Structure your code as a series of functions to be called whenever an
event happens, rather than making you write an event loop.
* Allow you to write synchronous or async code, and deal with handoffs
and threading for you.
Of course, you are free to ignore consumers and use the other parts of
Channels - like routing, session handling and authentication - with any
ASGI app, but they're generally the best way to write your application code.
.. _sync_to_async:
Basic Layout
------------
A consumer is a subclass of either ``channels.consumer.AsyncConsumer`` or
``channels.consumer.SyncConsumer``. As these names suggest, one will expect
you to write async-capable code, while the other will run your code
synchronously in a threadpool.
Let's look at a basic example of a ``SyncConsumer``:
.. code-block:: python
from channels.consumer import SyncConsumer
class EchoConsumer(SyncConsumer):
def websocket_connect(self, event):
self.send({
"type": "websocket.accept",
})
def websocket_receive(self, event):
self.send({
"type": "websocket.send",
"text": event["text"],
})
This is a very simple WebSocket echo server - it will accept all incoming
WebSocket connections, and then reply to all incoming WebSocket text frames
with the same text.
Consumers are structured around a series of named methods corresponding to the
``type`` value of the messages they are going to receive, with any ``.``
replaced by ``_``. The two handlers above are handling ``websocket.connect``
and ``websocket.receive`` messages respectively.
How did we know what event types we were going to get and what would be
in them (like ``websocket.receive`` having a ``text``) key? That's because we
designed this against the ASGI WebSocket specification, which tells us how
WebSockets are presented - read more about it in :doc:`ASGI </asgi>` - and
protected this application with a router that checks for a scope type of
``websocket`` - see more about that in :doc:`/topics/routing`.
Apart from that, the only other basic API is ``self.send(event)``. This lets
you send events back to the client or protocol server as defined by the
protocol - if you read the WebSocket protocol, you'll see that the dict we
send above is how you send a text frame to the client.
The ``AsyncConsumer`` is laid out very similarly, but all the handler methods
must be coroutines, and ``self.send`` is a coroutine:
.. code-block:: python
from channels.consumer import AsyncConsumer
class EchoConsumer(AsyncConsumer):
async def websocket_connect(self, event):
await self.send({
"type": "websocket.accept",
})
async def websocket_receive(self, event):
await self.send({
"type": "websocket.send",
"text": event["text"],
})
When should you use ``AsyncConsumer`` and when should you use ``SyncConsumer``?
The main thing to consider is what you're talking to. If you call a slow
synchronous function from inside an ``AsyncConsumer`` you're going to hold up
the entire event loop, so they're only useful if you're also calling async
code (for example, using ``HTTPX`` to fetch 20 pages in parallel).
If you're calling any part of Django's ORM or other synchronous code, you
should use a ``SyncConsumer``, as this will run the whole consumer in a thread
and stop your ORM queries blocking the entire server.
We recommend that you **write SyncConsumers by default**, and only use
AsyncConsumers in cases where you know you are doing something that would
be improved by async handling (long-running tasks that could be done in
parallel) *and* you are only using async-native libraries.
If you really want to call a synchronous function from an ``AsyncConsumer``,
take a look at ``asgiref.sync.sync_to_async``, which is the utility that Channels
uses to run ``SyncConsumers`` in threadpools, and can turn any synchronous
callable into an asynchronous coroutine.
.. important::
If you want to call the Django ORM from an ``AsyncConsumer`` (or any other
asynchronous code), you should use the ``database_sync_to_async`` adapter
instead. See :doc:`/topics/databases` for more.
Closing Consumers
~~~~~~~~~~~~~~~~~
When the socket or connection attached to your consumer is closed - either by
you or the client - you will likely get an event sent to you (for example,
``http.disconnect`` or ``websocket.disconnect``), and your application instance
will be given a short amount of time to act on it.
Once you have finished doing your post-disconnect cleanup, you need to raise
``channels.exceptions.StopConsumer`` to halt the ASGI application cleanly and
let the server clean it up. If you leave it running - by not raising this
exception - the server will reach its application close timeout (which is
10 seconds by default in Daphne) and then kill your application and raise
a warning.
The generic consumers below do this for you, so this is only needed if you
are writing your own consumer class based on ``AsyncConsumer`` or
``SyncConsumer``. However, if you override their ``__call__`` method, or
block the handling methods that it calls from returning, you may still run into
this; take a look at their source code if you want more information.
Additionally, if you launch your own background coroutines, make sure to also
shut them down when the connection is finished, or you'll leak coroutines into
the server.
Channel Layers
~~~~~~~~~~~~~~
Consumers also let you deal with Channel's *channel layers*, to let them
send messages between each other either one-to-one or via a broadcast system
called groups.
Consumers will use the channel layer ``default`` unless
the ``channel_layer_alias`` attribute is set when subclassing any
of the provided ``Consumer`` classes.
To use the channel layer ``echo_alias`` we would set it like so:
.. code-block:: python
from channels.consumer import SyncConsumer
class EchoConsumer(SyncConsumer):
channel_layer_alias = "echo_alias"
You can read more in :doc:`/topics/channel_layers`.
.. _scope:
Scope
-----
Consumers receive the connection's ``scope`` when they are called, which
contains a lot of the information you'd find on the ``request`` object in a
Django view. It's available as ``self.scope`` inside the consumer's methods.
Scopes are part of the :doc:`ASGI specification </asgi>`, but here are
some common things you might want to use:
* ``scope["path"]``, the path on the request. *(HTTP and WebSocket)*
* ``scope["headers"]``, raw name/value header pairs from the request *(HTTP and WebSocket)*
* ``scope["method"]``, the method name used for the request. *(HTTP)*
If you enable things like :doc:`authentication`, you'll also be able to access
the user object as ``scope["user"]``, and the URLRouter, for example, will
put captured groups from the URL into ``scope["url_route"]``.
In general, the scope is the place to get connection information and where
middleware will put attributes it wants to let you access (in the same way
that Django's middleware adds things to ``request``).
For a full list of what can occur in a connection scope, look at the basic
ASGI spec for the protocol you are terminating, plus any middleware or routing
code you are using. The web (HTTP and WebSocket) scopes are available in
`the Web ASGI spec <https://github.com/django/asgiref/blob/master/specs/www.rst>`_.
Generic Consumers
-----------------
What you see above is the basic layout of a consumer that works for any
protocol. Much like Django's *generic views*, Channels ships with
*generic consumers* that wrap common functionality up so you don't need to
rewrite it, specifically for HTTP and WebSocket handling.
WebsocketConsumer
~~~~~~~~~~~~~~~~~
Available as ``channels.generic.websocket.WebsocketConsumer``, this
wraps the verbose plain-ASGI message sending and receiving into handling that
just deals with text and binary frames:
.. code-block:: python
from channels.generic.websocket import WebsocketConsumer
class MyConsumer(WebsocketConsumer):
groups = ["broadcast"]
def connect(self):
# Called on connection.
# To accept the connection call:
self.accept()
# Or accept the connection and specify a chosen subprotocol.
# A list of subprotocols specified by the connecting client
# will be available in self.scope['subprotocols']
self.accept("subprotocol")
# To reject the connection, call:
self.close()
def receive(self, text_data=None, bytes_data=None):
# Called with either text_data or bytes_data for each frame
# You can call:
self.send(text_data="Hello world!")
# Or, to send a binary frame:
self.send(bytes_data="Hello world!")
# Want to force-close the connection? Call:
self.close()
# Or add a custom WebSocket error code!
self.close(code=4123)
def disconnect(self, close_code):
# Called when the socket closes
You can also raise ``channels.exceptions.AcceptConnection`` or
``channels.exceptions.DenyConnection`` from anywhere inside the ``connect``
method in order to accept or reject a connection, if you want reusable
authentication or rate-limiting code that doesn't need to use mixins.
A ``WebsocketConsumer``'s channel will automatically be added to (on connect)
and removed from (on disconnect) any groups whose names appear in the
consumer's ``groups`` class attribute. ``groups`` must be an iterable, and a
channel layer with support for groups must be set as the channel backend
(``channels.layers.InMemoryChannelLayer`` and
``channels_redis.core.RedisChannelLayer`` both support groups). If no channel
layer is configured or the channel layer doesn't support groups, connecting
to a ``WebsocketConsumer`` with a non-empty ``groups`` attribute will raise
``channels.exceptions.InvalidChannelLayerError``. See :ref:`groups` for more.
AsyncWebsocketConsumer
~~~~~~~~~~~~~~~~~~~~~~
Available as ``channels.generic.websocket.AsyncWebsocketConsumer``, this has
the exact same methods and signature as ``WebsocketConsumer`` but everything
is async, and the functions you need to write have to be as well:
.. code-block:: python
from channels.generic.websocket import AsyncWebsocketConsumer
class MyConsumer(AsyncWebsocketConsumer):
groups = ["broadcast"]
async def connect(self):
# Called on connection.
# To accept the connection call:
await self.accept()
# Or accept the connection and specify a chosen subprotocol.
# A list of subprotocols specified by the connecting client
# will be available in self.scope['subprotocols']
await self.accept("subprotocol")
# To reject the connection, call:
await self.close()
async def receive(self, text_data=None, bytes_data=None):
# Called with either text_data or bytes_data for each frame
# You can call:
await self.send(text_data="Hello world!")
# Or, to send a binary frame:
await self.send(bytes_data="Hello world!")
# Want to force-close the connection? Call:
await self.close()
# Or add a custom WebSocket error code!
await self.close(code=4123)
async def disconnect(self, close_code):
# Called when the socket closes
JsonWebsocketConsumer
~~~~~~~~~~~~~~~~~~~~~
Available as ``channels.generic.websocket.JsonWebsocketConsumer``, this
works like ``WebsocketConsumer``, except it will auto-encode and decode
to JSON sent as WebSocket text frames.
The only API differences are:
* Your ``receive_json`` method must take a single argument, ``content``, that
is the decoded JSON object.
* ``self.send_json`` takes only a single argument, ``content``, which will be
encoded to JSON for you.
If you want to customise the JSON encoding and decoding, you can override
the ``encode_json`` and ``decode_json`` classmethods.
AsyncJsonWebsocketConsumer
~~~~~~~~~~~~~~~~~~~~~~~~~~
An async version of ``JsonWebsocketConsumer``, available as
``channels.generic.websocket.AsyncJsonWebsocketConsumer``. Note that even
``encode_json`` and ``decode_json`` are async functions.
AsyncHttpConsumer
~~~~~~~~~~~~~~~~~
Available as ``channels.generic.http.AsyncHttpConsumer``, this offers basic
primitives to implement a HTTP endpoint:
.. code-block:: python
from channels.generic.http import AsyncHttpConsumer
class BasicHttpConsumer(AsyncHttpConsumer):
async def handle(self, body):
await asyncio.sleep(10)
await self.send_response(200, b"Your response bytes", headers=[
(b"Content-Type", b"text/plain"),
])
You are expected to implement your own ``handle`` method. The
method receives the whole request body as a single bytestring. Headers
may either be passed as a list of tuples or as a dictionary. The
response body content is expected to be a bytestring.
You can also implement a ``disconnect`` method if you want to run code on
disconnect - for example, to shut down any coroutines you launched. This will
run even on an unclean disconnection, so don't expect that ``handle`` has
finished running cleanly.
If you need more control over the response, e.g. for implementing long
polling, use the lower level ``self.send_headers`` and ``self.send_body``
methods instead. This example already mentions channel layers which will
be explained in detail later:
.. code-block:: python
import json
from channels.generic.http import AsyncHttpConsumer
class LongPollConsumer(AsyncHttpConsumer):
async def handle(self, body):
await self.send_headers(headers=[
(b"Content-Type", b"application/json"),
])
# Headers are only sent after the first body event.
# Set "more_body" to tell the interface server to not
# finish the response yet:
await self.send_body(b"", more_body=True)
async def chat_message(self, event):
# Send JSON and finish the response:
await self.send_body(json.dumps(event).encode("utf-8"))
Of course you can also use those primitives to implement a HTTP endpoint for
`Server-sent events <https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events>`_:
.. code-block:: python
from datetime import datetime
from channels.generic.http import AsyncHttpConsumer
class ServerSentEventsConsumer(AsyncHttpConsumer):
async def handle(self, body):
await self.send_headers(headers=[
(b"Cache-Control", b"no-cache"),
(b"Content-Type", b"text/event-stream"),
(b"Transfer-Encoding", b"chunked"),
])
while True:
payload = "data: %s\n\n" % datetime.now().isoformat()
await self.send_body(payload.encode("utf-8"), more_body=True)
await asyncio.sleep(1)
|