1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101
|
#!/usr/bin/env python
"""
Backend to support Brother QL-series printers via network.
Works cross-platform.
"""
from __future__ import unicode_literals
from builtins import str
import socket, os, time, select
from .generic import BrotherQLBackendGeneric
def list_available_devices():
"""
List all available devices for the network backend
returns: devices: a list of dictionaries with the keys 'identifier' and 'instance': \
[ {'identifier': 'tcp://hostname[:port]', 'instance': None}, ] \
Instance is set to None because we don't want to connect to the device here yet.
"""
# We need some snmp request sent to 255.255.255.255 here
raise NotImplementedError()
return [{'identifier': 'tcp://' + path, 'instance': None} for path in paths]
class BrotherQLBackendNetwork(BrotherQLBackendGeneric):
"""
BrotherQL backend using the Linux Kernel USB Printer Device Handles
"""
def __init__(self, device_specifier):
"""
device_specifier: string or os.open(): identifier in the \
format file:///dev/usb/lp0 or os.open() raw device handle.
"""
self.read_timeout = 0.01
# strategy : try_twice, select or socket_timeout
self.strategy = 'socket_timeout'
if isinstance(device_specifier, str):
if device_specifier.startswith('tcp://'):
device_specifier = device_specifier[6:]
host, _, port = device_specifier.partition(':')
if port:
port = int(port)
else:
port = 9100
#try:
self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
self.s.connect((host, port))
#except OSError as e:
# raise ValueError('Could not connect to the device.')
if self.strategy == 'socket_timeout':
self.s.settimeout(self.read_timeout)
elif self.strategy == 'try_twice':
self.s.settimeout(self.read_timeout)
else:
self.s.settimeout(0)
elif isinstance(device_specifier, int):
self.dev = device_specifier
else:
raise NotImplementedError('Currently the printer can be specified either via an appropriate string or via an os.open() handle.')
def _write(self, data):
self.s.settimeout(10)
self.s.sendall(data)
self.s.settimeout(self.read_timeout)
def _read(self, length=32):
if self.strategy in ('socket_timeout', 'try_twice'):
if self.strategy == 'socket_timeout':
tries = 1
if self.strategy == 'try_twice':
tries = 2
for i in range(tries):
try:
data = self.s.recv(length)
return data
except socket.timeout:
pass
return b''
elif self.strategy == 'select':
data = b''
start = time.time()
while (not data) and (time.time() - start < self.read_timeout):
result, _, _ = select.select([self.s], [], [], 0)
if self.s in result:
data += self.s.recv(length)
if data: break
time.sleep(0.001)
return data
else:
raise NotImplementedError('Unknown strategy')
def _dispose(self):
self.s.shutdown(socket.SHUT_RDWR)
self.s.close()
|