File: weather.py

package info (click to toggle)
python-aprslib 0.7.2-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 352 kB
  • sloc: python: 2,973; makefile: 216
file content (79 lines) | stat: -rw-r--r-- 2,153 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
import re
from aprslib.exceptions import ParseError

__all__ = [
    'parse_weather',
    'parse_weather_data',
    ]

# constants
wind_multiplier = 0.44704
rain_multiplier = 0.254

key_map = {
    'g': 'wind_gust',
    'c': 'wind_direction',
    't': 'temperature',
    'S': 'wind_speed',
    'r': 'rain_1h',
    'p': 'rain_24h',
    'P': 'rain_since_midnight',
    'h': 'humidity',
    'b': 'pressure',
    'l': 'luminosity',
    'L': 'luminosity',
    's': 'snow',
    '#': 'rain_raw',
}
val_map = {
    'g': lambda x: int(x) * wind_multiplier,
    'c': lambda x: int(x),
    'S': lambda x: int(x) * wind_multiplier,
    't': lambda x: (float(x) - 32) / 1.8,
    'r': lambda x: int(x) * rain_multiplier,
    'p': lambda x: int(x) * rain_multiplier,
    'P': lambda x: int(x) * rain_multiplier,
    'h': lambda x: 100 if int(x) == 0 else int(x),
    'b': lambda x: float(x) / 10,
    'l': lambda x: int(x) + 1000,
    'L': lambda x: int(x),
    's': lambda x: float(x) * 25.4,
    '#': lambda x: int(x),
}

def parse_weather_data(body):
    parsed = {}

    # parse weather data
    body = re.sub(r"^([0-9]{3})/([0-9]{3})", "c\\1s\\2", body)
    body = body.replace('s', 'S', 1)

    # match as many parameters from the start, rest is comment
    data = re.match(r"^([cSgtrpPlLs#][0-9\-\. ]{3}|h[0-9\. ]{2}|b[0-9\. ]{5})+", body)

    if data:
        data = data.group()
        # split out data from comment
        body = body[len(data):]
        # parse all weather parameters
        data = re.findall(r"([cSgtrpPlLs#]\d{3}|t-\d{2}|h\d{2}|b\d{5}|s\.\d{2}|s\d\.\d)", data)
        data = map(lambda x: (key_map[x[0]] , val_map[x[0]](x[1:])), data)
        parsed.update(dict(data))

    return (body, parsed)

def parse_weather(body):
    match = re.match(r"^(\d{8})c[\. \d]{3}s[\. \d]{3}g[\. \d]{3}t[\. \d]{3}", body)
    if not match:
        raise ParseError("invalid positionless weather report format")

    comment, weather = parse_weather_data(body[8:])

    parsed = {
        'format': 'wx',
        'wx_raw_timestamp': match.group(1),
        'comment': comment.strip(' '),
        'weather': weather,
        }

    return ('', parsed)