File: importer_service_worker_test.py

package info (click to toggle)
git-ubuntu 1.1-2
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 1,688 kB
  • sloc: python: 13,378; sh: 480; makefile: 2
file content (44 lines) | stat: -rw-r--r-- 1,240 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
from unittest.mock import Mock, sentinel

import pytest
import zmq

import gitubuntu.importer_service_worker as target


def test_zmq_recv_with_timeout():
    """If data is available it should get returned"""
    poller = Mock(
        spec=zmq.Poller,
        poll=Mock(return_value=True),
    )
    socket = Mock(
        spec=zmq.Socket,
        recv=Mock(return_value=sentinel.result),
    )
    result = target.zmq_recv_with_timeout(poller, socket, None)
    assert result is sentinel.result


def test_zmq_recv_with_timeout_timeout():
    """If data is not available after timeout an exception should get raised"""
    poller = Mock(
        spec=zmq.Poller,
        poll=Mock(return_value=False),
    )
    with pytest.raises(target.BrokerTimeout):
        target.zmq_recv_with_timeout(poller, None, None)


def test_zmq_recv_with_timeout_multiple():
    """If multiple polls are required the whole frame should be returned"""
    poller = Mock(
        spec=zmq.Poller,
        poll=Mock(return_value=True),
    )
    socket = Mock(
        spec=zmq.Socket,
        recv=Mock(side_effect=[zmq.error.Again, sentinel.result]),
    )
    result = target.zmq_recv_with_timeout(poller, socket, None)
    assert result is sentinel.result