File: acquire_video_threaded.py

package info (click to toggle)
camitk 6.0.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 389,496 kB
  • sloc: cpp: 103,476; sh: 2,448; python: 1,618; xml: 984; makefile: 128; perl: 84; sed: 20
file content (103 lines) | stat: -rw-r--r-- 3,474 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# This is a CamiTK python action
#
# Video acquisition using OpenCV.
#
# This action creates a new image component and continuously replaces
# its image data using the video feed.
#
# Three types of video can be acquired:
# - grayscale video
# - RGB video
# - HSV mode video
# 
# You can pause/restart the video acquisition at any time.
# This action uses a thread to refresh the image.
# TODO Using threads does not work yet in CamiTK Python extensions
# TODO So this action does not work as expected

from dataclasses import dataclass, field

import camitk
import cv2
from PySide2.QtCore import QObject, QThread, Signal
# PySide2 names its signal factory `Signal` (`pyqtSignal` is the PyQt name and
# cannot be imported from PySide2); the alias keeps the old name available.
from PySide2.QtCore import Signal as pyqtSignal

@dataclass
class ActionState:
    """Mutable state shared by the action callbacks and the worker thread.

    A single module-level instance (``state``) is created right below and is
    read and written by ``init``, ``process``, ``update_image`` and
    ``Worker.run``. Every object field defaults to ``None`` until the
    corresponding step has run.
    """
    # The CamiTK action, stored by init().
    action: camitk.Action = None
    # Number of grab attempts; incremented by Worker.run().
    counter: int = 0
    # Index handed to cv2.VideoCapture in process().
    camera_index: int = 0
    # Not read or written anywhere else in this file.
    init: bool = False
    # Worker instance and the thread it was moved to; created in process().
    worker: QObject = None
    # Annotated with the class itself: annotating with `QThread()` would
    # instantiate a thread object while the class is being defined.
    thread: QThread = None
    # Capture device opened in process().
    capture_device: cv2.VideoCapture = None
    # Image component created from the first frame in update_image().
    image_component: camitk.ImageComponent = None
    # Last frame converted by cv2.cvtColor in Worker.run() (OpenCV returns a
    # numpy array); `cv2.cvtColor` is a function, not a type, so it cannot
    # serve as the annotation.
    rgb_frame: object = None

state = ActionState()

class Worker(QObject):
    """Frame grabber whose ``run`` method is meant to execute in a ``QThread``.

    Signals:
        finished: emitted once, when the grabbing loop has ended.
        grabbed(int): emitted after each successfully converted frame with
            the current value of ``state.counter``; the frame itself is
            published through ``state.rgb_frame``.
    """
    finished = Signal()
    grabbed = Signal(int) #, cv2.cvtColor)

    def run(self):
        """Grab frames from ``state.capture_device`` until a read fails.

        Each frame is converted from BGR to RGB, stored in
        ``state.rgb_frame`` and announced with ``grabbed``. When a read
        fails a warning is logged, the loop stops and ``finished`` is
        emitted.
        """
        while True:
            state.counter += 1
            ret, frame = state.capture_device.read()
            if not ret:
                camitk.warning("Error: Could not read frame.")
                # Leave the loop rather than returning: returning here would
                # skip the `finished` emission below, and the slots connected
                # to it (thread quit, deleteLater) would never run.
                break
            # Convert the image from BGR to RGB
            state.rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            self.grabbed.emit(state.counter) #, state.rgb_frame)
            # sleep(0.1)
        self.finished.emit()

def init(self:camitk.Action):
    """Store the action in the shared state and reset counter and camera index.

    After this call ``state.action`` is ``self``, ``state.counter`` is 1 and
    ``state.camera_index`` is 0.
    """
    state.action, state.counter, state.camera_index = self, 1, 0

def update_image(counter): #, rgb_frame):
    """Show the latest grabbed frame and report the frame counter.

    Connected to ``Worker.grabbed`` in ``process``. The frame is taken from
    ``state.rgb_frame``: the first available frame creates the
    "Webcam Video" image component (followed by a refresh), later frames
    replace its image data.

    Args:
        counter: frame counter carried by the ``grabbed`` signal.
    """
    if state.rgb_frame is not None:
        if not state.image_component:
            state.image_component = camitk.newImageComponentFromNumpy(state.rgb_frame, "Webcam Video")
            camitk.refresh()
        else:
            state.image_component.replaceImageData(state.rgb_frame)
    # This is a module-level slot, so there is no `self` here: the action is
    # reached through the shared state instead.
    # NOTE(review): `stepLabel` is not defined anywhere in this file --
    # presumably a widget attached to the action elsewhere; the status text
    # is skipped when the attribute is absent. Confirm.
    step_label = getattr(state.action, "stepLabel", None)
    if step_label is not None:
        step_label.setText(f"Grabbed frame #{counter}. State {state}")
        
def process(self:camitk.Action):
    """Start the threaded video acquisition unless a worker already exists.

    Opens the capture device number ``state.camera_index``, creates a
    ``Worker``, moves it to a new ``QThread``, wires the signals and starts
    the thread. If the device cannot be opened a warning is logged, the
    device handle is released and nothing is started. When ``state.worker``
    is already set, only the "process begin" message is logged.
    """
    camitk.info(f"process begin: {state}")
    if not state.worker:
        state.capture_device = cv2.VideoCapture(state.camera_index)
        if not state.capture_device.isOpened():
            # The index lives in the shared state; there is no local or
            # global `camera_index` in this module.
            camitk.warning(f"Error: Could not open capturing device #{state.camera_index}.")
            # Do not keep a handle on a device that failed to open.
            state.capture_device.release()
            state.capture_device = None
            return

        state.thread = QThread()
        state.worker = Worker()
        state.worker.moveToThread(state.thread)
        state.thread.started.connect(state.worker.run)
        state.worker.finished.connect(state.thread.quit)
        state.worker.finished.connect(state.worker.deleteLater)
        state.thread.finished.connect(state.thread.deleteLater)
        state.worker.grabbed.connect(update_image)
        # Every connection is made before the thread starts, so that no
        # signal can be emitted while a slot is still unconnected.
        state.thread.finished.connect(
            lambda: camitk.info("Thread finished")
        )
        # TODO Reset state.worker once the thread has finished so that the
        # TODO acquisition can be restarted (this needs a named function:
        # TODO a lambda cannot contain an assignment).
        # TODO Change Apply button to "Stop Acquisition"
        state.thread.start()

    # Note: if camitk.refresh() or self.refreshApplication() is called before the
    # end of the method (e.g. just before the "state.timer.start(10)"), the python objects are
    # lost → state.timer is NoneType...
    #
    # → call refresh only when everything is in place
    # self.refreshApplication()
    # camitk.refresh()