1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155
|
import lightblue
from .btcommon import *
def discover_devices(duration=8, flush_cache=True, lookup_names=False,
lookup_class=False, device_id=-1):
# This is order of discovered device attributes in C-code.
btAddresIndex = 0
namesIndex = 1
classIndex = 2
# Use lightblue to discover devices on OSX.
devices = lightblue.finddevices(getnames=lookup_names, length=duration)
ret = list()
for device in devices:
item = [device[btAddresIndex], ]
if lookup_names:
item.append(device[namesIndex])
if lookup_class:
item.append(device[classIndex])
# in case of address-only we return string not tuple
if len(item) == 1:
ret.append(item[0])
else:
ret.append(tuple(item))
return ret
def lookup_name(address, timeout=10):
print("TODO: implement")
# TODO: After a little looking around, it seems that we can go into some of the
# lightblue internals and enhance the amount of SDP information that is returned
# (e.g., CID/PSM, protocol, provider information).
#
# See: _searchservices() in _lightblue.py
def find_service(name=None, uuid=None, address=None):
if address is not None:
addresses = [address]
else:
addresses = discover_devices(lookup_names=False)
results = []
for address in addresses:
# print "[DEBUG] Browsing services on %s..." % (addr)
dresults = lightblue.findservices(addr=address, name=name)
for tup in dresults:
service = {}
# LightBlue performs a service discovery and returns the found
# services as a list of (device-address, service-port,
# service-name) tuples.
service["host"] = tup[0]
service["port"] = tup[1]
service["name"] = tup[2]
# Add extra keys for compatibility with PyBluez API.
service["description"] = None
service["provider"] = None
service["protocol"] = None
service["service-classes"] = []
service["profiles"] = []
service["service-id"] = None
results.append(service)
return results
def read_local_bdaddr():
return [lightblue.gethostaddr()]
def advertise_service(sock, name, service_id="", service_classes=None,
profiles=None, provider="", description="", protocols=None):
if protocols is None or protocols == RFCOMM:
protocols = [lightblue.RFCOMM]
lightblue.advertise(name, sock, protocols[0], service_id)
def stop_advertising(sock):
lightblue.stop_advertising(sock)
# ============================= BluetoothSocket ============================== #
class BluetoothSocket:
def __init__(self, proto=RFCOMM, _sock=None):
if _sock is None:
_sock = lightblue.socket()
self._sock = _sock
if proto != RFCOMM:
# name the protocol
raise NotImplementedError("Not supported protocol")
self._proto = lightblue.RFCOMM
self._addrport = None
def _getport(self):
return self._addrport[1]
def bind(self, addrport):
self._addrport = addrport
return self._sock.bind(addrport)
def listen(self, backlog):
return self._sock.listen(backlog)
def accept(self):
return self._sock.accept()
def connect(self, addrport):
return self._sock.connect(addrport)
def send(self, data):
return self._sock.send(data)
def recv(self, numbytes):
return self._sock.recv(numbytes)
def close(self):
return self._sock.close()
def getsockname(self):
return self._sock.getsockname()
def setblocking(self, blocking):
return self._sock.setblocking(blocking)
def settimeout(self, timeout):
return self._sock.settimeout(timeout)
def gettimeout(self):
return self._sock.gettimeout()
def fileno(self):
return self._sock.fileno()
def dup(self):
return BluetoothSocket(self._proto, self._sock)
def makefile(self, mode, bufsize):
return self.makefile(mode, bufsize)
# ============================= DeviceDiscoverer ============================= #
class DeviceDiscoverer:
def __init__ (self):
raise NotImplementedError
|