1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103
|
# This is a CamiTK python action
#
# Video acquisition using OpenCV.
#
# This action creates a new image component and continuously replaces
# its image data using the video feed.
#
# Three types of video can be acquired:
# - grayscale video
# - RGB video
# - HSV mode video
#
# You can pause/restart the video acquisition at any time.
# This action uses a thread to refresh the image.
# TODO: Using threads does not work yet in CamiTK Python extensions,
# so this action does not work as expected.
import camitk
from PySide2.QtCore import QObject, QThread, pyqtSignal
from dataclasses import dataclass, field
import cv2
@dataclass
class ActionState:
    """Mutable state shared by the action callbacks and the worker thread.

    All fields start out empty and are filled in by ``init``, ``process``,
    ``Worker.run`` and ``update_image``.
    """

    # CamiTK action owning this state; stored by init().
    action: camitk.Action = None
    # Frame counter; incremented by Worker.run() before each read.
    counter: int = 0
    # Device index handed to cv2.VideoCapture in process().
    camera_index: int = 0
    # NOTE(review): not read by any code visible in this file — confirm use.
    init: bool = False
    # Worker object moved to `thread`; created in process().
    worker: QObject = None
    # Annotated with the class, not an instance: an annotation expression is
    # evaluated when the class body runs, so `QThread()` here would construct
    # a stray thread object.
    thread: QThread = None
    # Capture device opened in process() and read by Worker.run().
    capture_device: cv2.VideoCapture = None
    # Image component created from the first frame in update_image().
    image_component: camitk.ImageComponent = None
    # Latest frame as returned by cv2.cvtColor (BGR converted to RGB).
    # cv2.cvtColor is a function, not a type, hence the generic annotation.
    rgb_frame: object = None


# Single shared instance used by every function in this file.
state = ActionState()
class Worker(QObject):
    """Frame grabber whose ``run`` method is meant to execute in a QThread.

    Signals:
        finished: emitted once, when the grab loop has ended.
        grabbed(int): emitted after every successful read with the current
            value of ``state.counter``. The frame itself is not sent with
            the signal; it is published through ``state.rgb_frame``.
    """

    # NOTE(review): PySide2 normally exposes this class as `Signal`;
    # `pyqtSignal` is the PyQt spelling — confirm the import resolves.
    finished = pyqtSignal()
    grabbed = pyqtSignal(int)

    def run(self):
        """Grab frames from ``state.capture_device`` until a read fails.

        Each successful frame is converted from BGR to RGB, stored in
        ``state.rgb_frame`` and announced through ``grabbed``. When a read
        fails a warning is logged and ``finished`` is emitted.
        """
        while True:
            state.counter += 1
            ret, frame = state.capture_device.read()
            if not ret:
                camitk.warning("Error: Could not read frame.")
                # Leave the loop rather than returning: returning here would
                # skip the `finished` emission below, so nothing connected
                # to it would ever run.
                break
            # Convert the image from BGR to RGB
            state.rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            self.grabbed.emit(state.counter)
        self.finished.emit()
def init(self: camitk.Action):
    """Reset the shared state and remember the action it belongs to.

    Sets ``state.camera_index`` to 0 and ``state.counter`` to 1, and stores
    ``self`` in ``state.action``.
    """
    state.camera_index = 0
    state.counter = 1
    state.action = self
def update_image(counter):
    """Show the latest grabbed frame and report the frame number.

    Connected to ``Worker.grabbed`` in ``process``. The frame is read from
    ``state.rgb_frame`` rather than received as an argument.

    Args:
        counter: frame number emitted with the ``grabbed`` signal.
    """
    if state.rgb_frame is not None:
        if not state.image_component:
            # First frame: create the image component, then call
            # camitk.refresh().
            state.image_component = camitk.newImageComponentFromNumpy(
                state.rgb_frame, "Webcam Video"
            )
            camitk.refresh()
        else:
            # Later frames: swap the image data of the existing component.
            state.image_component.replaceImageData(state.rgb_frame)

    # This is a module-level function, so there is no `self` in scope; the
    # action is reached through the reference stored by init().
    # Assumes the action exposes a `stepLabel` widget — TODO confirm.
    action = state.action
    if action is not None:
        action.stepLabel.setText(f"Grabbed frame #{counter}. State {state}")
def process(self: camitk.Action):
    """Open the capture device and start the acquisition thread.

    Does nothing beyond logging when ``state.worker`` is already set. If
    the device cannot be opened, a warning is logged and the function
    returns without creating the thread or the worker.
    """
    camitk.info(f"process begin: {state}")
    if state.worker:
        # A worker has already been created by an earlier call.
        return

    state.capture_device = cv2.VideoCapture(state.camera_index)
    if not state.capture_device.isOpened():
        camitk.warning(
            f"Error: Could not open capturing device #{state.camera_index}."
        )
        return

    state.thread = QThread()
    state.worker = Worker()
    state.worker.moveToThread(state.thread)

    # Wire everything up before starting the thread so that no signal can
    # be emitted while its slot is still unconnected.
    state.thread.started.connect(state.worker.run)
    state.worker.finished.connect(state.thread.quit)
    state.worker.finished.connect(state.worker.deleteLater)
    state.thread.finished.connect(state.thread.deleteLater)
    state.worker.grabbed.connect(update_image)
    state.thread.finished.connect(
        lambda: camitk.info("Thread finished")
    )
    # TODO Change Apply button to "Stop Acquisition"
    # TODO Reset state.worker to None once the thread has finished; this
    #      needs a named slot because a lambda cannot contain an assignment.

    state.thread.start()

    # Note: if camitk.refresh() or self.refreshApplication() is called before
    # the end of the method (e.g. just before the "state.timer.start(10)"),
    # the python objects are lost → state.timer is NoneType...
    #
    # → call refresh only when everything is in place
    # self.refreshApplication()
    # camitk.refresh()
|