File: speaker.py

package info (click to toggle)
python-bumble 0.0.225-1
  • links: PTS, VCS
  • area: main
  • in suites:
  • size: 9,464 kB
  • sloc: python: 75,258; java: 3,782; javascript: 823; xml: 203; sh: 172; makefile: 8
file content (841 lines) | stat: -rw-r--r-- 27,607 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
# Copyright 2021-2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations

import asyncio
import asyncio.subprocess
import enum
import json
import logging
import pathlib
import subprocess
import weakref
from importlib import resources

import aiohttp
import click
from aiohttp import web

import bumble
import bumble.logging
from bumble.a2dp import (
    A2DP_MPEG_2_4_AAC_CODEC_TYPE,
    A2DP_NON_A2DP_CODEC_TYPE,
    A2DP_SBC_CODEC_TYPE,
    AacMediaCodecInformation,
    OpusMediaCodecInformation,
    SbcMediaCodecInformation,
    make_audio_sink_service_sdp_records,
)
from bumble.avdtp import (
    AVDTP_AUDIO_MEDIA_TYPE,
    Listener,
    MediaCodecCapabilities,
    Protocol,
)
from bumble.codecs import AacAudioRtpPacket
from bumble.colors import color
from bumble.core import CommandTimeoutError, PhysicalTransport
from bumble.device import Connection, Device, DeviceConfiguration
from bumble.hci import HCI_StatusError
from bumble.pairing import PairingConfig
from bumble.rtp import MediaPacket
from bumble.sdp import ServiceAttribute
from bumble.transport import open_transport
from bumble.utils import AsyncRunner

# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
logger = logging.getLogger(__name__)


# -----------------------------------------------------------------------------
# Constants
# -----------------------------------------------------------------------------
DEFAULT_UI_PORT = 7654


# -----------------------------------------------------------------------------
class AudioExtractor:
    @staticmethod
    def create(codec: str):
        if codec == 'aac':
            return AacAudioExtractor()
        if codec == 'sbc':
            return SbcAudioExtractor()
        if codec == 'opus':
            return OpusAudioExtractor()

    def extract_audio(self, packet: MediaPacket) -> bytes:
        raise NotImplementedError()


# -----------------------------------------------------------------------------
class AacAudioExtractor:
    def extract_audio(self, packet: MediaPacket) -> bytes:
        return AacAudioRtpPacket.from_bytes(packet.payload).to_adts()


# -----------------------------------------------------------------------------
class SbcAudioExtractor:
    def extract_audio(self, packet: MediaPacket) -> bytes:
        # header = packet.payload[0]
        # fragmented = header >> 7
        # start = (header >> 6) & 0x01
        # last = (header >> 5) & 0x01
        # number_of_frames = header & 0x0F

        # TODO: support fragmented payloads
        return packet.payload[1:]


# -----------------------------------------------------------------------------
class OpusAudioExtractor:
    def extract_audio(self, packet: MediaPacket) -> bytes:
        # TODO: parse fields
        return packet.payload[1:]


# -----------------------------------------------------------------------------
class Output:
    async def start(self) -> None:
        pass

    async def stop(self) -> None:
        pass

    async def suspend(self) -> None:
        pass

    async def on_connection(self, connection: Connection) -> None:
        pass

    async def on_disconnection(self, reason: int) -> None:
        pass

    def on_rtp_packet(self, packet: MediaPacket) -> None:
        pass


# -----------------------------------------------------------------------------
class FileOutput(Output):
    filename: str
    codec: str
    extractor: AudioExtractor

    def __init__(self, filename, codec):
        self.filename = filename
        self.codec = codec
        self.file = open(filename, 'wb')
        self.extractor = AudioExtractor.create(codec)

    def on_rtp_packet(self, packet: MediaPacket) -> None:
        self.file.write(self.extractor.extract_audio(packet))


# -----------------------------------------------------------------------------
class QueuedOutput(Output):
    MAX_QUEUE_SIZE = 32768

    packets: asyncio.Queue
    extractor: AudioExtractor
    packet_pump_task: asyncio.Task | None
    started: bool

    def __init__(self, extractor):
        self.extractor = extractor
        self.packets = asyncio.Queue()
        self.packet_pump_task = None
        self.started = False

    async def start(self):
        if self.started:
            return

        self.packet_pump_task = asyncio.create_task(self.pump_packets())

    async def pump_packets(self):
        while True:
            packet = await self.packets.get()
            await self.on_audio_packet(packet)

    async def on_audio_packet(self, packet: bytes) -> None:
        pass

    def on_rtp_packet(self, packet: MediaPacket) -> None:
        if self.packets.qsize() > self.MAX_QUEUE_SIZE:
            logger.debug("queue full, dropping")
            return

        self.packets.put_nowait(self.extractor.extract_audio(packet))


# -----------------------------------------------------------------------------
class WebSocketOutput(QueuedOutput):
    def __init__(self, codec, send_audio, send_message):
        super().__init__(AudioExtractor.create(codec))
        self.send_audio = send_audio
        self.send_message = send_message

    async def on_connection(self, connection: Connection) -> None:
        try:
            await connection.request_remote_name()
        except HCI_StatusError:
            pass
        peer_name = '' if connection.peer_name is None else connection.peer_name
        peer_address = connection.peer_address.to_string(False)
        await self.send_message(
            'connection',
            peer_address=peer_address,
            peer_name=peer_name,
        )

    async def on_disconnection(self, reason) -> None:
        await self.send_message('disconnection')

    async def on_audio_packet(self, packet: bytes) -> None:
        await self.send_audio(packet)

    async def start(self):
        await super().start()
        await self.send_message('start')

    async def stop(self):
        await super().stop()
        await self.send_message('stop')

    async def suspend(self):
        await super().suspend()
        await self.send_message('suspend')


# -----------------------------------------------------------------------------
class FfplayOutput(QueuedOutput):
    MAX_QUEUE_SIZE = 32768

    subprocess: asyncio.subprocess.Process | None
    ffplay_task: asyncio.Task | None

    def __init__(self, codec: str) -> None:
        super().__init__(AudioExtractor.create(codec))
        self.subprocess = None
        self.ffplay_task = None
        self.codec = codec

    async def start(self):
        if self.started:
            return

        await super().start()

        self.subprocess = await asyncio.create_subprocess_shell(
            f'ffplay -probesize 32 -f {self.codec} pipe:0',
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )

        self.ffplay_task = asyncio.create_task(self.monitor_ffplay())

    async def stop(self):
        # TODO
        pass

    async def suspend(self):
        # TODO
        pass

    async def monitor_ffplay(self):
        async def read_stream(name, stream):
            while True:
                data = await stream.read()
                logger.debug(f'{name}:', data)

        await asyncio.wait(
            [
                asyncio.create_task(
                    read_stream('[ffplay stdout]', self.subprocess.stdout)
                ),
                asyncio.create_task(
                    read_stream('[ffplay stderr]', self.subprocess.stderr)
                ),
                asyncio.create_task(self.subprocess.wait()),
            ]
        )
        logger.debug("FFPLAY done")

    async def on_audio_packet(self, packet):
        try:
            self.subprocess.stdin.write(packet)
        except Exception:
            logger.warning('!!!! exception while sending audio to ffplay pipe')


# -----------------------------------------------------------------------------
class UiServer:
    speaker: weakref.ReferenceType[Speaker]
    port: int

    def __init__(self, speaker: Speaker, port: int) -> None:
        self.speaker = weakref.ref(speaker)
        self.port = port
        self.channel_socket = None

    async def start_http(self) -> None:
        """Start the UI HTTP server."""

        app = web.Application()
        app.add_routes(
            [
                web.get('/', self.get_static),
                web.get('/speaker.html', self.get_static),
                web.get('/speaker.js', self.get_static),
                web.get('/speaker.css', self.get_static),
                web.get('/logo.svg', self.get_static),
                web.get('/channel', self.get_channel),
            ]
        )

        runner = web.AppRunner(app)
        await runner.setup()
        site = web.TCPSite(runner, 'localhost', self.port)
        print('UI HTTP server at ' + color(f'http://127.0.0.1:{self.port}', 'green'))
        await site.start()

    async def get_static(self, request):
        path = request.path
        if path == '/':
            path = '/speaker.html'
        if path.endswith('.html'):
            content_type = 'text/html'
        elif path.endswith('.js'):
            content_type = 'text/javascript'
        elif path.endswith('.css'):
            content_type = 'text/css'
        elif path.endswith('.svg'):
            content_type = 'image/svg+xml'
        else:
            content_type = 'text/plain'
        text = (
            resources.files("bumble.apps.speaker")
            .joinpath(pathlib.Path(path).relative_to('/'))
            .read_text(encoding="utf-8")
        )
        return aiohttp.web.Response(text=text, content_type=content_type)

    async def get_channel(self, request):
        ws = web.WebSocketResponse()
        await ws.prepare(request)

        # Process messages until the socket is closed.
        self.channel_socket = ws
        async for message in ws:
            if message.type == aiohttp.WSMsgType.TEXT:
                logger.debug(f'<<< received message: {message.data}')
                await self.on_message(message.data)
            elif message.type == aiohttp.WSMsgType.ERROR:
                logger.debug(
                    f'channel connection closed with exception {ws.exception()}'
                )

        self.channel_socket = None
        logger.debug('--- channel connection closed')

        return ws

    async def on_message(self, message_str: str):
        # Parse the message as JSON
        message = json.loads(message_str)

        # Dispatch the message
        message_type = message['type']
        message_params = message.get('params', {})
        handler = getattr(self, f'on_{message_type}_message')
        if handler:
            await handler(**message_params)

    async def on_hello_message(self):
        await self.send_message(
            'hello',
            bumble_version=bumble.__version__,
            codec=self.speaker().codec,
            streamState=self.speaker().stream_state.name,
        )
        if connection := self.speaker().connection:
            await self.send_message(
                'connection',
                peer_address=connection.peer_address.to_string(False),
                peer_name=connection.peer_name,
            )

    async def send_message(self, message_type: str, **kwargs) -> None:
        if self.channel_socket is None:
            return

        message = {'type': message_type, 'params': kwargs}
        await self.channel_socket.send_json(message)

    async def send_audio(self, data: bytes) -> None:
        if self.channel_socket is None:
            return

        try:
            await self.channel_socket.send_bytes(data)
        except Exception as error:
            logger.warning(f'exception while sending audio packet: {error}')


# -----------------------------------------------------------------------------
class Speaker:
    class StreamState(enum.Enum):
        IDLE = 0
        STOPPED = 1
        STARTED = 2
        SUSPENDED = 3

    def __init__(
        self,
        device_config,
        transport,
        codec,
        sampling_frequencies,
        bitrate,
        vbr,
        discover,
        outputs,
        ui_port,
    ):
        self.device_config = device_config
        self.transport = transport
        self.codec = codec
        self.sampling_frequencies = sampling_frequencies
        self.bitrate = bitrate
        self.vbr = vbr
        self.discover = discover
        self.ui_port = ui_port
        self.device = None
        self.connection = None
        self.listener = None
        self.packets_received = 0
        self.bytes_received = 0
        self.stream_state = Speaker.StreamState.IDLE
        self.outputs = []
        for output in outputs:
            if output == '@ffplay':
                self.outputs.append(FfplayOutput(codec))
                continue

            # Default to FileOutput
            self.outputs.append(FileOutput(output, codec))

        # Create an HTTP server for the UI
        self.ui_server = UiServer(speaker=self, port=ui_port)

    def sdp_records(self) -> dict[int, list[ServiceAttribute]]:
        service_record_handle = 0x00010001
        return {
            service_record_handle: make_audio_sink_service_sdp_records(
                service_record_handle
            )
        }

    def codec_capabilities(self) -> MediaCodecCapabilities:
        if self.codec == 'aac':
            return self.aac_codec_capabilities()

        if self.codec == 'sbc':
            return self.sbc_codec_capabilities()

        if self.codec == 'opus':
            return self.opus_codec_capabilities()

        raise RuntimeError('unsupported codec')

    def aac_codec_capabilities(self) -> MediaCodecCapabilities:
        supported_sampling_frequencies = AacMediaCodecInformation.SamplingFrequency(0)
        for sampling_frequency in self.sampling_frequencies or [
            8000,
            11025,
            12000,
            16000,
            22050,
            24000,
            32000,
            44100,
            48000,
        ]:
            supported_sampling_frequencies |= (
                AacMediaCodecInformation.SamplingFrequency.from_int(sampling_frequency)
            )
        return MediaCodecCapabilities(
            media_type=AVDTP_AUDIO_MEDIA_TYPE,
            media_codec_type=A2DP_MPEG_2_4_AAC_CODEC_TYPE,
            media_codec_information=AacMediaCodecInformation(
                object_type=AacMediaCodecInformation.ObjectType.MPEG_2_AAC_LC,
                sampling_frequency=supported_sampling_frequencies,
                channels=AacMediaCodecInformation.Channels.MONO
                | AacMediaCodecInformation.Channels.STEREO,
                vbr=1 if self.vbr else 0,
                bitrate=self.bitrate or 256000,
            ),
        )

    def sbc_codec_capabilities(self) -> MediaCodecCapabilities:
        supported_sampling_frequencies = SbcMediaCodecInformation.SamplingFrequency(0)
        for sampling_frequency in self.sampling_frequencies or [
            16000,
            32000,
            44100,
            48000,
        ]:
            supported_sampling_frequencies |= (
                SbcMediaCodecInformation.SamplingFrequency.from_int(sampling_frequency)
            )
        return MediaCodecCapabilities(
            media_type=AVDTP_AUDIO_MEDIA_TYPE,
            media_codec_type=A2DP_SBC_CODEC_TYPE,
            media_codec_information=SbcMediaCodecInformation(
                sampling_frequency=supported_sampling_frequencies,
                channel_mode=SbcMediaCodecInformation.ChannelMode.MONO
                | SbcMediaCodecInformation.ChannelMode.DUAL_CHANNEL
                | SbcMediaCodecInformation.ChannelMode.STEREO
                | SbcMediaCodecInformation.ChannelMode.JOINT_STEREO,
                block_length=SbcMediaCodecInformation.BlockLength.BL_4
                | SbcMediaCodecInformation.BlockLength.BL_8
                | SbcMediaCodecInformation.BlockLength.BL_12
                | SbcMediaCodecInformation.BlockLength.BL_16,
                subbands=SbcMediaCodecInformation.Subbands.S_4
                | SbcMediaCodecInformation.Subbands.S_8,
                allocation_method=SbcMediaCodecInformation.AllocationMethod.LOUDNESS
                | SbcMediaCodecInformation.AllocationMethod.SNR,
                minimum_bitpool_value=2,
                maximum_bitpool_value=53,
            ),
        )

    def opus_codec_capabilities(self) -> MediaCodecCapabilities:
        supported_sampling_frequencies = OpusMediaCodecInformation.SamplingFrequency(0)
        for sampling_frequency in self.sampling_frequencies or [48000]:
            supported_sampling_frequencies |= (
                OpusMediaCodecInformation.SamplingFrequency.from_int(sampling_frequency)
            )
        return MediaCodecCapabilities(
            media_type=AVDTP_AUDIO_MEDIA_TYPE,
            media_codec_type=A2DP_NON_A2DP_CODEC_TYPE,
            media_codec_information=OpusMediaCodecInformation(
                frame_size=OpusMediaCodecInformation.FrameSize.FS_10MS
                | OpusMediaCodecInformation.FrameSize.FS_20MS,
                channel_mode=OpusMediaCodecInformation.ChannelMode.MONO
                | OpusMediaCodecInformation.ChannelMode.STEREO
                | OpusMediaCodecInformation.ChannelMode.DUAL_MONO,
                sampling_frequency=supported_sampling_frequencies,
            ),
        )

    async def dispatch_to_outputs(self, function):
        for output in self.outputs:
            await function(output)

    def on_bluetooth_connection(self, connection):
        print(f'Connection: {connection}')
        self.connection = connection
        connection.on('disconnection', self.on_bluetooth_disconnection)
        AsyncRunner.spawn(
            self.dispatch_to_outputs(lambda output: output.on_connection(connection))
        )

    def on_bluetooth_disconnection(self, reason):
        print(f'Disconnection ({reason})')
        self.connection = None
        AsyncRunner.spawn(self.advertise())
        AsyncRunner.spawn(
            self.dispatch_to_outputs(lambda output: output.on_disconnection(reason))
        )

    def on_avdtp_connection(self, protocol):
        print('Audio Stream Open')

        # Add a sink endpoint to the server
        sink = protocol.add_sink(self.codec_capabilities())
        sink.on('start', self.on_sink_start)
        sink.on('stop', self.on_sink_stop)
        sink.on('suspend', self.on_sink_suspend)
        sink.on('configuration', lambda: self.on_sink_configuration(sink.configuration))
        sink.on('rtp_packet', self.on_rtp_packet)
        sink.on('rtp_channel_open', self.on_rtp_channel_open)
        sink.on('rtp_channel_close', self.on_rtp_channel_close)

        # Listen for close events
        protocol.on('close', self.on_avdtp_close)

        # Discover all endpoints on the remote device is requested
        if self.discover:
            AsyncRunner.spawn(self.discover_remote_endpoints(protocol))

    def on_avdtp_close(self):
        print("Audio Stream Closed")

    def on_sink_start(self):
        print("Sink Started\u001b[0K")
        self.stream_state = self.StreamState.STARTED
        AsyncRunner.spawn(self.dispatch_to_outputs(lambda output: output.start()))

    def on_sink_stop(self):
        print("Sink Stopped\u001b[0K")
        self.stream_state = self.StreamState.STOPPED
        AsyncRunner.spawn(self.dispatch_to_outputs(lambda output: output.stop()))

    def on_sink_suspend(self):
        print("Sink Suspended\u001b[0K")
        self.stream_state = self.StreamState.SUSPENDED
        AsyncRunner.spawn(self.dispatch_to_outputs(lambda output: output.suspend()))

    def on_sink_configuration(self, config):
        print("Sink Configuration:")
        print('\n'.join(["  " + str(capability) for capability in config]))

    def on_rtp_channel_open(self):
        print("RTP Channel Open")

    def on_rtp_channel_close(self):
        print("RTP Channel Closed")
        self.stream_state = self.StreamState.IDLE

    def on_rtp_packet(self, packet):
        self.packets_received += 1
        self.bytes_received += len(packet.payload)
        print(
            f'[{self.bytes_received} bytes in {self.packets_received} packets] {packet}',
            end='\r',
        )

        for output in self.outputs:
            output.on_rtp_packet(packet)

    async def advertise(self):
        await self.device.set_discoverable(True)
        await self.device.set_connectable(True)

    async def connect(self, address):
        # Connect to the source
        print(f'=== Connecting to {address}...')
        connection = await self.device.connect(
            address, transport=PhysicalTransport.BR_EDR
        )
        print(f'=== Connected to {connection.peer_address}')

        # Request authentication
        print('*** Authenticating...')
        await connection.authenticate()
        print('*** Authenticated')

        # Enable encryption
        print('*** Enabling encryption...')
        await connection.encrypt()
        print('*** Encryption on')

        protocol = await Protocol.connect(connection)
        self.listener.set_server(connection, protocol)
        self.on_avdtp_connection(protocol)

    async def discover_remote_endpoints(self, protocol):
        endpoints = await protocol.discover_remote_endpoints()
        print(f'@@@ Found {len(endpoints)} endpoints')
        for endpoint in endpoints:
            print('@@@', endpoint)

    async def run(self, connect_address):
        await self.ui_server.start_http()
        self.outputs.append(
            WebSocketOutput(
                self.codec, self.ui_server.send_audio, self.ui_server.send_message
            )
        )

        async with await open_transport(self.transport) as (hci_source, hci_sink):
            # Create a device
            device_config = DeviceConfiguration()
            if self.device_config:
                device_config.load_from_file(self.device_config)
            else:
                device_config.name = "Bumble Speaker"
                device_config.class_of_device = 0x240414
                device_config.keystore = "JsonKeyStore"

            device_config.classic_enabled = True
            device_config.le_enabled = False
            self.device = Device.from_config_with_hci(
                device_config, hci_source, hci_sink
            )

            # Setup the SDP to expose the sink service
            self.device.sdp_service_records = self.sdp_records()

            # Don't require MITM when pairing.
            self.device.pairing_config_factory = lambda connection: PairingConfig(
                mitm=False
            )

            # Start the controller
            await self.device.power_on()

            # Print some of the config/properties
            print("Speaker Name:", color(device_config.name, 'yellow'))
            print(
                "Speaker Bluetooth Address:",
                color(
                    self.device.public_address.to_string(with_type_qualifier=False),
                    'yellow',
                ),
            )

            # Listen for Bluetooth connections
            self.device.on('connection', self.on_bluetooth_connection)

            # Create a listener to wait for AVDTP connections
            self.listener = Listener.for_device(self.device)
            self.listener.on('connection', self.on_avdtp_connection)

            print(f'Speaker ready to play, codec={color(self.codec, "cyan")}')

            if connect_address:
                # Connect to the source
                try:
                    await self.connect(connect_address)
                except CommandTimeoutError:
                    print(color("Connection timed out", "red"))
                    return
            else:
                # Start being discoverable and connectable
                print("Waiting for connection...")
                await self.advertise()

            await hci_source.terminated

        for output in self.outputs:
            await output.stop()


# -----------------------------------------------------------------------------
@click.group()
@click.pass_context
def speaker_cli(ctx, device_config):
    ctx.ensure_object(dict)
    ctx.obj['device_config'] = device_config


@click.command()
@click.option(
    '--codec',
    type=click.Choice(['sbc', 'aac', 'opus']),
    default='aac',
    show_default=True,
)
@click.option(
    '--sampling-frequency',
    metavar='SAMPLING-FREQUENCY',
    type=int,
    multiple=True,
    help='Enable a sampling frequency (may be specified more than once)',
)
@click.option(
    '--bitrate',
    metavar='BITRATE',
    type=int,
    help='Supported bitrate (AAC only)',
)
@click.option(
    '--vbr/--no-vbr', is_flag=True, default=True, help='Enable VBR (AAC only)'
)
@click.option(
    '--discover', is_flag=True, help='Discover remote endpoints once connected'
)
@click.option(
    '--output',
    multiple=True,
    metavar='NAME',
    help=(
        'Send audio to this named output '
        '(may be used more than once for multiple outputs)'
    ),
)
@click.option(
    '--ui-port',
    'ui_port',
    metavar='HTTP_PORT',
    default=DEFAULT_UI_PORT,
    show_default=True,
    help='HTTP port for the UI server',
)
@click.option(
    '--connect',
    'connect_address',
    metavar='ADDRESS_OR_NAME',
    help='Address or name to connect to',
)
@click.option('--device-config', metavar='FILENAME', help='Device configuration file')
@click.argument('transport')
def speaker(
    transport,
    codec,
    sampling_frequency,
    bitrate,
    vbr,
    connect_address,
    discover,
    output,
    ui_port,
    device_config,
):
    """Run the speaker."""

    if '@ffplay' in output:
        # Check if ffplay is installed
        try:
            subprocess.run(['ffplay', '-version'], capture_output=True, check=True)
        except FileNotFoundError:
            print(
                color('ffplay not installed, @ffplay output will be disabled', 'yellow')
            )
            output = list(filter(lambda x: x != '@ffplay', output))

    asyncio.run(
        Speaker(
            device_config,
            transport,
            codec,
            sampling_frequency,
            bitrate,
            vbr,
            discover,
            output,
            ui_port,
        ).run(connect_address)
    )


# -----------------------------------------------------------------------------
def main():
    bumble.logging.setup_basic_logging('WARNING')
    speaker()


# -----------------------------------------------------------------------------
if __name__ == "__main__":
    main()  # pylint: disable=no-value-for-parameter