File: test_robust.py

package info (click to toggle)
python-multiurl 0.3.5-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, trixie
  • size: 168 kB
  • sloc: python: 1,318; makefile: 4
file content (65 lines) | stat: -rw-r--r-- 1,519 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# (C) Copyright 2021 ECMWF.
#
# This software is licensed under the terms of the Apache Licence Version 2.0
# which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
# In applying this licence, ECMWF does not waive the privileges and immunities
# granted to it by virtue of its status as an intergovernmental organisation
# nor does it submit to any jurisdiction.
#

import logging
import os
import random
import threading
from contextlib import contextmanager

import pytest

from multiurl import download
from multiurl.http import RETRIABLE


def handler(signum, frame):
    """Signal-handler-shaped callback that always raises ``TimeoutError``.

    Both arguments are accepted only to match the ``(signum, frame)``
    calling convention and are ignored.
    """
    raise TimeoutError


@contextmanager
def timeout(s):
    """Context manager acting as a hard watchdog around the ``with`` body.

    A ``threading.Timer`` is armed for *s* seconds on entry. If it fires
    before the body finishes, the whole process is terminated through
    ``os._exit(1)``. On exit from the body (normal or via an exception)
    the timer is cancelled.
    """
    # Hand os._exit and its status code straight to the timer; no wrapper needed.
    watchdog = threading.Timer(s, os._exit, args=(1,))
    watchdog.start()
    try:
        yield
    finally:
        # Always disarm, so a body that finishes in time never kills the process.
        watchdog.cancel()


def test_robust():
    """Download from an endpoint that may answer with a retriable status.

    A status code is picked at random from ``RETRIABLE`` and requested
    together with 200 from httpbin's ``/status`` endpoint (presumably
    httpbin answers with one of the listed codes per request -- confirm
    against httpbin's documentation). ``download`` is called with
    ``retry_after=sleep`` and writes to ``test.data`` in the current
    directory.

    The whole call runs under the ``timeout`` watchdog, so a download
    that never succeeds terminates the process instead of hanging.
    """
    sleep = 5
    # Watchdog budget in seconds: ten retry intervals per entry in RETRIABLE.
    # Computed arithmetically rather than by building a repeated sequence
    # just to measure its length.
    budget = len(RETRIABLE) * sleep * 10
    with timeout(budget):
        code = random.choice(RETRIABLE)
        download(
            f"http://httpbin.org/status/200,{code}",
            retry_after=sleep,
            target="test.data",
        )


@pytest.mark.skipif(True, reason="Mirror disabled")
def test_mirror():
    """Download a file through a URL prefix that is remapped via ``mirrors``.

    The requested URL lives under an ``/error/`` prefix, and the
    ``mirrors`` mapping associates that prefix with a list containing one
    alternative base URL. The result is written to ``data.bufr``.
    Unconditionally skipped under pytest ("Mirror disabled").
    """
    source_url = (
        "http://datastore.copernicus-climate.eu/error/test-data/metview/gallery/temp.bufr"
    )
    # Prefix -> list of replacement base URLs handed to download().
    mirror_table = {
        "http://datastore.copernicus-climate.eu/error/": [
            "http://download.ecmwf.int/",
        ],
    }
    download(source_url, mirrors=mirror_table, target="data.bufr")


# Script entry point: when the module is executed directly, enable
# DEBUG-level logging and call test_mirror() as a plain function.
# NOTE(review): the pytest skip marker is only acted on by the pytest
# runner, so this direct call presumably performs the download -- confirm.
if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG)
    test_mirror()