File: network.py

package info (click to toggle)
python-brother-ql 0.9.4-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 276 kB
  • sloc: python: 1,640; makefile: 4
file content (101 lines) | stat: -rwxr-xr-x 3,563 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
#!/usr/bin/env python

"""
Backend to support Brother QL-series printers via network.
Works cross-platform.
"""

from __future__ import unicode_literals
from builtins import str

import socket, os, time, select

from .generic import BrotherQLBackendGeneric

def list_available_devices():
    """
    List all available devices for the network backend

    Discovery is not implemented for this backend, so calling this
    function always raises NotImplementedError.

    returns: devices: a list of dictionaries with the keys 'identifier' and 'instance': \
        [ {'identifier': 'tcp://hostname[:port]', 'instance': None}, ] \
        Instance is set to None because we don't want to connect to the device here yet.
        (This is the intended result format once discovery exists.)

    raises: NotImplementedError: always.
    """

    # TODO: We need some snmp request sent to 255.255.255.255 here
    raise NotImplementedError()

class BrotherQLBackendNetwork(BrotherQLBackendGeneric):
    """
    BrotherQL backend talking to the printer over a TCP socket
    """

    def __init__(self, device_specifier):
        """
        device_specifier: string or int: either an identifier in the \
            format tcp://hostname[:port] (the 'tcp://' prefix is optional, \
            the port defaults to 9100) or an integer handle.

        A string specifier opens a TCP connection immediately and stores
        the socket in self.s; errors raised while connecting propagate to
        the caller.

        raises: NotImplementedError: if device_specifier is neither a
            string nor an int.
        """

        self.read_timeout = 0.01
        # strategy : try_twice, select or socket_timeout
        self.strategy = 'socket_timeout'
        if isinstance(device_specifier, str):
            if device_specifier.startswith('tcp://'):
                device_specifier = device_specifier[6:]
            host, _, port = device_specifier.partition(':')
            port = int(port) if port else 9100
            self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            try:
                self.s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
                self.s.connect((host, port))
            except Exception:
                # Do not leak the socket when the connection attempt
                # fails; the original error is re-raised unchanged.
                self.s.close()
                raise
            # The timeout is applied only after connect(), so the
            # connection attempt itself runs in blocking mode.
            if self.strategy in ('socket_timeout', 'try_twice'):
                self.s.settimeout(self.read_timeout)
            else:
                # Non-blocking socket; _read() polls it via select().
                self.s.settimeout(0)

        elif isinstance(device_specifier, int):
            # NOTE(review): only self.dev is set here, while _write(),
            # _read() and _dispose() all operate on self.s -- confirm how
            # an integer handle is meant to be used with this backend.
            self.dev = device_specifier
        else:
            raise NotImplementedError('Currently the printer can be specified either via an appropriate string or via an os.open() handle.')

    def _write(self, data):
        """
        Send all of data to the printer.

        The socket timeout is raised to 10 seconds for the duration of the
        send and set back to self.read_timeout afterwards, even when the
        send fails.
        """
        self.s.settimeout(10)
        try:
            self.s.sendall(data)
        finally:
            self.s.settimeout(self.read_timeout)

    def _read(self, length=32):
        """
        Read up to length bytes from the printer.

        returns: the received bytes, or b'' if nothing arrived within the
            limits of the active strategy.

        raises: NotImplementedError: if self.strategy is not one of
            'socket_timeout', 'try_twice' or 'select'.
        """
        if self.strategy in ('socket_timeout', 'try_twice'):
            # One recv() attempt for 'socket_timeout', two for 'try_twice';
            # each attempt is bounded by the socket's own timeout.
            tries = 2 if self.strategy == 'try_twice' else 1
            for _ in range(tries):
                try:
                    return self.s.recv(length)
                except socket.timeout:
                    pass
            return b''
        elif self.strategy == 'select':
            data = b''
            start = time.time()
            # Poll for readability until data arrives or read_timeout
            # seconds have elapsed.
            while (not data) and (time.time() - start < self.read_timeout):
                readable, _, _ = select.select([self.s], [], [], 0)
                if self.s in readable:
                    data += self.s.recv(length)
                if data:
                    break
                time.sleep(0.001)
            return data
        else:
            raise NotImplementedError('Unknown strategy')

    def _dispose(self):
        """
        Shut down and close the socket.

        close() runs even when shutdown() raises (for instance because the
        peer already dropped the connection); the shutdown error still
        propagates to the caller.
        """
        try:
            self.s.shutdown(socket.SHUT_RDWR)
        finally:
            self.s.close()