File: test.py

package info (click to toggle)
aioacaia 0.1.18-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 268 kB
  • sloc: python: 847; makefile: 2
file content (63 lines) | stat: -rw-r--r-- 1,730 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
from bleak import BleakClient
import asyncio


CHAR_ID = "49535343-8841-43f4-a8d4-ecbe34729bb3"
HEADER1 = 0xef
HEADER2 = 0xdd

def encode(msgType,payload):
    bytes=bytearray(5+len(payload))

    bytes[0] = HEADER1
    bytes[1] = HEADER2
    bytes[2] = msgType
    cksum1 = 0
    cksum2 = 0


    for i in range(len(payload)):
        val = payload[i] & 0xff
        bytes[3+i] = val
        if (i % 2 == 0):
            cksum1 += val
        else:
            cksum2 += val

    bytes[len(payload) + 3] = (cksum1 & 0xFF)
    bytes[len(payload) + 4] = (cksum2 & 0xFF)

    return bytes


msgs = {
    "tare": encode(4, [0]),
    "startTimer": encode(13, [0,0]),
    "stopTimer": encode(13, [0,2]),
    "resetTimer": encode(13, [0,1]),
    "heartbeat": encode(0, [2,0])
}

def encodeId(isPyxisStyle=False):
    if isPyxisStyle:
        payload = bytearray([0x30,0x31,0x32,0x33,0x34,0x35,0x36,0x37,0x38,0x39,0x30,0x31,0x32,0x33,0x34])
    else:
        payload = bytearray([0x2d,0x2d,0x2d,0x2d,0x2d,0x2d,0x2d,0x2d,0x2d,0x2d,0x2d,0x2d,0x2d,0x2d,0x2d])
    return encode(11,payload)

async def main():
    async with BleakClient("33:11:22:33:44") as client:
        print("connected")
        print("Writing id...")
        await client.write_gatt_char(CHAR_ID, encodeId(isPyxisStyle=False))
        for i in msgs:
            if i == "heartbeat":
                continue
            await asyncio.sleep(5)
            await client.write_gatt_char(CHAR_ID, msgs["heartbeat"])
            await asyncio.sleep(1)
            print(f"Writing {i}...")
            print(f"Writing {msgs[i]}...")
            await client.write_gatt_char(CHAR_ID, msgs[i])

asyncio.run(main())