File: __init__.py

package info (click to toggle)
sentry-python 2.18.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 4,004 kB
  • sloc: python: 55,908; makefile: 114; sh: 111; xml: 2
file content (58 lines) | stat: -rw-r--r-- 1,499 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import os
import signal
import tempfile
import threading
import time

from celery.beat import Scheduler

from sentry_sdk.utils import logger


class ImmediateScheduler(Scheduler):
    """
    Celery beat scheduler that applies every schedule entry once, right
    after the schedule has been set up, and never runs the regular
    schedule cycle afterwards.
    """

    def setup_schedule(self):
        # Let the base scheduler set up its schedule first, then fire each
        # entry immediately instead of waiting for it to become due.
        super().setup_schedule()
        for entry in self.schedule.values():
            self.apply_entry(entry)

    def tick(self):
        # Deliberately does not call the base implementation, so no entry is
        # ever re-evaluated by the periodic cycle.
        # NOTE(review): the constant 1 is presumably read by Celery beat as
        # the delay (in seconds) before the next tick -- confirm against
        # celery.beat.Scheduler.tick.
        return 1


def kill_beat(beat_pid_file, delay_seconds=1):
    """
    Terminates Celery Beat after the given `delay_seconds`.

    Sleeps for `delay_seconds`, reads the process id from `beat_pid_file`
    and sends that process a SIGTERM.

    :param beat_pid_file: Path of the file containing the PID to signal.
    :param delay_seconds: Number of seconds to sleep before the PID file is
        read and the signal is sent.
    :raises OSError: If the PID file cannot be opened or the signal cannot
        be delivered.
    :raises ValueError: If the PID file content is not an integer.
    """
    logger.info("Starting Celery Beat killer...")
    time.sleep(delay_seconds)

    # Read the PID inside a context manager so the file handle is closed
    # deterministically instead of being left to the garbage collector.
    with open(beat_pid_file, "r") as pid_handle:
        pid = int(pid_handle.read())

    logger.info("Terminating Celery Beat...")
    os.kill(pid, signal.SIGTERM)


def run_beat(celery_app, runtime_seconds=1, loglevel="warning", quiet=True):
    """
    Start a Celery Beat instance for `celery_app` and have it terminated
    again after `runtime_seconds`.

    A background thread running `kill_beat` is started before Beat itself;
    it sleeps for `runtime_seconds` and then sends SIGTERM to the PID found
    in the PID file that is handed to Beat via `pidfile`.

    :param celery_app: Object providing the `Beat(...)` factory.
        NOTE(review): presumably a Celery application -- confirm with callers.
    :param runtime_seconds: Delay passed to `kill_beat` as `delay_seconds`.
    :param loglevel: Forwarded unchanged to `celery_app.Beat`.
    :param quiet: Forwarded unchanged to `celery_app.Beat`.
    """
    logger.info("Starting Celery Beat...")

    # The PID file lives in a freshly created temporary directory; the
    # directory is not removed by this function.
    scratch_dir = tempfile.mkdtemp()
    pidfile_path = os.path.join(scratch_dir, f"celery-beat-{os.getpid()}.pid")

    # The killer thread has to be running before `run()` is called below,
    # because `run()` is invoked on the calling thread.
    killer_kwargs = {"delay_seconds": runtime_seconds}
    killer = threading.Thread(
        target=kill_beat, args=(pidfile_path,), kwargs=killer_kwargs
    )
    killer.start()

    beat_options = {
        "loglevel": loglevel,
        "quiet": quiet,
        "pidfile": pidfile_path,
    }
    celery_app.Beat(**beat_options).run()