1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137
|
"""Module containing the tests for requests_toolbelt.threaded.thread."""
try:
import queue # Python 3
except ImportError:
import Queue as queue
import threading
import unittest
import uuid
try:
from unittest import mock
except ImportError:
import mock
import requests.exceptions
from requests_toolbelt.threaded import thread
def _make_mocks():
    """Lazily produce four independent ``MagicMock`` objects.

    The result is a generator, so a caller can either unpack it into four
    names at once or pull a single mock from it with ``next()``.
    """
    for _unused in range(4):
        yield mock.MagicMock()
def _initialize_a_session_thread(session=None, job_queue=None,
                                 response_queue=None, exception_queue=None):
    """Build a ``SessionThread`` while ``threading.Thread`` is patched out.

    ``threading.Thread`` is replaced by a mock for the duration of the
    ``SessionThread`` construction, and that mock is configured to hand back
    a ``MagicMock`` standing in for the worker thread.

    :param session: passed to ``SessionThread`` as ``initialized_session``.
    :param job_queue: queue of pending jobs; a fresh ``queue.Queue`` is
        created when this is ``None``.
    :param response_queue: passed through to ``SessionThread`` unchanged.
    :param exception_queue: passed through to ``SessionThread`` unchanged.
    :returns: a 3-tuple ``(session_thread, worker_mock, patched_Thread)``.
    """
    if job_queue is None:
        job_queue = queue.Queue()

    # Stand-in for the object that the patched ``threading.Thread`` returns.
    worker_mock = mock.MagicMock()

    with mock.patch.object(threading, 'Thread') as patched_thread_cls:
        patched_thread_cls.return_value = worker_mock
        session_thread = thread.SessionThread(
            initialized_session=session,
            job_queue=job_queue,
            response_queue=response_queue,
            exception_queue=exception_queue,
        )

    return (session_thread, worker_mock, patched_thread_cls)
class TestSessionThread(unittest.TestCase):
    """Tests for requests_toolbelt.threaded.thread.SessionThread."""

    def test_thread_initialization(self):
        """Test the way a SessionThread is initialized.

        We want to ensure that we create a thread with a name generated by
        the uuid module, and that we pass the right method to use as a
        target.
        """
        with mock.patch.object(uuid, 'uuid4', return_value='test'):
            (st, thread_instance, Thread) = _initialize_a_session_thread()

        Thread.assert_called_once_with(target=st._make_request, name='test')
        assert thread_instance.daemon is True
        # Compare by value: an identity check against an int literal relies
        # on interpreter caching and emits a SyntaxWarning on Python 3.8+.
        assert thread_instance._state == 0
        thread_instance.start.assert_called_once_with()

    def test_is_alive_proxies_to_worker(self):
        """Test that we proxy the is_alive method to the Thread."""
        st, thread_instance, _ = _initialize_a_session_thread()

        st.is_alive()
        thread_instance.is_alive.assert_called_once_with()

    def test_join_proxies_to_worker(self):
        """Test that we proxy the join method to the Thread."""
        st, thread_instance, _ = _initialize_a_session_thread()

        st.join()
        thread_instance.join.assert_called_once_with()

    def test_handle_valid_request(self):
        """Test that a response is added to the right queue."""
        session, job_queue, response_queue, exception_queue = _make_mocks()
        response = mock.MagicMock()
        session.request.return_value = response

        st, _, _ = _initialize_a_session_thread(
            session, job_queue, response_queue, exception_queue)

        st._handle_request({'method': 'GET', 'url': 'http://example.com'})
        session.request.assert_called_once_with(
            method='GET',
            url='http://example.com'
        )

        # The job's kwargs are paired with the response on the response
        # queue; nothing may be reported on the exception queue.
        response_queue.put.assert_called_once_with(
            ({'method': 'GET', 'url': 'http://example.com'}, response)
        )
        assert exception_queue.put.called is False
        # _handle_request must not pull jobs itself, only mark one as done.
        assert job_queue.get.called is False
        assert job_queue.get_nowait.called is False
        assert job_queue.task_done.called is True

    def test_handle_invalid_request(self):
        """Test that exceptions from requests are added to the right queue."""
        session, job_queue, response_queue, exception_queue = _make_mocks()
        exception = requests.exceptions.InvalidURL()

        # Make the request raise an exception: a mock raises its
        # side_effect when that side_effect is an exception instance.
        session.request.side_effect = exception

        st, _, _ = _initialize_a_session_thread(
            session, job_queue, response_queue, exception_queue)

        st._handle_request({'method': 'GET', 'url': 'http://example.com'})
        session.request.assert_called_once_with(
            method='GET',
            url='http://example.com'
        )

        # The job's kwargs are paired with the raised exception on the
        # exception queue; nothing may be reported on the response queue.
        exception_queue.put.assert_called_once_with(
            ({'method': 'GET', 'url': 'http://example.com'}, exception)
        )
        assert response_queue.put.called is False
        # _handle_request must not pull jobs itself, only mark one as done.
        assert job_queue.get.called is False
        assert job_queue.get_nowait.called is False
        assert job_queue.task_done.called is True

    def test_make_request(self):
        """Test that _make_request exits when the queue is Empty."""
        job_queue = mock.MagicMock()
        job_queue.get_nowait.side_effect = queue.Empty()

        st, _, _ = _initialize_a_session_thread(job_queue=job_queue)
        st._make_request()
        job_queue.get_nowait.assert_called_once_with()
|