File: GLibMainLoopThread.py

package info (click to toggle)
plasma-workspace 4%3A6.5.4-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 99,452 kB
  • sloc: cpp: 125,486; python: 4,246; xml: 2,449; perl: 572; sh: 230; javascript: 75; ruby: 39; ansic: 13; makefile: 9
file content (57 lines) | stat: -rw-r--r-- 1,474 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
#!/usr/bin/env python3

# SPDX-FileCopyrightText: 2023 Fushan Wen <qydwhotmail@gmail.com>
# SPDX-License-Identifier: GPL-2.0-or-later

# For FreeBSD CI which only has python 3.9
from __future__ import annotations

import threading
import time

from gi.repository import GLib


class GLibMainLoopThread(threading.Thread):
    """
    A GLib main loop in another thread.

    A fail-safe timer is started together with the loop and quits the loop
    once its delay elapses, unless the loop has already stopped.

    @note It's redundant to create a loop when QCoreApplication is used in the same application.
    """

    # Class-level default. __init__ only assigns the *instance* attribute, so
    # the classmethod process_events() sees None here unless a caller assigns
    # the class attribute explicitly.
    loop: GLib.MainLoop | None = None

    def __init__(self, fail_safe_timeout: float = 60) -> None:
        """
        Create the main loop and its fail-safe timer.

        Neither the loop nor the timer is started here; both start in run().

        @param fail_safe_timeout delay in seconds, counted from the moment
               run() starts, after which the loop is quit automatically
        """
        # Set up D-Bus loop
        self.loop = GLib.MainLoop()
        self.failSafeTimer = threading.Timer(fail_safe_timeout, self.loop.quit)

        # Create the thread
        super().__init__()

    def run(self) -> None:
        """
        Method to run the DBus main loop (on a thread)

        Starts the fail-safe timer, then blocks in the loop until it is quit.
        """
        self.failSafeTimer.start()
        try:
            self.loop.run()
        finally:
            # The loop may have stopped without going through quit() (e.g.
            # loop.quit() called directly, or loop.run() raised). Cancel the
            # timer so its thread does not linger until the delay expires.
            # Cancelling a timer that already fired or was cancelled is a no-op.
            self.failSafeTimer.cancel()

    def quit(self) -> None:
        """
        Cancel the fail-safe timer and ask the main loop to stop.
        """
        self.failSafeTimer.cancel()
        self.loop.quit()

    @classmethod
    def process_events(cls) -> None:
        """
        Processes some pending events in the main loop

        Polls the context up to 4 times, sleeping 0.1 s after each poll that
        finds nothing pending, and runs a single non-blocking iteration as soon
        as something is pending. Returns without iterating if nothing became
        pending during those polls.
        """
        if cls.loop is not None:
            context = cls.loop.get_context()
        else:
            context = GLib.MainContext.default()

        for _ in range(4):
            if context.pending():
                # Dispatch once without blocking, then stop polling.
                context.iteration(may_block=False)
                break
            time.sleep(0.1)