File: test_thread.py

package info (click to toggle)
python-requests-toolbelt 1.0.0-4
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 876 kB
  • sloc: python: 3,653; makefile: 166; sh: 7
file content (137 lines) | stat: -rw-r--r-- 4,904 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
"""Module containing the tests for requests_toolbelt.threaded.thread."""
try:
    import queue  # Python 3
except ImportError:
    import Queue as queue
import threading
import unittest
import uuid

try:
    from unittest import mock
except ImportError:
    import mock
import requests.exceptions

from requests_toolbelt.threaded import thread


def _make_mocks():
    """Lazily yield four independent ``MagicMock`` objects.

    The result is an iterator, so callers may either unpack all four mocks
    at once or pull a single one with ``next()``.
    """
    for _ in range(4):
        yield mock.MagicMock()


def _initialize_a_session_thread(session=None, job_queue=None,
                                 response_queue=None, exception_queue=None):
    """Build a ``SessionThread`` while ``threading.Thread`` is patched out.

    :param session: passed to ``SessionThread`` as ``initialized_session``.
    :param job_queue: queue of pending jobs; a fresh ``queue.Queue`` is
        created when this is ``None``.
    :param response_queue: passed through unchanged.
    :param exception_queue: passed through unchanged.
    :returns: a 3-tuple of the ``SessionThread``, the mock returned when the
        patched ``threading.Thread`` is called, and the patched
        ``threading.Thread`` mock itself.
    """
    jobs = queue.Queue() if job_queue is None else job_queue
    worker = mock.MagicMock()

    # The patch is only active while the SessionThread is being constructed.
    patcher = mock.patch.object(threading, 'Thread', return_value=worker)
    with patcher as Thread:
        session_thread = thread.SessionThread(
            initialized_session=session,
            job_queue=jobs,
            response_queue=response_queue,
            exception_queue=exception_queue,
        )

    return (session_thread, worker, Thread)


class TestSessionThread(unittest.TestCase):

    """Tests for requests_toolbelt.threaded.thread.SessionThread."""

    def test_thread_initialization(self):
        """Test the way a SessionThread is initialized.

        We want to ensure that we create a thread with a name generated by
        the uuid module, and that we pass the right method to use as a
        target.
        """
        with mock.patch.object(uuid, 'uuid4', return_value='test'):
            (st, thread_instance, Thread) = _initialize_a_session_thread()

        Thread.assert_called_once_with(target=st._make_request, name='test')
        assert thread_instance.daemon is True
        # Compare by value: identity of int objects is an implementation
        # detail and ``is`` with a literal is a SyntaxWarning on Python 3.8+.
        assert thread_instance._state == 0
        thread_instance.start.assert_called_once_with()

    def test_is_alive_proxies_to_worker(self):
        """Test that we proxy the is_alive method to the Thread."""
        st, thread_instance, _ = _initialize_a_session_thread()

        st.is_alive()
        thread_instance.is_alive.assert_called_once_with()

    def test_join_proxies_to_worker(self):
        """Test that we proxy the join method to the Thread."""
        st, thread_instance, _ = _initialize_a_session_thread()

        st.join()
        thread_instance.join.assert_called_once_with()

    def test_handle_valid_request(self):
        """Test that a response is added to the right queue."""
        session, job_queue, response_queue, exception_queue = _make_mocks()
        response = mock.MagicMock()
        session.request.return_value = response

        st, _, _ = _initialize_a_session_thread(
            session, job_queue, response_queue, exception_queue)

        st._handle_request({'method': 'GET', 'url': 'http://example.com'})
        session.request.assert_called_once_with(
            method='GET',
            url='http://example.com'
        )

        response_queue.put.assert_called_once_with(
            ({'method': 'GET', 'url': 'http://example.com'}, response)
        )
        assert exception_queue.put.called is False
        # Handling a single request must not pull further jobs off the
        # queue, but it must mark the current job as done.
        assert job_queue.get.called is False
        assert job_queue.get_nowait.called is False
        assert job_queue.task_done.called is True

    def test_handle_invalid_request(self):
        """Test that exceptions from requests are added to the right queue."""
        session, job_queue, response_queue, exception_queue = _make_mocks()
        exception = requests.exceptions.InvalidURL()

        # Make the request raise an exception: a mock raises its
        # ``side_effect`` when that is an exception instance.
        session.request.side_effect = exception

        st, _, _ = _initialize_a_session_thread(
            session, job_queue, response_queue, exception_queue)

        st._handle_request({'method': 'GET', 'url': 'http://example.com'})
        session.request.assert_called_once_with(
            method='GET',
            url='http://example.com'
        )

        exception_queue.put.assert_called_once_with(
            ({'method': 'GET', 'url': 'http://example.com'}, exception)
        )
        assert response_queue.put.called is False
        # Handling a single request must not pull further jobs off the
        # queue, but it must mark the current job as done.
        assert job_queue.get.called is False
        assert job_queue.get_nowait.called is False
        assert job_queue.task_done.called is True

    def test_make_request(self):
        """Test that _make_request exits when the queue is Empty."""
        job_queue = next(_make_mocks())
        job_queue.get_nowait.side_effect = queue.Empty()

        st, _, _ = _initialize_a_session_thread(job_queue=job_queue)
        st._make_request()

        job_queue.get_nowait.assert_called_once_with()