1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88
|
import mock
import pytest
from cloudprint import cloudprint
@pytest.fixture
def xmpp_conn(monkeypatch):
XmppConnection = mock.Mock(name='XmppConnection')
monkeypatch.setattr('cloudprint.xmpp.XmppConnection', XmppConnection)
return XmppConnection.return_value
def test_print(requests, cups, cpp, xmpp_conn):
cloudprint.num_retries = 0
printer = cpp.test_add_printer('printer')
printer.get_jobs.return_value = [{
'fileUrl': 'http://print_job.pdf',
'ticketUrl': 'http://ticket',
'title': '*' * 300,
'id': 'job_1',
}]
xmpp_conn.is_connected.return_value = True
requests.get('http://print_job.pdf', text='This is a PDF')
requests.get(
'http://ticket',
json={
'request': '',
'a': 1,
'b': 2,
},
)
cloudprint.process_jobs_once(cups, cpp, xmpp_conn)
cups.printFile.assert_called_with(
'printer',
mock.ANY,
'*' * 255,
{
'a': '1',
'b': '2',
},
)
cpp.finish_job.assert_called_with('job_1')
assert xmpp_conn.await_notification.called
assert cloudprint.num_retries == 0
def test_retry(requests, cups, cpp, xmpp_conn):
cloudprint.num_retries = 0
printer = cpp.test_add_printer('printer')
printer.get_jobs.return_value = [{
'fileUrl': 'http://print_job.pdf',
'ticketUrl': 'http://ticket',
'title': '*' * 300,
'id': 'job_1',
}]
requests.get(url='http://print_job.pdf', status_code=500)
cloudprint.process_jobs_once(cups, cpp, xmpp_conn)
assert cloudprint.num_retries == 1
def test_failed(requests, cups, cpp, xmpp_conn):
cloudprint.num_retries = 0
printer = cpp.test_add_printer('printer')
printer.get_jobs.return_value = [{
'fileUrl': 'http://print_job.pdf',
'ticketUrl': 'http://ticket',
'title': '*' * 300,
'id': 'job_1',
}]
requests.get(url='http://print_job.pdf', status_code=500)
for _ in range(cloudprint.RETRIES + 1):
cloudprint.process_jobs_once(cups, cpp, xmpp_conn)
cpp.fail_job.assert_called_with('job_1')
assert cloudprint.num_retries == 0
|