File: pollingtest.py

package info (click to toggle)
python-boschshcpy 0.2.92-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 268 kB
  • sloc: python: 3,343; makefile: 4; sh: 4
file content (68 lines) | stat: -rwxr-xr-x 1,814 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
import os
import sys, time
import zeroconf
import logging
from zeroconf import Zeroconf

sys.path.append(os.path.join(os.path.dirname(__file__), ".."))

from boschshcpy import SHCSession, SHCDevice

logging.basicConfig(level=logging.DEBUG)


def callback():
    print("Notification of device update")


def setup_session():
    # Create a BoschSHC client with the specified ACCESS_CERT and ACCESS_KEY.
    zeroconf = Zeroconf()
    session = SHCSession(
        controller_ip="192.168.1.6",
        certificate="../keystore/dev-cert.pem",
        key="../keystore/dev-key.pem",
        zeroconf=zeroconf,
    )
    zeroconf.close()

    shc_info = session.information
    print("  version        : %s" % shc_info.version)
    print("  updateState    : %s" % shc_info.updateState)

    print("Accessing all devices...")

    smart_plugs = session.device_helper.smart_plugs
    for item in smart_plugs:
        for service in item.device_services:
            service.subscribe_callback(item.id, callback)

    shutter_controls: SHCDevice = session.device_helper.shutter_controls
    for item in shutter_controls:
        for service in item.device_services:
            service.subscribe_callback(item.id, callback)

    return session


def main():
    exit = False
    session = setup_session()
    duration = 0
    while not exit:
        userInput = input("Enter seconds for polling duration (or nothing to exit) ")

        if userInput.isdigit():
            duration = int(userInput)
            print("Starting polling for {} seconds".format(duration))
            session.start_polling()
            time.sleep(duration)
            session.stop_polling()
            print("Stopping polling")
        else:
            exit = True
            print("OK Bye, bye.")


if __name__ == "__main__":
    main()