1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
|
#!/usr/bin/env python3
# SPDX-License-Identifier:Unlicense
# If you have installed aravis in a non-standard location, you may need
# to make GI_TYPELIB_PATH point to the correct location. For example:
#
# export GI_TYPELIB_PATH=$GI_TYPELIB_PATH:/opt/bin/lib/girepository-1.0/
#
# You may also have to give the path to libaravis.so, using LD_PRELOAD or
# LD_LIBRARY_PATH.
import threading
import time
import gi
# autopep8: off
gi.require_version('Aravis', '0.8')
from gi.repository import Aravis # noqa: E402
# autopep8: on
# Enable the 'Fake' interface; the camera opened further down ('Fake_1') is
# requested by that name.
# NOTE(review): presumably the fake interface is disabled by default - confirm.
Aravis.enable_interface('Fake')
class UserData:
    """Mutable holder handed to the stream callback as its user data.

    The only field is ``stream``. It starts out as ``None`` and is expected
    to be assigned by the code that creates the stream, so that the callback
    can reach the stream through this object.
    """

    def __init__(self) -> None:
        # Filled in after the stream has been created; None until then.
        self.stream = None
def callback(user_data, cb_type, buffer) -> None:
    """Log a stream event and hand any delivered buffer back to the stream.

    Args:
        user_data: Object exposing a ``stream`` attribute with a
            ``push_buffer`` method.
        cb_type: Event kind; only its ``value_name`` attribute is read, for
            the log line.
        buffer: Buffer associated with the event, or ``None`` when the event
            carries no buffer.
    """
    print(f'Callback[{threading.get_native_id()}] {cb_type.value_name} {buffer=}')
    if buffer is not None:
        # Fake some light computation here (like copying the buffer). Do not
        # do heavy image processing here since it would block the acquisition
        # thread.
        time.sleep(0.01)

        # Re-enqueue the buffer so it can be used again.
        user_data.stream.push_buffer(buffer)
# Print the main thread id so it can be compared with the ids the callback
# prints.
print(f'Main thread[{threading.get_native_id()}]')

cam = Aravis.Camera.new('Fake_1')

# The callback gets user_data as its first argument; the stream can only be
# stored on it once create_stream() has returned.
user_data = UserData()
stream = cam.create_stream(callback, user_data)
user_data.stream = stream

# Queue a single buffer sized to the camera payload.
payload = cam.get_payload()
stream.push_buffer(Aravis.Buffer.new_allocate(payload))

cam.start_acquisition()
try:
    # Let the acquisition run for one second.
    time.sleep(1)
finally:
    # Stop the acquisition even if the wait is interrupted (e.g. Ctrl-C).
    cam.stop_acquisition()
|