1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247
|
import threading
from time import sleep, time
from typing import Callable, Dict, List, Optional
from PIL import Image
from StreamDeck.Devices.StreamDeck import StreamDeck
from StreamDeck.Devices.StreamDeckOriginal import StreamDeckOriginal
from StreamDeck.ImageHelpers import PILHelper
from StreamDeck.Transport.Transport import TransportError
from streamdeck_ui.display.empty_filter import EmptyFilter
from streamdeck_ui.display.filter import Filter
from streamdeck_ui.display.keypress_filter import KeypressFilter
from streamdeck_ui.display.pipeline import Pipeline
class DisplayGrid:
    """
    A DisplayGrid is made up of a collection of pipelines, each processing
    filters for one individual button display.

    A background thread (see :meth:`start` / :meth:`stop`) executes the pipelines
    of the currently selected page at the configured frame rate and pushes any
    resulting image to the Stream Deck.
    """

    # Static instance of EmptyFilter shared by all pipelines.
    # NOTE(review): every constructor call re-runs initialize() on this shared
    # instance with that grid's key size - confirm this is safe when several
    # decks with different key sizes are attached.
    _empty_filter: EmptyFilter = EmptyFilter()

    # Upper bound on the number of device-native frames kept by the render loop.
    # Filters such as clocks can produce a practically infinite number of distinct
    # frames, so the cache must not be allowed to grow forever. Tunable.
    _MAX_CACHED_FRAMES: int = 4096

    # How often synchronize() wakes up to check that the render thread still exists.
    _SYNC_POLL_SECONDS: float = 0.25

    def __init__(self, lock: threading.Lock, streamdeck: StreamDeck, pages: int, cpu_callback: Callable[[str, int], None], fps: int = 25):
        """Creates a new display instance

        :param lock: A lock object that will be used to get exclusive access while enumerating
            Stream Decks. This lock must be shared by any object that will read or write to the
            Stream Deck.
        :type lock: threading.Lock
        :param streamdeck: The StreamDeck instance associated with this display
        :type streamdeck: StreamDeck
        :param pages: The number of logical pages (screen sets)
        :type pages: int
        :param cpu_callback: A function to call whenever the CPU changes. It receives the
            serial number and the percentage of the last measuring window spent rendering.
            A falsy value disables the callback.
        :type cpu_callback: Callable[[str, int], None]
        :param fps: The desired FPS, defaults to 25
        :type fps: int, optional
        """
        # Reference to the actual device, used to update icons
        self.streamdeck = streamdeck

        if streamdeck.is_visual():
            self.size = streamdeck.key_image_format()["size"]
        else:
            # Default to original stream deck size - even though we're not actually going to display anything
            self.size = (StreamDeckOriginal.KEY_PIXEL_WIDTH, StreamDeckOriginal.KEY_PIXEL_HEIGHT)

        self.serial_number = streamdeck.get_serial_number()

        # page number -> (button number -> pipeline).
        # Initialize with a pipeline per key for all pages
        key_count = self.streamdeck.key_count()
        self.pages: Dict[int, Dict[int, Pipeline]] = {
            page: {button: Pipeline() for button in range(key_count)} for page in range(pages)
        }

        # -1 means "no page selected yet"; see set_page()
        self.current_page: int = -1
        self.pipeline_thread: Optional[threading.Thread] = None
        self.quit = threading.Event()

        # Configure the maximum frame rate we want to achieve
        self.fps = fps
        self.time_per_frame = 1 / fps

        self.lock = lock
        # The sync event allows a caller to wait until all the buttons have been processed
        self.sync = threading.Event()
        self.cpu_callback = cpu_callback

        DisplayGrid._empty_filter.initialize(self.size)

    def replace(self, page: int, button: int, filters: List[Filter]):
        """Replaces the pipeline of a button with a new one built from the given filters.

        The new pipeline always starts with the shared empty filter and ends with
        a KeypressFilter; the supplied filters are initialized with the key size
        and placed in between, in order.

        :param page: The page the button belongs to
        :type page: int
        :param button: The button whose pipeline is replaced
        :type button: int
        :param filters: The filters to run, in order
        :type filters: List[Filter]
        :raises KeyError: If the page does not exist
        """
        with self.lock:
            pipeline = Pipeline()
            pipeline.add(DisplayGrid._empty_filter)

            for item in filters:
                item.initialize(self.size)
                pipeline.add(item)

            keypress = KeypressFilter()
            keypress.initialize(self.size)
            pipeline.add(keypress)

            self.pages[page][button] = pipeline

    def get_image(self, page: int, button: int) -> Image.Image:
        """Returns the last result produced by the pipeline of the given button.

        :param page: The page the button belongs to
        :type page: int
        :param button: The button to look up
        :type button: int
        :raises KeyError: If the page or button does not exist
        """
        with self.lock:
            # REVIEW: Consider returning not the last result, but a thumbnail
            # or something that represents the current "static" look of
            # a button. This will need to be added to the interface
            # of a filter.
            return self.pages[page][button].last_result()

    def set_keypress(self, button: int, active: bool):
        """Sets the active state of the keypress filter(s) of a button on the current page.

        :param button: The button that was pressed or released
        :type button: int
        :param active: The new state of the keypress filter
        :type active: bool
        :raises KeyError: If no valid page is selected or the button does not exist
        """
        with self.lock:
            # Each entry of Pipeline.filters is indexable; element 0 is the filter itself
            for entry in self.pages[self.current_page][button].filters:
                if isinstance(entry[0], KeypressFilter):
                    entry[0].active = active

    def synchronize(self):
        """Waits until the render thread has completed a full pass over the buttons.

        To *guarantee* that you have one complete pass, two waits are needed.
        The first gets you to the end of one cycle (you could have called it
        mid cycle). The second gets you one pass through. Worst case, you
        do two full cycles. Best case, you do 1 full and one partial.

        Returns early when there is no live render thread (never started, stopped,
        or terminated after an error) instead of blocking forever.
        """
        for _ in range(2):
            # wait() returns False on timeout; use that to check the thread is still there
            while not self.sync.wait(self._SYNC_POLL_SECONDS):
                worker = self.pipeline_thread
                if worker is None or not worker.is_alive():
                    return

    def _run(self):
        """Method that runs on background thread and updates the pipelines."""
        window_start = time()
        last_page = None
        execution_time = 0
        # hashcode of a final frame -> frame converted to the device's native format
        frame_cache = {}
        # Stand-in used while no (valid) page is selected, so the loop simply idles
        no_page: Dict[int, Pipeline] = {}

        while not self.quit.is_set():
            current_time = time()

            with self.lock:
                page = self.pages.get(self.current_page, no_page)

            # When a page switch happen, force the pipelines to redraw so icons update
            force_update = page is not last_page
            last_page = page

            for button, pipeline in page.items():
                # Process all the steps in the pipeline and return the resulting image
                with self.lock:
                    image, hashcode = pipeline.execute(current_time)

                # If none of the filters in the pipeline yielded a change, use
                # the last known result
                if force_update and image is None:
                    image = pipeline.last_result()

                if not image:
                    continue

                # We cannot afford to do the native conversion on every final frame.
                # The pipeline can mutate the image along a chain of filters, so it is
                # too expensive to hash the final frame itself. Instead the pipeline
                # hands back a hashcode describing the inputs that produced the frame,
                # and that is used as the cache key for the converted bytes.
                if hashcode in frame_cache:
                    image = frame_cache[hashcode]
                else:
                    image = PILHelper.to_native_format(self.streamdeck, image)
                    if len(frame_cache) >= self._MAX_CACHED_FRAMES:
                        # dicts keep insertion order, so the first key is the oldest frame
                        del frame_cache[next(iter(frame_cache))]
                    frame_cache[hashcode] = image

                if self.streamdeck.is_visual():
                    try:
                        with self.lock:
                            self.streamdeck.set_key_image(button, image)
                    except TransportError:
                        # The device is gone. stop() is safe to call from this
                        # thread because it never joins the calling thread.
                        self.stop()
                        return

            # Wake up everybody waiting in synchronize(), then re-arm the event
            self.sync.set()
            self.sync.clear()

            # Calculate how long we took to process the pipeline
            elapsed_time = time() - current_time
            execution_time += elapsed_time

            # Calculate how much we have to sleep between processing cycles to maintain the desired FPS
            # If we have less than 5ms left, don't bother sleeping, as the context switch and
            # overhead of sleeping/waking up is consumed
            time_left = self.time_per_frame - elapsed_time
            if time_left > 0.005:
                sleep(time_left)

            # Roughly once a second, report the share of time spent rendering
            if time() - window_start > 1.0:
                execution_time_ms = int(execution_time * 1000)
                if self.cpu_callback:
                    self.cpu_callback(self.serial_number, int(execution_time_ms / 1000 * 100))
                execution_time = 0
                window_start = time()

    def set_page(self, page: int):
        """Switches to the given page. Pipelines for that page starts running,
        other page pipelines stop.

        :param page: The page number to switch to.
        :type page: int
        """
        with self.lock:
            # Ensure none of the button filters are active anymore. An unselected (-1)
            # or unknown previous page simply has nothing to reset.
            old_page = self.pages.get(self.current_page, {})
            for pipeline in old_page.values():
                for entry in pipeline.filters:
                    if isinstance(entry[0], KeypressFilter):
                        entry[0].active = False

            # REVIEW: We could detect the active key on the last page, and make it active
            # on the target page
            self.current_page = page

    def _halt_render_thread(self):
        """Signals the render thread to quit and waits for it to finish."""
        worker = self.pipeline_thread
        if worker is None:
            return

        self.quit.set()
        # A thread cannot join itself; this happens when the render loop shuts
        # itself down after a transport error.
        if worker is not threading.current_thread():
            try:
                worker.join()
            except RuntimeError:
                # Raised when the thread was never started
                pass

    def start(self):
        """(Re)starts the render thread and waits for the first frames to become ready."""
        self._halt_render_thread()

        self.quit.clear()
        self.pipeline_thread = threading.Thread(target=self._run)
        self.pipeline_thread.daemon = True
        self.pipeline_thread.start()

        # Wait for first frames to become ready
        self.synchronize()

    def stop(self):
        """Stops the render thread, if there is one. Safe to call more than once."""
        if self.pipeline_thread is not None:
            self._halt_render_thread()
            self.pipeline_thread = None
|