File: test_retry_check_runner.py

package info (click to toggle)
python-globus-sdk 4.3.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 5,144 kB
  • sloc: python: 35,242; sh: 37; makefile: 35
file content (48 lines) | stat: -rw-r--r-- 1,590 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
from unittest import mock

from globus_sdk.transport import (
    RequestCallerInfo,
    RetryCheckResult,
    RetryCheckRunner,
    RetryConfig,
    RetryContext,
)
from globus_sdk.transport.default_retry_checks import DEFAULT_RETRY_CHECKS


def _make_test_retry_context(*, status=200, exception=None, response=None):
    """Build a ``RetryContext`` for exercising retry checks in tests.

    A fresh ``RetryConfig`` is created, ``DEFAULT_RETRY_CHECKS`` are registered
    on it, and it is wrapped in a ``RequestCallerInfo`` which is handed to the
    context. The context is always constructed with ``1`` as its first
    positional argument.

    :param status: ``status_code`` assigned to the dummy response that is
        generated when neither ``exception`` nor ``response`` is supplied.
    :param exception: if given, the context is built around this exception
        (takes precedence over ``response``).
    :param response: if given (and no ``exception``), the context is built
        around this response object as-is.
    :returns: the constructed ``RetryContext``.
    """
    retry_config = RetryConfig()
    retry_config.checks.register_many_checks(DEFAULT_RETRY_CHECKS)
    caller_info = RequestCallerInfo(retry_config=retry_config)

    # Compare against None rather than relying on truthiness: response objects
    # may be falsy (e.g. ``requests.Response`` evaluates to ``.ok``), and an
    # explicitly supplied error response must not be silently replaced.
    if exception is not None:
        return RetryContext(1, caller_info=caller_info, exception=exception)
    if response is not None:
        return RetryContext(1, caller_info=caller_info, response=response)

    # Nothing supplied: fabricate a minimal response carrying the requested
    # status code (previously this was hard-coded to 200, ignoring ``status``).
    dummy_response = mock.Mock()
    dummy_response.status_code = status
    return RetryContext(1, caller_info=caller_info, response=dummy_response)


def test_retry_check_runner_should_retry_explicit_on_first_check():
    """The runner's answer follows whichever explicit check is listed first.

    Two checks with opposite explicit results are run in both orders; the
    expected ``should_retry`` value matches the result of the leading check.
    """

    def refuse(ctx):
        return RetryCheckResult.do_not_retry

    def accept(ctx):
        return RetryCheckResult.do_retry

    # "do not retry" listed first -> should_retry is False
    refuse_first = RetryCheckRunner([refuse, accept])
    refuse_first_outcome = refuse_first.should_retry(_make_test_retry_context())
    assert refuse_first_outcome is False

    # "do retry" listed first -> should_retry is True
    accept_first = RetryCheckRunner([accept, refuse])
    accept_first_outcome = accept_first.should_retry(_make_test_retry_context())
    assert accept_first_outcome is True


def test_retry_check_runner_fallthrough_to_false():
    """If every check returns ``no_decision``, ``should_retry`` is ``False``."""

    def undecided_first(ctx):
        return RetryCheckResult.no_decision

    def undecided_second(ctx):
        return RetryCheckResult.no_decision

    runner = RetryCheckRunner([undecided_first, undecided_second])
    context = _make_test_retry_context()
    outcome = runner.should_retry(context)
    assert outcome is False