File: async-snapshot

package info (click to toggle)
python-pyaarlo 0.8.0.15-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 556 kB
  • sloc: python: 6,064; makefile: 6; sh: 1
file content (61 lines) | stat: -rwxr-xr-x 1,813 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
#!/usr/bin/env python3
#

import logging
import io
import os
import sys
import time
from PIL import Image

# for examples add pyaarlo install path
sys.path.append(os.path.dirname(os.path.dirname(os.path.realpath(__file__))))
import pyaarlo

USERNAME = os.environ.get('ARLO_USERNAME', 'test.login@gmail.com')
PASSWORD = os.environ.get('ARLO_PASSWORD', 'test-password')

# setup logging
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
_LOGGER = logging.getLogger('pyaarlo')

# define a simple snapshot callback
def snapshot_callback(camera, attr, value):
    """
    Snapshot callback converts snapshot image bytes 
    to a PIL Image and shows it
    """
    # convert snapshot to PIL Image
    img = Image.open(io.BytesIO(value))
    _LOGGER.info(f'got snapshot from {camera.name} {img.size}')

    # show the image
    img.show()

# login
_LOGGER.info('logging in')
arlo = pyaarlo.PyArlo(username=USERNAME, password=PASSWORD,
                      tfa_type='SMS', tfa_source='console',
                      save_state=False, dump=False, storage_dir='aarlo')


# register snapshot callback with each camera
for camera in arlo.cameras:
    camera.add_attr_callback(pyaarlo.constant.LAST_IMAGE_DATA_KEY, snapshot_callback)

# set the wait time
refresh_interval_s = 60
_LOGGER.info(f'requesting snapshots from cameras with {1. / float(refresh_interval_s):.2f}Hz interval')

# request snapshots for 5 minutes
end_time = time.time() + 5 * 60

while time.time() < end_time:
    _LOGGER.info('scheduling snapshots without blocking')
    for camera in arlo.cameras:
        camera.request_snapshot()

    _LOGGER.info(f'waiting for {refresh_interval_s}s ({end_time - time.time():.2f}s remaining in example)')
    time.sleep(refresh_interval_s)