File: _lightblue.py

package info (click to toggle)
pybluez 0.23-5.1
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, forky, sid, trixie
  • size: 2,152 kB
  • sloc: ansic: 4,854; python: 4,319; objc: 3,363; cpp: 1,950; makefile: 190
file content (546 lines) | stat: -rwxr-xr-x 19,223 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
# Copyright (c) 2009 Bea Lam. All rights reserved.
#
# This file is part of LightBlue.
#
# LightBlue is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# LightBlue is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with LightBlue.  If not, see <http://www.gnu.org/licenses/>.

# Mac OS X main module implementation.

import types
import warnings

import Foundation
import AppKit
import objc
from objc import super

from . import _IOBluetooth
from . import _LightAquaBlue
from . import _lightbluecommon
from . import _macutil
from . import _bluetoothsockets


# public attributes
# (the names exported by a star-import of this module)
__all__ = ("finddevices", "findservices", "finddevicename",
           "selectdevice", "selectservice",
           "gethostaddr", "gethostclass",
           "socket",
           "advertise", "stopadvertise")

# details of advertised services:
# maps id(sock) -> service record handle; written by advertise() and read by
# stopadvertise() to find the record to remove
__advertised = {}


def finddevices(getnames=True, length=10):
    """
    Runs a blocking device inquiry and returns the devices that were found.

    getnames and length are handed straight to _SyncDeviceInquiry.run() as
    the "update names" flag and the inquiry duration. The result is the
    list of device tuples built by _SyncDeviceInquiry.getfounddevices().
    """
    scanner = _SyncDeviceInquiry()
    scanner.run(getnames, length)
    return scanner.getfounddevices()


def findservices(addr=None, name=None, servicetype=None):
    """
    Returns a list of (device-addr, service-channel, service-name) tuples
    for the services found on one device, or on all discoverable devices.

    addr        -- address of the device to search; if None, a device
                   inquiry is run first and every found device is searched
    name        -- if not None, only services with exactly this name match
    servicetype -- RFCOMM, OBEX or None (no filtering by type)

    Raises ValueError for any other servicetype, and BluetoothError if the
    device inquiry fails or the explicitly requested device cannot be
    found. A failed SDP refresh only produces a warning.
    """
    allowedtypes = (_lightbluecommon.RFCOMM, _lightbluecommon.OBEX, None)
    if servicetype not in allowedtypes:
        raise ValueError(
            "servicetype must be RFCOMM, OBEX or None, was %s" % servicetype)

    searchall = addr is None
    if searchall:
        try:
            discovered = finddevices()
        except _lightbluecommon.BluetoothError as e:
            raise _lightbluecommon.BluetoothError(
                "findservices() failed, error while finding devices: "
                + str(e))
        targets = [entry[0] for entry in discovered]
    else:
        targets = [addr]

    found = []
    for target in targets:
        remote = _IOBluetooth.IOBluetoothDevice.withAddressString_(target)
        if not remote:
            if searchall:
                # device came from the inquiry; just skip it
                continue
            raise _lightbluecommon.BluetoothError(
                "findservices() failed, failed to find " + target)

        try:
            updated = remote.getLastServicesUpdate()
            if updated is None or updated.timeIntervalSinceNow() < -2:
                # Refresh the known services with an SDP query, but leave
                # at least a couple of seconds between queries because
                # updating too often sometimes doesn't work.
                runner = _SDPQueryRunner.alloc().init()
                try:
                    runner.query(remote)    # blocks until updated
                except _lightbluecommon.BluetoothError as e:
                    # cached services are sometimes totally wrong, so warn
                    # rather than silently relying on them
                    warnings.warn(
                        "findservices() couldn't get services for %s: %s"
                        % (remote.getNameOrAddress(), str(e)))

            # a search for RFCOMM services must not return OBEX services
            excluded = (_macutil.PROTO_UUIDS.get(_lightbluecommon.OBEX)
                        if servicetype == _lightbluecommon.RFCOMM else None)

            matches = _searchservices(
                remote,
                name=name,
                uuid=_macutil.PROTO_UUIDS.get(servicetype),
                uuidbad=excluded)
            found += [_getservicetuple(record) for record in matches]
        finally:
            # close the baseband connection; the transport connection
            # sometimes seems to stay open otherwise
            remote.closeConnection()

    return found


def finddevicename(address, usecache=True):
    """
    Returns the name of the device with the given bluetooth address.

    address  -- bluetooth address of the device to look up
    usecache -- if True, return the name already known for the device (if
                any) instead of performing a remote name request

    Raises TypeError if address is not a valid bluetooth address, and
    BluetoothError if the device or its name cannot be found.
    """
    if not _lightbluecommon._isbtaddr(address):
        raise TypeError("%s is not a valid bluetooth address" % str(address))

    # the local device is not looked up over the air
    if address == gethostaddr():
        return _gethostname()

    device = _IOBluetooth.IOBluetoothDevice.withAddressString_(address)
    if not device:
        # no device object could be created for this address; report it as
        # a bluetooth error instead of failing on a None attribute access
        raise _lightbluecommon.BluetoothError(
            "Could not find device name for %s" % address)

    if usecache:
        name = device.getName()
        if name is not None:
            return name

    # do name request with timeout of 10 seconds
    result = device.remoteNameRequest_withPageTimeout_(None, 10000)
    if result == _macutil.kIOReturnSuccess:
        return device.getName()
    raise _lightbluecommon.BluetoothError(
        "Could not find device name for %s" % address)


### local device ###

def gethostaddr():
    """
    Returns the address of the local bluetooth device, as formatted by
    _macutil.formatdevaddr().

    Raises BluetoothError if the address cannot be read.
    """
    rawaddr = _LightAquaBlue.BBLocalDevice.getAddressString()
    if rawaddr is None:
        raise _lightbluecommon.BluetoothError(
            "Cannot read local device address")

    # PyObjC hands back strings as unicode; the address is only hex values,
    # so it is normalised through formatdevaddr()
    return _macutil.formatdevaddr(rawaddr)


def gethostclass():
    """
    Returns the class of device of the local bluetooth device as an int.

    Raises BluetoothError if the class cannot be read (reported by the
    local device as -1).
    """
    deviceclass = _LightAquaBlue.BBLocalDevice.getClassOfDevice()
    if deviceclass == -1:
        raise _lightbluecommon.BluetoothError(
            "Cannot read local device class")
    return int(deviceclass)


def _gethostname():
    """
    Returns the name of the local bluetooth device.

    Raises BluetoothError if the name cannot be read.
    """
    localname = _LightAquaBlue.BBLocalDevice.getName()
    if localname is None:
        raise _lightbluecommon.BluetoothError(
            "Cannot read local device name")
    return localname


### socket ###

def socket(proto=_lightbluecommon.RFCOMM):
    """
    Returns a new socket object for the given protocol (RFCOMM by default),
    as created by _bluetoothsockets._getsocketobject().
    """
    sockobj = _bluetoothsockets._getsocketobject(proto)
    return sockobj

### advertising services ###


def advertise(name, sock, servicetype, uuid=None):
    """
    Advertises a service for the given bound socket.

    name        -- name of the service (must be a str)
    sock        -- the socket to advertise; must already be bound
    servicetype -- RFCOMM or OBEX
    uuid        -- passed to the service advertiser as the service UUID

    Raises TypeError if name is not a string, ValueError for an unsupported
    servicetype, and BluetoothError if the service cannot be advertised or
    the socket's channel is not the one the service ended up on. The socket
    itself raises if it is not bound.
    """
    if not isinstance(name, str):
        raise TypeError("name must be string, was %s" % type(name))

    # raises exception if socket is not bound
    boundchannelID = sock._getport()

    if servicetype not in (_lightbluecommon.RFCOMM, _lightbluecommon.OBEX):
        raise ValueError("servicetype must be either RFCOMM or OBEX")

    # advertise the service
    advertiser = _LightAquaBlue.BBServiceAdvertiser
    profile = advertiser.serialPortProfileDictionary()
    try:
        # call form that passes placeholders for the two output arguments
        result, finalchannelID, servicerecordhandle = \
            advertiser.addRFCOMMServiceDictionary_withName_UUID_channelID_serviceRecordHandle_(
                profile, name, uuid, None, None)
    except TypeError:
        # The bridge rejected that argument list; retry without the
        # placeholders. Only TypeError triggers the retry (as in
        # _getservicetuple) so that genuine failures and interrupts are
        # not swallowed.
        result, finalchannelID, servicerecordhandle = \
            advertiser.addRFCOMMServiceDictionary_withName_UUID_channelID_serviceRecordHandle_(
                profile, name, uuid)

    if result != _macutil.kIOReturnSuccess:
        raise _lightbluecommon.BluetoothError(
                result, "Error advertising service")
    if boundchannelID and boundchannelID != finalchannelID:
        # The record was published but is unusable for this socket. Remove
        # it again, since its handle is not stored anywhere and it could
        # never be removed through stopadvertise().
        advertiser.removeService_(servicerecordhandle)
        msg = "socket bound to unavailable channel (%d), " % boundchannelID +\
              "use channel value of 0 to bind to dynamically assigned channel"
        raise _lightbluecommon.BluetoothError(msg)

    # note service record handle, so that the service can be stopped later
    __advertised[id(sock)] = servicerecordhandle


def stopadvertise(sock):
    """
    Stops advertising the service that advertise() registered for sock.

    Raises TypeError if sock is None, and BluetoothError if no service is
    recorded for the socket or the service cannot be removed.
    """
    if sock is None:
        raise TypeError("Given socket is None")

    key = id(sock)
    servicerecordhandle = __advertised.get(key)
    if servicerecordhandle is None:
        raise _lightbluecommon.BluetoothError("no service advertised")

    result = _LightAquaBlue.BBServiceAdvertiser.removeService_(servicerecordhandle)
    if result != _macutil.kIOReturnSuccess:
        raise _lightbluecommon.BluetoothError(
            result, "Error stopping advertising of service")

    # Forget the handle now that the record is gone. Otherwise a second
    # call would try to remove it again, and because the key is id(sock)
    # a later socket reusing that id would inherit the stale handle.
    __advertised.pop(key, None)


### GUI ###


def selectdevice():
    """
    Shows the device selector GUI and blocks until the user has made a
    choice.

    Returns the (addr, name, COD) tuple of the selected device, or None if
    the user cancelled the selection.
    """
    from . import _IOBluetoothUI
    gui = _IOBluetoothUI.IOBluetoothDeviceSelectorController.deviceSelector()

    # try to bring GUI to foreground by setting it as floating panel
    # (if this is called from pyobjc app, it would automatically be in foreground)
    try:
        gui.window().setFloatingPanel_(True)
    except Exception:
        # purely cosmetic, so failure is ignored -- but don't swallow
        # KeyboardInterrupt/SystemExit like a bare except would
        pass

    # show the window and wait for user's selection
    response = gui.runModal()   # problems here if transferring a lot of data??
    if response != AppKit.NSRunStoppedResponse:
        # user cancelled selection
        return None

    results = gui.getResults()
    if results is None or len(results) == 0:
        # should always have a result here, but check anyway
        return None

    devinfo = _getdevicetuple(results[0])

    # sometimes the baseband connection stays open, which causes problems
    # with later connections, so close it here
    dev = _IOBluetooth.IOBluetoothDevice.withAddressString_(devinfo[0])
    if dev is not None and dev.isConnected():
        dev.closeConnection()

    return devinfo


def selectservice():
    """
    Shows the service browser GUI and blocks until the user has made a
    choice.

    Returns the (device-addr, service-channel, service-name) tuple of the
    selected service, or None if the user cancelled the selection.
    """
    from . import _IOBluetoothUI
    gui = _IOBluetoothUI.IOBluetoothServiceBrowserController.serviceBrowserController_(
            _macutil.kIOBluetoothServiceBrowserControllerOptionsNone)

    # try to bring GUI to foreground by setting it as floating panel
    # (if this is called from pyobjc app, it would automatically be in foreground)
    try:
        gui.window().setFloatingPanel_(True)
    except Exception:
        # purely cosmetic, so failure is ignored -- but don't swallow
        # KeyboardInterrupt/SystemExit like a bare except would
        pass

    # show the window and wait for user's selection
    response = gui.runModal()
    if response != AppKit.NSRunStoppedResponse:
        # user cancelled selection
        return None

    results = gui.getResults()
    if results is None or len(results) == 0:
        # should always have a result here, but check anyway
        return None

    serviceinfo = _getservicetuple(results[0])

    # sometimes the baseband connection stays open, which causes problems
    # with later connections, so close it here
    dev = _IOBluetooth.IOBluetoothDevice.deviceWithAddressString_(serviceinfo[0])
    if dev is not None and dev.isConnected():
        dev.closeConnection()

    return serviceinfo


### classes ###

class _SDPQueryRunner(Foundation.NSObject):
    """
    Convenience class for performing a blocking SDP query on an
    IOBluetoothDevice.

    The instance passes itself as the target of performSDPQuery_ and
    receives the outcome through sdpQueryComplete_status_.
    """

    @objc.python_method
    def query(self, device, timeout=10.0):
        """
        Starts an SDP query on device and blocks until it has completed.

        Raises BluetoothError if the query cannot be started, does not
        complete within the timeout passed to _macutil.waituntil(), or
        completes with a status other than kIOReturnSuccess.
        """
        # Clear the result *before* starting the query: resetting it
        # afterwards would overwrite a completion status delivered while
        # performSDPQuery_ is still running (and would leave a stale status
        # from an earlier query in place until then).
        self._queryresult = None

        # do SDP query
        err = device.performSDPQuery_(self)
        if err != _macutil.kIOReturnSuccess:
            raise _lightbluecommon.BluetoothError(err, self._errmsg(device))

        # performSDPQuery_ is async, so block-wait
        if not _macutil.waituntil(lambda: self._queryresult is not None,
                                          timeout):
            raise _lightbluecommon.BluetoothError(
                "Timed out getting services for %s" % \
                    device.getNameOrAddress())
        # query is now complete
        if self._queryresult != _macutil.kIOReturnSuccess:
            raise _lightbluecommon.BluetoothError(
                self._queryresult, self._errmsg(device))

    # callback invoked when the SDP query started by query() has finished
    def sdpQueryComplete_status_(self, device, status):
        # can't raise exception during a callback, so just keep the err value
        self._queryresult = status
        # wake up the waituntil() call in query()
        _macutil.interruptwait()
    sdpQueryComplete_status_ = objc.selector(
        sdpQueryComplete_status_, signature=b"v@:@i")    # accept object, int

    @objc.python_method
    def _errmsg(self, device):
        # common error text used by query()
        return "Error getting services for %s" % device.getNameOrAddress()


class _SyncDeviceInquiry(object):
    """
    Blocking wrapper around _AsyncDeviceInquiry: run() starts an inquiry
    and only returns once the inquiry's completion callback has fired.
    """

    def __init__(self):
        super(_SyncDeviceInquiry, self).__init__()

        self._inquiry = _AsyncDeviceInquiry.alloc().init()
        self._inquiry.cb_completed = self._inquirycomplete

        self._inquiring = False
        # error code reported by the most recent inquiry
        self._inquiryerr = _macutil.kIOReturnSuccess

    def run(self, getnames, duration):
        """
        Runs an inquiry and blocks until it is complete.

        getnames -- whether device names are updated during the inquiry
        duration -- the inquiry length

        Raises BluetoothError if another inquiry is in progress, if the
        inquiry cannot be started, or if it completes with an error.
        """
        if self._inquiring:
            raise _lightbluecommon.BluetoothError(
                "Another inquiry in progress")

        # set inquiry attributes
        self._inquiry.updatenames = getnames
        self._inquiry.length = duration

        # Reset the state *before* starting. Doing it afterwards would
        # overwrite the outcome of a completion callback delivered during
        # start(), and the wait below (which has no timeout) would then
        # never end.
        self._inquiryerr = _macutil.kIOReturnSuccess
        self._inquiring = True

        # start the inquiry
        started = False
        try:
            err = self._inquiry.start()
            if err != _macutil.kIOReturnSuccess:
                raise _lightbluecommon.BluetoothError(
                    err, "Error starting device inquiry")
            started = True
        finally:
            if not started:
                # nothing is running, so allow run() to be called again
                self._inquiring = False

        # wait until the inquiry is complete
        _macutil.waituntil(lambda: not self._inquiring)

        # if error occurred during inquiry, raise exception
        if self._inquiryerr != _macutil.kIOReturnSuccess:
            raise _lightbluecommon.BluetoothError(self._inquiryerr,
                "Error during device inquiry")

    def getfounddevices(self):
        """Returns the found devices as a list of device-info tuples."""
        return [_getdevicetuple(device) for device in \
                    self._inquiry.getfounddevices()]

    def _inquirycomplete(self, err, aborted):
        # completion callback, installed as cb_completed on the inquiry
        if err != 188:      # no devices found
            self._inquiryerr = err
        self._inquiring = False
        _macutil.interruptwait()

    def __del__(self):
        # _inquiry is missing if __init__ failed before creating it
        inquiry = getattr(self, "_inquiry", None)
        if inquiry is not None:
            # NOTE(review): this explicitly runs the wrapped object's
            # __del__ (kept from the original code) -- confirm that the
            # manual release is really required.
            inquiry.__del__()
        # object defines no __del__, so there is nothing to chain up to;
        # calling super().__del__() here always raised AttributeError



# Wrapper around IOBluetoothDeviceInquiry, with python callbacks that you can
# set to receive callbacks when the inquiry is started or stopped, or when it
# finds a device.
#
# This discovery doesn't block, so it could be used in a PyObjC application
# that is running an event loop.
#
# Properties:
#   - 'length': the inquiry length (seconds)
#   - 'updatenames': whether to update device names during the inquiry
#     (i.e. perform remote name requests, which will take a little longer)
#
class _AsyncDeviceInquiry(Foundation.NSObject):
    """
    Non-blocking wrapper around IOBluetoothDeviceInquiry that acts as the
    inquiry's delegate and forwards events to optional python callbacks:

      - cb_started():                 the inquiry has started
      - cb_founddevice(device):       a device was found
      - cb_completed(err, aborted):   the inquiry has finished

    The 'length' and 'updatenames' properties read and write the matching
    settings of the wrapped inquiry object.
    """

    # NSObject init, not python __init__
    def init(self):
        try:
            # attribute access only; checks that the class is available
            _IOBluetooth.IOBluetoothDeviceInquiry
        except AttributeError:
            raise ImportError("Cannot find IOBluetoothDeviceInquiry class " +\
                "to perform device discovery. This class was introduced in " +\
                "Mac OS X 10.4, are you running an earlier version?")

        self = super(_AsyncDeviceInquiry, self).init()
        if self is None:
            # the superclass initialiser failed; there is nothing to set up
            return None

        self._inquiry = \
            _IOBluetooth.IOBluetoothDeviceInquiry.inquiryWithDelegate_(self)

        # callbacks
        self.cb_started = None
        self.cb_completed = None
        self.cb_founddevice = None

        return self

    # length property
    @objc.python_method
    def _setlength(self, length):
        self._inquiry.setInquiryLength_(length)
    length = property(
            lambda self: self._inquiry.inquiryLength(),
            _setlength)

    # updatenames property
    @objc.python_method
    def _setupdatenames(self, update):
        self._inquiry.setUpdateNewDeviceNames_(update)
    updatenames = property(
            lambda self: self._inquiry.updateNewDeviceNames(),
            _setupdatenames)

    # returns error code
    def start(self):
        return self._inquiry.start()

    # returns error code
    def stop(self):
        return self._inquiry.stop()

    # returns list of IOBluetoothDevice objects
    def getfounddevices(self):
        return self._inquiry.foundDevices()

    # NOTE(review): this invokes dealloc directly from the python finaliser;
    # kept unchanged, but confirm this is how the bridge expects the object
    # to be released.
    def __del__(self):
        super(_AsyncDeviceInquiry, self).dealloc()


    #
    # delegate methods follow (these are called by the internal
    # IOBluetoothDeviceInquiry object when inquiry events occur)
    #

    # - (void)deviceInquiryDeviceFound:(IOBluetoothDeviceInquiry*)sender
    #                           device:(IOBluetoothDevice*)device;
    def deviceInquiryDeviceFound_device_(self, inquiry, device):
        if self.cb_founddevice:
            self.cb_founddevice(device)
    deviceInquiryDeviceFound_device_ = objc.selector(
        deviceInquiryDeviceFound_device_, signature=b"v@:@@")

    # - (void)deviceInquiryComplete:error:aborted;
    def deviceInquiryComplete_error_aborted_(self, inquiry, err, aborted):
        if self.cb_completed:
            self.cb_completed(err, aborted)
    deviceInquiryComplete_error_aborted_ = objc.selector(
        deviceInquiryComplete_error_aborted_, signature=b"v@:@iZ")

    # - (void)deviceInquiryStarted:(IOBluetoothDeviceInquiry*)sender;
    def deviceInquiryStarted_(self, inquiry):
        if self.cb_started:
            self.cb_started()

    # - (void)deviceInquiryDeviceNameUpdated:device:devicesRemaining:
    # (event is deliberately ignored)
    def deviceInquiryDeviceNameUpdated_device_devicesRemaining_(self, sender,
                                                              device,
                                                              devicesRemaining):
        pass

    # - (void)deviceInquiryUpdatingDeviceNamesStarted:devicesRemaining:
    # (event is deliberately ignored)
    def deviceInquiryUpdatingDeviceNamesStarted_devicesRemaining_(self, sender,
                                                                devicesRemaining):
        pass


### utility methods ###


def _searchservices(device, name=None, uuid=None, uuidbad=None):
    """
    Returns the service records of the given IOBluetoothDevice that match
    all of the given criteria; an empty list if the device has no services.

    name    -- if not None, the service name must equal it
    uuid    -- if given, the record must have this service
               (should be an IOBluetoothSDPUUID object)
    uuidbad -- if given, records that have this service are left out

    Raises ValueError if device is not an IOBluetoothDevice.
    """
    if not isinstance(device, _IOBluetooth.IOBluetoothDevice):
        raise ValueError("device must be IOBluetoothDevice, was %s" % \
            type(device))

    records = device.getServices()
    required = (uuid, ) if uuid else ()
    excluded = (uuidbad, ) if uuidbad else ()

    if records is None:
        return []

    def _accept(record):
        # the checks run in a fixed order: required uuid, excluded uuid, name
        if required and not record.hasServiceFromArray_(required):
            return False
        if excluded and record.hasServiceFromArray_(excluded):
            return False
        return name is None or record.getServiceName() == name

    return [record for record in records if _accept(record)]

def _getdevicetuple(iobtdevice):
    """
    Builds the (addr, name, COD) device tuple for an IOBluetoothDevice
    object. The address is passed through _macutil.formatdevaddr().
    """
    rawaddr = iobtdevice.getAddressString()
    return (
        _macutil.formatdevaddr(rawaddr),
        iobtdevice.getName(),
        iobtdevice.getClassOfDevice(),
    )


def _getservicetuple(servicerecord):
    """
    Returns a (device-addr, service-channel, service-name) tuple from the
    given IOBluetoothSDPServiceRecord.

    The channel is the record's RFCOMM channel ID if it has one, otherwise
    its L2CAP PSM, otherwise None.
    """
    addr = _macutil.formatdevaddr(servicerecord.getDevice().getAddressString())
    name = servicerecord.getServiceName()
    try:
        result, channel = servicerecord.getRFCOMMChannelID_(None) # pyobjc 2.0
    except TypeError:
        result, channel = servicerecord.getRFCOMMChannelID_()
    if result != _macutil.kIOReturnSuccess:
        # not an RFCOMM service, so try L2CAP
        try:
            result, channel = servicerecord.getL2CAPPSM_(None) # pyobjc 2.0
        except TypeError:
            # same call-signature fallback as for the RFCOMM lookup above;
            # only TypeError is caught so that unrelated failures and
            # interrupts are not hidden
            result, channel = servicerecord.getL2CAPPSM_()
        if result != _macutil.kIOReturnSuccess:
            channel = None
    return (addr, channel, name)