File: mocks.py

package info (click to toggle)
python-azure 20250603%2Bgit-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 851,724 kB
  • sloc: python: 7,362,925; ansic: 804; javascript: 287; makefile: 195; sh: 145; xml: 109
file content (42 lines) | stat: -rw-r--r-- 1,494 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
from datetime import timedelta

from azure.servicebus._common.utils import utc_now
from azure.servicebus import ServiceBusReceivedMessage


class MockReceiver:
    """Minimal receiver stand-in that only knows how to renew a message lock.

    The renewal outcome is controlled by flags stored on the message passed
    in, not by any state held on the receiver.
    """

    def __init__(self):
        # NOTE(review): nothing in this module reads ``_running``; presumably
        # the code under test checks it on the receiver -- confirm.
        self._running = True

    def renew_message_lock(self, message):
        """Push ``message.locked_until_utc`` forward by one lock duration.

        :param message: Object exposing ``_exception_on_renew_lock``,
            ``_prevent_renew_lock``, ``_lock_duration`` (seconds) and a
            writable ``locked_until_utc``.
        :raises Exception: When ``message._exception_on_renew_lock`` is truthy.
            This check runs before the prevent flag is consulted.
        """
        if message._exception_on_renew_lock:
            raise Exception("Generated exception via MockReceivedMessage exception_on_renew_lock")

        # A message flagged to prevent renewal keeps its current expiry.
        if message._prevent_renew_lock:
            return

        current_expiry = message.locked_until_utc
        extension = timedelta(seconds=message._lock_duration)
        message.locked_until_utc = current_expiry + extension


class MockReceivedMessage(ServiceBusReceivedMessage):
    """Received-message stand-in with a short, controllable lock.

    The constructor does not call the base-class ``__init__``; it sets only
    the attributes listed below directly on the instance.
    """

    def __init__(self, prevent_renew_lock=False, exception_on_renew_lock=False, **kwargs):
        """Build a mock message whose lock starts counting from "now".

        :param prevent_renew_lock: Stored as ``_prevent_renew_lock``; read by
            ``MockReceiver.renew_message_lock`` to skip extending the lock.
        :param exception_on_renew_lock: Stored as ``_exception_on_renew_lock``;
            read by ``MockReceiver.renew_message_lock`` to raise on renewal.
        :keyword lock_duration: Lock length in seconds (default ``2``). Any
            other keyword arguments are accepted and ignored.
        """
        lock_seconds = kwargs.get("lock_duration", 2)
        received_at = utc_now()

        self._lock_duration = lock_seconds
        self._raw_amqp_message = None
        self._received_timestamp_utc = received_at
        # Assigned through the property setter defined below.
        self.locked_until_utc = received_at + timedelta(seconds=lock_seconds)
        self._settled = False
        self._receiver = MockReceiver()

        # Flags read by MockReceiver.renew_message_lock.
        self._prevent_renew_lock = prevent_renew_lock
        self._exception_on_renew_lock = exception_on_renew_lock

    @property
    def locked_until_utc(self):
        """The stored lock expiry; plain read/write access to ``_locked_until_utc``."""
        return self._locked_until_utc

    @locked_until_utc.setter
    def locked_until_utc(self, value):
        self._locked_until_utc = value

    @property
    def _lock_expired(self):
        """``True`` when a lock expiry is set and is not later than ``utc_now()``."""
        expiry = self.locked_until_utc
        return bool(expiry and expiry <= utc_now())