File: pubsub2.py

package info (click to toggle)
aioredis 1.3.1-2
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, sid
  • size: 1,104 kB
  • sloc: python: 11,112; makefile: 7
file content (55 lines) | stat: -rw-r--r-- 1,565 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
import asyncio
import aioredis


async def pubsub():
    sub = await aioredis.create_redis(
         'redis://localhost')

    ch1, ch2 = await sub.subscribe('channel:1', 'channel:2')
    assert isinstance(ch1, aioredis.Channel)
    assert isinstance(ch2, aioredis.Channel)

    async def async_reader(channel):
        while await channel.wait_message():
            msg = await channel.get(encoding='utf-8')
            # ... process message ...
            print("message in {}: {}".format(channel.name, msg))

    tsk1 = asyncio.ensure_future(async_reader(ch1))

    # Or alternatively:

    async def async_reader2(channel):
        while True:
            msg = await channel.get(encoding='utf-8')
            if msg is None:
                break
            # ... process message ...
            print("message in {}: {}".format(channel.name, msg))

    tsk2 = asyncio.ensure_future(async_reader2(ch2))

    # Publish messages and terminate
    pub = await aioredis.create_redis(
        'redis://localhost')
    while True:
        channels = await pub.pubsub_channels('channel:*')
        if len(channels) == 2:
            break

    for msg in ("Hello", ",", "world!"):
        for ch in ('channel:1', 'channel:2'):
            await pub.publish(ch, msg)
    await asyncio.sleep(0.1)
    pub.close()
    sub.close()
    await pub.wait_closed()
    await sub.wait_closed()
    await asyncio.gather(tsk1, tsk2)


if __name__ == '__main__':
    import os
    if 'redis_version:2.6' not in os.environ.get('REDIS_VERSION', ''):
        asyncio.run(pubsub())