File: repeat.py

package info (click to toggle)
python-rq 2.6-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 2,580 kB
  • sloc: python: 13,878; makefile: 22; sh: 19
file content (116 lines) | stat: -rw-r--r-- 4,303 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
from collections.abc import Iterable
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import TYPE_CHECKING, Optional, Union

if TYPE_CHECKING:
    from redis.client import Pipeline

    from .job import Job
    from .queue import Queue


@dataclass
class Repeat:
    """Defines repeat behavior for scheduled jobs.

    Attributes:
        times (int): The number of times to repeat the job. Must be greater than 0.
        intervals (list[int]): The intervals between job executions in seconds,
            always stored as a list. A single integer passed to the constructor
            is wrapped in a one-element list. When a repeat index runs past the
            end of the list, the last interval is reused (see ``get_interval``).
    """

    times: int
    intervals: list[int]

    def __init__(self, times: int, interval: Optional[Union[int, Iterable[int]]] = 0):
        """Initialize a Repeat instance.

        Args:
            times (int): The number of times to repeat the job. Must be greater than 0.
            interval (Optional[Union[int, Iterable[int]]], optional): The intervals between
                job executions in seconds. Can be a single integer value or an iterable of
                intervals. Defaults to 0 (immediately repeated).

        Raises:
            ValueError: If times is less than 1 or if any interval is negative.
            TypeError: If interval is neither an int nor an iterable (this includes None).
        """
        if times < 1:
            raise ValueError('times: please enter a value greater than 0')

        if isinstance(interval, int):
            if interval < 0:
                raise ValueError('intervals: negative numbers are not allowed')
            self.intervals = [interval]
        elif isinstance(interval, Iterable):
            # Materialize once so one-shot iterables (e.g. generators) can be
            # validated and then stored.
            interval_list = list(interval)
            if any(i < 0 for i in interval_list):
                raise ValueError('intervals: negative numbers are not allowed')
            self.intervals = interval_list
        else:
            raise TypeError('intervals must be an int or iterable of ints')

        self.times = times

    @classmethod
    def get_interval(cls, count: int, intervals: list[int]) -> int:
        """Returns the appropriate interval based on the repeat count.

        Args:
            count (int): Current repeat count (0-based)
            intervals (List[int]): List of intervals

        Returns:
            int: ``intervals[count]`` when that index exists, otherwise the last
            interval in the list. An empty list yields 0, matching the
            "repeat immediately" default that ``schedule`` uses when a job has
            no intervals.
        """
        if not intervals:
            # Nothing configured: fall back to an immediate repeat instead of
            # failing with IndexError on intervals[-1].
            return 0

        if count >= len(intervals):
            return intervals[-1]  # Use the last interval if we've run out

        return intervals[count]

    @classmethod
    def schedule(cls, job: 'Job', queue: 'Queue', pipeline: Optional['Pipeline'] = None) -> Optional[datetime]:
        """Schedules a job to repeat based on its repeat configuration.

        This decrements the job's repeats_left counter, saves the job, and either
        enqueues it immediately (if interval is 0) or schedules it to run after
        the specified interval.

        Args:
            job (Job): The job to repeat
            queue (Queue): The queue to enqueue/schedule the job on
            pipeline (Optional[Pipeline], optional): Redis pipeline to use. When omitted,
                a pipeline is created from ``job.connection`` and executed before returning;
                a caller-supplied pipeline is NOT executed here. Defaults to None.

        Returns:
            Optional[datetime]: When the job was scheduled to run, or None if it was
            enqueued immediately (interval of 0).

        Raises:
            ValueError: If the job has no repeats left (repeats_left is None or <= 0).
        """
        if job.repeats_left is None or job.repeats_left <= 0:
            raise ValueError(f'Cannot schedule job {job.id}: no repeats left')

        pipe = pipeline if pipeline is not None else job.connection.pipeline()

        # Get the interval for this repeat based on remaining repeats.
        # NOTE(review): the index is derived from the *remaining* count, so it
        # decreases on every repeat and intervals are consumed from the end of
        # the list toward the start - confirm this ordering is intended.
        repeat_count = job.repeats_left - 1  # Count from the end (0-indexed)
        interval = 0

        if job.repeat_intervals:
            interval = cls.get_interval(repeat_count, job.repeat_intervals)

        # Decrement repeats_left and persist it through the same pipeline
        job.repeats_left = job.repeats_left - 1
        job.save(pipeline=pipe)

        scheduled_time: Optional[datetime] = None
        if interval == 0:
            # Enqueue the job immediately
            queue._enqueue_job(job, pipeline=pipe)
        else:
            # Schedule the job to run after the interval (naive local time)
            scheduled_time = datetime.now() + timedelta(seconds=interval)
            queue.schedule_job(job, scheduled_time, pipeline=pipe)

        # Execute the pipeline only if we created it; a caller-owned pipeline
        # is left for the caller to execute.
        if pipeline is None:
            pipe.execute()

        return scheduled_time