File: stream.py

package info (click to toggle)
python-bellows 0.40.5-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 992 kB
  • sloc: python: 13,630; sh: 7; makefile: 4
file content (59 lines) | stat: -rw-r--r-- 1,720 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import asyncio
import logging
import time

import click

from . import util
from .main import main

LOGGER = logging.getLogger(__name__)


@main.command()
@click.option(
    "-c", "--channel", type=click.IntRange(11, 26), metavar="CHANNEL", required=True
)
@click.option(
    "-p", "--power", type=click.IntRange(-100, 20), metavar="POWER", required=True
)
@click.pass_context
def stream(ctx, channel, power):
    """Transmit random stream of characters on CHANNEL with POWER (in dBm)."""
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(_stream(ctx, channel, power))
    except KeyboardInterrupt:
        start_time = ctx.obj.get("start_time", None)
        if start_time:
            duration = time.time() - start_time
            click.echo(
                "\nStreamed on channel %d for %0.2fs" % (channel, duration), err=True
            )
    finally:
        if "ezsp" in ctx.obj:
            s = ctx.obj["ezsp"]
            loop.run_until_complete(s.mfglibStopStream())
            loop.run_until_complete(s.mfglibEnd())
            s.close()


async def _stream(ctx, channel, power):
    s = await util.setup(ctx.obj["device"], ctx.obj["baudrate"])
    ctx.obj["ezsp"] = s

    v = await s.mfglibStart(False)
    util.check(v[0], "Unable to start mfglib")

    v = await s.mfglibSetChannel(channel)
    util.check(v[0], "Unable to set channel")

    v = await s.mfglibSetPower(0, power)
    util.check(v[0], "Unable to set power")

    v = await s.mfglibStartStream()
    util.check(v[0], "Unable to start stream")
    click.echo("Started transmitting random stream of characters", err=True)
    ctx.obj["start_time"] = time.time()
    while True:
        await asyncio.sleep(3600)