File: shared_dict.py

package info (click to toggle)
input-remapper 2.1.1-2
  • links: PTS, VCS
  • area: main
  • in suites: forky
  • size: 2,856 kB
  • sloc: python: 27,277; sh: 191; xml: 33; makefile: 3
file content (122 lines) | stat: -rw-r--r-- 3,864 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# -*- coding: utf-8 -*-
# input-remapper - GUI for device specific keyboard mappings
# Copyright (C) 2025 sezanzeb <b8x45ygc9@mozmail.com>
#
# This file is part of input-remapper.
#
# input-remapper is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# input-remapper is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with input-remapper.  If not, see <https://www.gnu.org/licenses/>.


"""Share a dictionary across processes."""


import atexit
import multiprocessing
import select
from typing import Optional, Any

from inputremapper.logging.logger import logger


class SharedDict:
    """Share a dictionary across processes.

    The dictionary itself lives inside a separate manager process (see
    `manage`). All other methods talk to that process through a
    `multiprocessing.Pipe`: `self.pipe[1]` is used to send requests and to
    read the replies, `self.pipe[0]` is the end the manager process reads
    requests from and writes replies to.
    """

    # because unittests terminate all child processes in cleanup I can't use
    # multiprocessing.Manager
    def __init__(self):
        """Create a shared dictionary.

        This only sets up the pipe. The manager process is not started
        until `start` is called.
        """
        super().__init__()

        # To avoid blocking forever if something goes wrong. The maximum
        # observed time communication takes was 0.001 for me on a slow pc
        self._timeout = 0.02

        self.pipe = multiprocessing.Pipe()
        self.process: Optional[multiprocessing.Process] = None
        atexit.register(self._stop)

    def start(self) -> None:
        """Ensure the process to manage the dictionary is running."""
        if self.process is not None and self.process.is_alive():
            logger.debug("SharedDict process already running")
            return

        # if the manager has already been running in the past but stopped
        # for some reason, the dictionary contents are lost.
        logger.debug("Starting SharedDict process")
        self.process = multiprocessing.Process(target=self.manage)
        self.process.start()

    def manage(self) -> None:
        """Manage the dictionary, handle read and write requests.

        This is the target of the manager process. It loops forever,
        receiving tuples whose first element is the command:

        - ("stop",): leave the loop
        - ("set", key, value): store value under key
        - ("clear",): remove all entries
        - ("get", key): reply with the stored value, or None if missing
        - ("ping",): reply with "pong"

        Messages with any other command are ignored.
        """
        logger.debug("SharedDict process started")
        shared_dict = {}
        connection = self.pipe[0]
        while True:
            message = connection.recv()
            logger.debug("SharedDict got %s", message)

            command = message[0]
            if command == "stop":
                return

            if command == "set":
                shared_dict[message[1]] = message[2]
            elif command == "clear":
                shared_dict.clear()
            elif command == "get":
                connection.send(shared_dict.get(message[1]))
            elif command == "ping":
                connection.send("pong")

    def _stop(self) -> None:
        """Ask the managing process to stop.

        This runs from `atexit` and from `__del__`, so it has to be a
        best-effort operation that never raises.
        """
        # __del__ is also invoked for objects whose __init__ failed before
        # self.pipe was assigned.
        pipe = getattr(self, "pipe", None)
        if pipe is None:
            return

        try:
            pipe[1].send(("stop",))
        except OSError:
            # The connection is closed or broken already, so there is
            # nobody left to notify.
            pass

    def _clear(self) -> None:
        """Clears the memory."""
        self.pipe[1].send(("clear",))

    def _receive(self, timeout: float):
        """Wait for a reply of the manager process.

        Parameters
        ----------
        timeout
            Maximum number of seconds to wait.

        Returns
        -------
        A tuple of (received, reply). If nothing arrived within the
        timeout, received is False and reply is None.
        """
        connection = self.pipe[1]
        select.select([connection], [], [], timeout)
        if connection.poll():
            return True, connection.recv()

        return False, None

    def get(self, key: str) -> Any:
        """Get a value from the dictionary.

        If it doesn't exist, returns None.
        """
        return self[key]

    def is_alive(self, timeout: Optional[float] = None) -> bool:
        """Check if the manager process is running.

        Sends a ping and waits for the answer.

        Parameters
        ----------
        timeout
            Seconds to wait for the answer. If None or 0, the default
            timeout of this object is used.
        """
        self.pipe[1].send(("ping",))
        received, reply = self._receive(timeout or self._timeout)
        return received and reply == "pong"

    def __setitem__(self, key: str, value: Any) -> None:
        """Store value under key. Does not wait for a confirmation."""
        self.pipe[1].send(("set", key, value))

    def __getitem__(self, key: str) -> Any:
        """Request the value stored under key.

        Returns None if the key is unknown, or if no reply arrived within
        the timeout. The timeout is logged as an error.
        """
        self.pipe[1].send(("get", key))

        received, value = self._receive(self._timeout)
        if not received:
            logger.error("select.select timed out")
            return None

        return value

    def __del__(self):
        """Stop the managing process when this object is garbage collected."""
        self._stop()