1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
|
import asyncio
import logging
import time
import click
from . import util
from .main import main
LOGGER = logging.getLogger(__name__)
@main.command()
@click.option(
"-c", "--channel", type=click.IntRange(11, 26), metavar="CHANNEL", required=True
)
@click.option(
"-p", "--power", type=click.IntRange(-100, 20), metavar="POWER", required=True
)
@click.pass_context
def stream(ctx, channel, power):
"""Transmit random stream of characters on CHANNEL with POWER (in dBm)."""
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(_stream(ctx, channel, power))
except KeyboardInterrupt:
start_time = ctx.obj.get("start_time", None)
if start_time:
duration = time.time() - start_time
click.echo(
"\nStreamed on channel %d for %0.2fs" % (channel, duration), err=True
)
finally:
if "ezsp" in ctx.obj:
s = ctx.obj["ezsp"]
loop.run_until_complete(s.mfglibStopStream())
loop.run_until_complete(s.mfglibEnd())
s.close()
async def _stream(ctx, channel, power):
s = await util.setup(ctx.obj["device"], ctx.obj["baudrate"])
ctx.obj["ezsp"] = s
v = await s.mfglibStart(False)
util.check(v[0], "Unable to start mfglib")
v = await s.mfglibSetChannel(channel)
util.check(v[0], "Unable to set channel")
v = await s.mfglibSetPower(0, power)
util.check(v[0], "Unable to set power")
v = await s.mfglibStartStream()
util.check(v[0], "Unable to start stream")
click.echo("Started transmitting random stream of characters", err=True)
ctx.obj["start_time"] = time.time()
while True:
await asyncio.sleep(3600)
|