File: sensor.py

package info (click to toggle)
ha-ffmpeg 3.2.2-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 184 kB
  • sloc: python: 625; makefile: 2
file content (244 lines) | stat: -rw-r--r-- 7,651 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
"""For HA sensor components."""
import asyncio
import logging
import re
from time import time
from typing import Callable, Coroutine, Optional

from .core import FFMPEG_STDOUT, HAFFmpegWorker
from .timeout import asyncio_timeout

_LOGGER = logging.getLogger(__name__)


class SensorNoise(HAFFmpegWorker):
    """Implement a noise detection on a autio stream."""

    STATE_NONE = 0
    STATE_NOISE = 1
    STATE_END = 2
    STATE_DETECT = 3

    def __init__(self, ffmpeg_bin: str, callback: Callable):
        """Init noise sensor."""
        super().__init__(ffmpeg_bin)

        self._callback = callback
        self._peak = -30
        self._time_duration = 1
        self._time_reset = 2

    def set_options(
        self, time_duration: int = 1, time_reset: int = 2, peak: int = -30
    ) -> None:
        """Set option parameter for noise sensor."""
        self._time_duration = time_duration
        self._time_reset = time_reset
        self._peak = peak

    def open_sensor(
        self,
        input_source: str,
        output_dest: Optional[str] = None,
        extra_cmd: Optional[str] = None,
    ) -> Coroutine:
        """Open FFmpeg process for read autio stream.

        Return a coroutine.
        """
        command = ["-vn", "-filter:a", f"silencedetect=n={self._peak}dB:d=1"]

        # run ffmpeg, read output
        return self.start_worker(
            cmd=command,
            input_source=input_source,
            output=output_dest,
            extra_cmd=extra_cmd,
            pattern="silence",
        )

    async def _worker_process(self) -> None:
        """This function processing data."""
        state = self.STATE_DETECT
        timeout = self._time_duration

        self._loop.call_soon(self._callback, False)

        re_start = re.compile("silence_start")
        re_end = re.compile("silence_end")

        # process queue data
        while True:
            try:
                _LOGGER.debug("Reading State: %d, timeout: %s", state, timeout)
                async with asyncio_timeout(timeout):
                    data = await self._queue.get()
                timeout = None
                if data is None:
                    self._loop.call_soon(self._callback, None)
                    return
            except asyncio.TimeoutError:
                _LOGGER.debug("Blocking timeout")
                # noise
                if state == self.STATE_DETECT:
                    # noise detected
                    self._loop.call_soon(self._callback, True)
                    state = self.STATE_NOISE

                elif state == self.STATE_END:
                    # no noise
                    self._loop.call_soon(self._callback, False)
                    state = self.STATE_NONE

                timeout = None
                continue

            if re_start.search(data):
                if state == self.STATE_NOISE:
                    # stop noise detection
                    state = self.STATE_END
                    timeout = self._time_reset
                elif state == self.STATE_DETECT:
                    # reset if only a peak
                    state = self.STATE_NONE
                continue

            if re_end.search(data):
                if state == self.STATE_NONE:
                    # detect noise begin
                    state = self.STATE_DETECT
                    timeout = self._time_duration
                elif state == self.STATE_END:
                    # back to noise status
                    state = self.STATE_NOISE
                continue

            _LOGGER.warning("Unknown data from queue!")


class SensorMotion(HAFFmpegWorker):
    """Implement motion detection with ffmpeg scene detection."""

    STATE_NONE = 0
    STATE_REPEAT = 1
    STATE_MOTION = 2

    MATCH = r"\d,.*\d,.*\d,.*\d,.*\d,.*\w"

    def __init__(self, ffmpeg_bin: str, callback: Callable):
        """Init motion sensor."""
        super().__init__(ffmpeg_bin)

        self._callback = callback
        self._changes = 10
        self._time_reset = 60
        self._time_repeat = 0
        self._repeat = 0

    def set_options(
        self,
        time_reset: int = 60,
        time_repeat: int = 0,
        repeat: int = 0,
        changes: int = 10,
    ) -> None:
        """Set option parameter for noise sensor."""
        self._time_reset = time_reset
        self._time_repeat = time_repeat
        self._repeat = repeat
        self._changes = changes

    async def open_sensor(
        self, input_source: str, extra_cmd: Optional[str] = None
    ) -> Coroutine:
        """Open FFmpeg process a video stream for motion detection.

        Return a coroutine.
        """
        command = [
            "-an",
            "-filter:v",
            f"select=gt(scene\\,{self._changes / 100})",
        ]

        # run ffmpeg, read output
        return await self.start_worker(
            cmd=command,
            input_source=input_source,
            output="-f framemd5 -",
            extra_cmd=extra_cmd,
            pattern=self.MATCH,
            reading=FFMPEG_STDOUT,
        )

    async def _worker_process(self) -> None:
        """This function processing data."""
        state = self.STATE_NONE
        timeout = None

        self._loop.call_soon(self._callback, False)

        # for repeat feature
        re_frame = 0
        re_time = 0

        re_data = re.compile(self.MATCH)

        # process queue data
        while True:
            try:
                _LOGGER.debug("Reading State: %d, timeout: %s", state, timeout)
                async with asyncio_timeout(timeout):
                    data = await self._queue.get()
                if data is None:
                    self._loop.call_soon(self._callback, None)
                    return
            except asyncio.TimeoutError:
                _LOGGER.debug("Blocking timeout")
                # reset motion detection
                if state == self.STATE_MOTION:
                    state = self.STATE_NONE
                    self._loop.call_soon(self._callback, False)
                    timeout = None
                # reset repeate state
                if state == self.STATE_REPEAT:
                    state = self.STATE_NONE
                    timeout = None
                continue

            frames = re_data.search(data)
            if frames:
                # repeat not used
                if self._repeat == 0 and state == self.STATE_NONE:
                    state = self.STATE_MOTION
                    self._loop.call_soon(self._callback, True)
                    timeout = self._time_reset

                # repeat feature is on / first motion
                if state == self.STATE_NONE:
                    state = self.STATE_REPEAT
                    timeout = self._time_repeat
                    re_frame = 0
                    re_time = time()

                elif state == self.STATE_REPEAT:
                    re_frame += 1

                    # REPEAT ready?
                    if re_frame >= self._repeat:
                        state = self.STATE_MOTION
                        self._loop.call_soon(self._callback, True)
                        timeout = self._time_reset
                    else:
                        past = time() - re_time
                        timeout -= past

                    # REPEAT time down
                    if timeout <= 0:
                        _LOGGER.debug("Reset repeat to none")
                        state = self.STATE_NONE
                        timeout = None

                continue

            _LOGGER.warning("Unknown data from queue!")