File: test_process_jobs.py

package info (click to toggle)
cloudprint 0.14-12
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 512 kB
  • sloc: python: 1,128; sh: 48; makefile: 16
file content (88 lines) | stat: -rw-r--r-- 2,214 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
import mock
import pytest

from cloudprint import cloudprint


@pytest.fixture
def xmpp_conn(monkeypatch):
    """Swap ``cloudprint.xmpp.XmppConnection`` for a mock.

    The fixture value is the mock's ``return_value``, i.e. the object that
    calling the patched class produces.
    """
    connection_cls = mock.Mock(name='XmppConnection')
    monkeypatch.setattr(
        'cloudprint.xmpp.XmppConnection',
        connection_cls,
    )
    return connection_cls.return_value


def test_print(requests, cups, cpp, xmpp_conn):
    """Check that a queued job is handed to CUPS and then marked finished.

    The job title is 300 characters long while the title expected by
    ``printFile`` is 255 characters; the ticket's integer values are
    expected as strings and its ``'request'`` entry is not expected at all.
    """
    # Pure test data, built up front so the steps below read as a script.
    queued_job = {
        'fileUrl': 'http://print_job.pdf',
        'ticketUrl': 'http://ticket',
        'title': '*' * 300,
        'id': 'job_1',
    }
    ticket_payload = {
        'request': '',
        'a': 1,
        'b': 2,
    }
    expected_title = '*' * 255
    expected_options = {
        'a': '1',
        'b': '2',
    }

    cloudprint.num_retries = 0

    fake_printer = cpp.test_add_printer('printer')
    fake_printer.get_jobs.return_value = [queued_job]

    xmpp_conn.is_connected.return_value = True

    # Responses for the two URLs the job refers to, registered through the
    # ``requests`` fixture (presumably an HTTP stub -- confirm in conftest).
    requests.get('http://print_job.pdf', text='This is a PDF')
    requests.get('http://ticket', json=ticket_payload)

    cloudprint.process_jobs_once(cups, cpp, xmpp_conn)

    # The second argument is not pinned down by this test.
    cups.printFile.assert_called_with(
        'printer',
        mock.ANY,
        expected_title,
        expected_options,
    )
    cpp.finish_job.assert_called_with('job_1')
    assert xmpp_conn.await_notification.called
    assert cloudprint.num_retries == 0


def test_retry(requests, cups, cpp, xmpp_conn):
    """Check that one failed download bumps ``num_retries`` from 0 to 1."""
    queued_job = {
        'fileUrl': 'http://print_job.pdf',
        'ticketUrl': 'http://ticket',
        'title': '*' * 300,
        'id': 'job_1',
    }

    cloudprint.num_retries = 0

    fake_printer = cpp.test_add_printer('printer')
    fake_printer.get_jobs.return_value = [queued_job]

    # The job file URL answers with a server error.
    requests.get(url='http://print_job.pdf', status_code=500)

    cloudprint.process_jobs_once(cups, cpp, xmpp_conn)

    assert cloudprint.num_retries == 1


def test_failed(requests, cups, cpp, xmpp_conn):
    """Check that the job is failed after ``RETRIES + 1`` bad attempts.

    Once the attempts are used up, ``fail_job`` must have been called for
    the job and ``num_retries`` must be back at 0.
    """
    queued_job = {
        'fileUrl': 'http://print_job.pdf',
        'ticketUrl': 'http://ticket',
        'title': '*' * 300,
        'id': 'job_1',
    }

    cloudprint.num_retries = 0

    fake_printer = cpp.test_add_printer('printer')
    fake_printer.get_jobs.return_value = [queued_job]

    # Every download of the job file answers with a server error.
    requests.get(url='http://print_job.pdf', status_code=500)

    attempts = cloudprint.RETRIES + 1
    for _attempt in range(attempts):
        cloudprint.process_jobs_once(cups, cpp, xmpp_conn)

    cpp.fail_job.assert_called_with('job_1')
    assert cloudprint.num_retries == 0