File: rtl_433_http_cmd.py

package info (click to toggle)
rtl-433 25.12-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 5,192 kB
  • sloc: ansic: 55,982; cpp: 3,263; python: 2,544; php: 55; javascript: 43; sh: 18; makefile: 16
file content (68 lines) | stat: -rwxr-xr-x 1,767 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
#!/usr/bin/env python3

"""Custom hop controller example for rtl_433's HTTP cmd API."""

# Start rtl_433 (`rtl_433 -F http`), then this script.
# Needs the Requests package to be installed.

import requests
import json
from time import sleep

# You can run rtl_433 and this script on different machines,
# start rtl_433 with `-F http:0.0.0.0`, and change
# to e.g. `HTTP_HOST = "192.168.1.100"` (use your server ip) below.
HTTP_HOST = "127.0.0.1"
HTTP_PORT = 8433


def set_freq(freq):
    return send_cmd({'cmd': 'center_frequency', 'val': freq})


def set_rate(rate):
    return send_cmd({'cmd': 'sample_rate', 'val': rate})


def send_cmd(params):
    url = f'http://{HTTP_HOST}:{HTTP_PORT}/cmd'
    headers = {'Accept': 'application/json'}

    # You will receive JSON events, one per line terminated with CRLF.
    # Use GET
    response = requests.get(url, params=params, headers=headers, timeout=70, stream=True)
    # or POST
    # response = requests.post(url, data=params, headers=headers, timeout=70, stream=True)
    print(f'Sending {params} to {url}')

    # Answer is lines of JSON
    return response.text


def rtl_433_control():
    """Simple timed control of rtl_433 in a loop forever."""

    # Loop forever
    while True:
        try:
            # Set first hop
            sleep(10)
            print(set_freq(433920000))
            print(set_rate(250000))

            # Set second hop
            sleep(10)
            print(set_freq(868000000))
            print(set_rate(1024000))

        except requests.ConnectionError:
            print('Connection failed, retrying in 60s...')
            sleep(60)


if __name__ == "__main__":
    try:
        rtl_433_control()
    except KeyboardInterrupt:
        print('\nExiting.')
        pass