File: network.py

package info (click to toggle)
python-bellows 0.40.5-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 992 kB
  • sloc: python: 13,630; sh: 7; makefile: 4
file content (160 lines) | stat: -rw-r--r-- 4,829 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
import asyncio
import functools
import logging
import math

import click
from zigpy.config import SCHEMA_NETWORK

import bellows.types as t
import bellows.zigbee.util as zutil

from . import opts, util
from .main import main

# Module-level logger, named after this module via ``__name__``.
LOGGER = logging.getLogger(__name__)


@main.command()
@opts.channels
@opts.extended_pan
@opts.pan
@click.pass_context
@util.background
async def join(ctx, channels, pan_id, extended_pan_id):
    """Join an existing ZigBee network as an end device"""
    # The docstring above is kept to a single line on purpose: click shows it
    # as the command's help text.
    #
    # Behaviour:
    #   * With neither a PAN ID nor an extended PAN ID, `channels` is actively
    #     scanned and the single network that allows joining is used. The
    #     command returns 1 when no such network, or more than one, is found.
    #   * With a PAN ID or extended PAN ID, exactly one channel must be given,
    #     otherwise click.BadOptionUsage is raised.
    #   * The device handle is always closed, including on early returns and
    #     on errors raised by util.check.

    def cb(fut, frame_name, response):
        # Resolve the future on the first stack status frame only; calling
        # set_result() on an already-resolved future raises InvalidStateError.
        if frame_name == "stackStatusHandler" and not fut.done():
            fut.set_result(response)

    # Validate the option combination before opening the device, so a usage
    # error never leaves an open handle behind.
    channel = None
    if len(channels) == 1:
        channel = t.uint8_t(channels[0])
    elif pan_id or extended_pan_id:
        raise click.BadOptionUsage(
            "--channels", "Specify exactly one channel to join a specific network"
        )

    s = await util.setup(ctx.obj["device"], ctx.obj["baudrate"])
    try:
        if not (pan_id or extended_pan_id):
            scan_type = t.EzspNetworkScanType.ACTIVE_SCAN
            channel_mask = util.channel_mask(channels)
            channel_list = " ".join(map(str, channels))
            click.echo(f"PAN not provided, scanning channels {channel_list}...")
            v = await s.startScan(scan_type, channel_mask, 3)

            # Each scan result is indexable; element 0 carries the network
            # description (allowingJoin, panId, extendedPanId, channel).
            networks = [n[0] for n in v if n[0].allowingJoin]
            if not networks:
                click.echo("No joinable networks found")
                return 1
            if len(networks) > 1:
                click.echo("Multiple joinable networks found. Refusing to pick.")
                return 1

            network = networks[0]

            pan_id = network.panId
            extended_pan_id = network.extendedPanId
            channel = network.channel

            click.echo(
                f"Found network {pan_id} {extended_pan_id} on channel {channel}"
            )

        # Normalise the identifiers: a missing PAN ID becomes 0 and a missing
        # extended PAN ID becomes eight zero bytes.
        if pan_id is None:
            pan_id = t.uint16_t(0)
        else:
            pan_id = t.uint16_t(pan_id)
        if isinstance(extended_pan_id, str):
            extended_pan_id = util.parse_epan(extended_pan_id)
        if extended_pan_id is None:
            extended_pan_id = t.FixedList[t.uint8_t, 8]([t.uint8_t(0)] * 8)

        v = await util.network_init(s)

        if v[0] == t.EmberStatus.SUCCESS:
            # A network is already up; leave it before joining another one.
            LOGGER.debug("Network was up, leaving...")
            v = await s.leaveNetwork()
            util.check(v[0], f"Failure leaving network: {v[0]}")
            await asyncio.sleep(1)  # TODO

        initial_security_state = zutil.zha_security(SCHEMA_NETWORK({}))
        v = await s.setInitialSecurityState(initial_security_state)
        util.check(v[0], f"Setting security state failed: {v[0]}")

        parameters = t.EmberNetworkParameters()
        parameters.extendedPanId = extended_pan_id
        parameters.panId = pan_id
        parameters.radioTxPower = t.uint8_t(8)
        parameters.radioChannel = t.uint8_t(channel)
        parameters.joinMethod = t.EmberJoinMethod.USE_MAC_ASSOCIATION
        parameters.nwkManagerId = t.EmberNodeId(0)
        parameters.nwkUpdateId = t.uint8_t(0)
        parameters.channels = t.uint32_t(0)
        click.echo(parameters)

        # Register the callback before issuing the join so the status frame
        # cannot be missed, and always unregister it afterwards.
        fut = asyncio.Future()
        cbid = s.add_callback(functools.partial(cb, fut))
        try:
            v = await s.joinNetwork(t.EmberNodeType.END_DEVICE, parameters)
            util.check(v[0], f"Joining network failed: {v[0]}")
            v = await fut
            click.echo(v)
        finally:
            s.remove_callback(cbid)
    finally:
        s.close()


@main.command()
@click.pass_context
@util.background
async def leave(ctx):
    """Leave the ZigBee network"""
    # The one-line docstring is what click shows as help text; details live
    # in comments.
    s = await util.setup(ctx.obj["device"], ctx.obj["baudrate"])
    try:
        v = await util.network_init(s)
        if v[0] == t.EmberStatus.NOT_JOINED:
            click.echo("Not joined, not leaving")
        else:
            v = await s.leaveNetwork()
            # Here NETWORK_DOWN is the expected status; anything else makes
            # util.check report the failure.
            util.check(
                v[0],
                f"Failure leaving network: {v[0]}",
                expected=t.EmberStatus.NETWORK_DOWN,
            )
    finally:
        # Close the device even when network_init/leaveNetwork/check raise.
        s.close()


@main.command()
@opts.channels
@opts.duration_ms
@click.option("-e", "--energy", "energy_scan", is_flag=True)
@click.pass_context
@util.background
async def scan(ctx, channels, duration_ms, energy_scan):
    """Scan for networks or radio interference"""
    # The one-line docstring is what click shows as help text; details live
    # in comments.
    #
    # Runs an active scan by default, or an energy scan when `energy_scan`
    # is set, over `channels` and echoes every result returned by startScan.
    s = await util.setup(ctx.obj["device"], ctx.obj["baudrate"])
    try:
        channel_mask = util.channel_mask(channels)
        click.echo("Scanning channels {}".format(" ".join(map(str, channels))))

        # TFM says:
        #   Sets the exponent of the number of scan periods, where a scan
        #   period is 960 symbols. The scan will occur for
        #   ((2^duration) + 1) scan periods.
        # 1 symbol is 16us, so one scan period is 960 * 0.016 = 15.36 ms.
        scan_periods = duration_ms / (960 * 0.016)
        if scan_periods > 1:
            # Smallest exponent e with (2**e + 1) >= scan_periods.
            duration_symbol_exp = max(0, math.ceil(math.log2(scan_periods - 1)))
        else:
            # At or below one scan period the logarithm is undefined
            # (log of a value <= 0); exponent 0 is the shortest scan.
            duration_symbol_exp = 0

        scan_type = t.EzspNetworkScanType.ACTIVE_SCAN
        if energy_scan:
            scan_type = t.EzspNetworkScanType.ENERGY_SCAN

        v = await s.startScan(scan_type, channel_mask, duration_symbol_exp)
        for network in v:
            click.echo(network)
    finally:
        # Close the device even when the scan fails.
        s.close()