File: decorators.py

package info (click to toggle)
python-rq 2.6-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 2,580 kB
  • sloc: python: 13,878; makefile: 22; sh: 19
file content (124 lines) | stat: -rw-r--r-- 5,065 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
from functools import wraps
from typing import TYPE_CHECKING, Any, Callable, Optional, Union

if TYPE_CHECKING:
    from redis import Redis

    from .job import Retry

from .defaults import DEFAULT_RESULT_TTL
from .job import Callback
from .queue import Queue


class job:  # noqa
    """Decorator class that attaches ``enqueue`` (and legacy ``delay``) to a function.

    The decorated function itself is returned unchanged apart from the two
    added attributes; calling either attribute enqueues the function through
    ``Queue.enqueue_call`` using the options stored on this decorator.
    """

    # Fallback queue class used when ``queue_class`` is not passed to
    # ``__init__``; subclasses may override it.
    queue_class = Queue

    def __init__(
        self,
        queue: Union['Queue', str],
        connection: 'Redis',
        timeout: Optional[int] = None,
        result_ttl: int = DEFAULT_RESULT_TTL,
        ttl: Optional[int] = None,
        queue_class: Optional[type['Queue']] = None,
        depends_on: Optional[list[Any]] = None,
        at_front: bool = False,
        meta: Optional[dict[Any, Any]] = None,
        description: Optional[str] = None,
        failure_ttl: Optional[int] = None,
        retry: Optional['Retry'] = None,
        on_failure: Optional[Union[Callback, Callable[..., Any]]] = None,
        on_success: Optional[Union[Callback, Callable[..., Any]]] = None,
        on_stopped: Optional[Union[Callback, Callable[..., Any]]] = None,
    ):
        """A decorator that adds an ``enqueue`` method to the decorated function,
        which in turn enqueues the function on the configured queue when
        called. Accepts a required ``queue`` argument that can be either a
        ``Queue`` instance or a string denoting the queue name, and a required
        ``connection`` argument.  For example:

        .. code-block:: python

            >>> @job('default', connection=redis_conn)
            ... def simple_add(x, y):
            ...     return x + y
            ...
            >>> # Puts `simple_add` function into queue
            >>> simple_add.enqueue(1, 2)

        Args:
            queue (Union['Queue', str]): The queue to use, either a Queue instance or the queue name (str).
            connection (Redis): Redis connection. Required; only used here to build the queue when ``queue``
                is given as a name.
            timeout (Optional[int], optional): Job timeout. Defaults to None.
            result_ttl (int, optional): Result time to live. Defaults to DEFAULT_RESULT_TTL.
            ttl (Optional[int], optional): Time to live. Defaults to None.
            queue_class (Optional[type[Queue]], optional): A custom class that inherits from `Queue`. When not
                given, the class-level ``queue_class`` attribute is used. Defaults to None.
            depends_on (Optional[List[Any]], optional): Default ``depends_on`` value forwarded to
                ``enqueue_call``. Defaults to None.
            at_front (bool, optional): Whether to enqueue the job at front of the queue. Defaults to False.
            meta (Optional[Dict[Any, Any]], optional): Arbitrary metadata about the job. Defaults to None.
            description (Optional[str], optional): Job description. Defaults to None.
            failure_ttl (Optional[int], optional): Failure time to live. Defaults to None.
            retry (Optional[Retry], optional): A Retry object. Defaults to None.
            on_failure (Optional[Union[Callback, Callable[..., Any]]], optional): Callable to run on failure. Defaults
                to None.
            on_success (Optional[Union[Callback, Callable[..., Any]]], optional): Callable to run on success. Defaults
                to None.
            on_stopped (Optional[Union[Callback, Callable[..., Any]]], optional): Callable to run when stopped. Defaults
                to None.
        """
        self.queue = queue
        # Fall back to the class attribute (not a hard-coded ``Queue``) so that
        # subclasses overriding ``queue_class`` are honoured.
        self.queue_class = queue_class if queue_class else type(self).queue_class
        self.connection = connection
        self.timeout = timeout
        self.result_ttl = result_ttl
        self.ttl = ttl
        self.meta = meta
        self.depends_on = depends_on
        self.at_front = at_front
        self.description = description
        self.failure_ttl = failure_ttl
        self.retry = retry
        self.on_success = on_success
        self.on_failure = on_failure
        self.on_stopped = on_stopped

    def __call__(self, f):
        """Attach ``enqueue`` and ``delay`` to ``f`` and return ``f`` itself.

        Both attributes refer to the same wrapper. The wrapper accepts the
        positional and keyword arguments for ``f`` plus three reserved keyword
        arguments that are removed before the rest is forwarded to the job:
        ``depends_on``, ``job_id`` and ``at_front``.

        Args:
            f: The function to decorate.

        Returns:
            The same function object ``f``, with ``delay`` and ``enqueue`` set.
        """

        @wraps(f)
        def delay(*args, **kwargs):
            # A queue given by name is resolved at call time, using the
            # decorator's connection.
            if isinstance(self.queue, str):
                queue = self.queue_class(name=self.queue, connection=self.connection)
            else:
                queue = self.queue

            # Reserved per-call options; everything left in ``kwargs`` is
            # passed to the job as its keyword arguments.
            depends_on = kwargs.pop('depends_on', None)
            job_id = kwargs.pop('job_id', None)
            # ``None`` means "not supplied", so an explicit ``at_front=False``
            # can override a decorator-level ``at_front=True``.
            at_front = kwargs.pop('at_front', None)

            # A missing or empty per-call value falls back to the
            # decorator-level dependencies.
            if not depends_on:
                depends_on = self.depends_on

            if at_front is None:
                at_front = self.at_front

            return queue.enqueue_call(
                f,
                args=args,
                kwargs=kwargs,
                timeout=self.timeout,
                result_ttl=self.result_ttl,
                ttl=self.ttl,
                depends_on=depends_on,
                job_id=job_id,
                at_front=at_front,
                meta=self.meta,
                description=self.description,
                failure_ttl=self.failure_ttl,
                retry=self.retry,
                on_failure=self.on_failure,
                on_success=self.on_success,
                on_stopped=self.on_stopped,
            )

        f.delay = delay  # TODO: Remove this in 3.0
        f.enqueue = delay
        return f