1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
|
# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import logging
from bumble.hci import HCI_Packet
from bumble.helpers import PacketTracer
# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
class HCI_Bridge:
class Forwarder:
def __init__(self, hci_sink, sender_hci_sink, packet_filter, trace):
self.hci_sink = hci_sink
self.sender_hci_sink = sender_hci_sink
self.packet_filter = packet_filter
self.trace = trace
def on_packet(self, packet):
# Convert the packet bytes to an object
try:
hci_packet = HCI_Packet.from_bytes(packet)
except Exception:
logger.warning('forwarding unparsed packet as-is')
self.hci_sink.on_packet(packet)
return
# Filter the packet
if self.packet_filter is not None:
filtered = self.packet_filter(hci_packet)
if filtered is not None:
packet, respond_to_sender = filtered
hci_packet = HCI_Packet.from_bytes(packet)
if respond_to_sender:
self.sender_hci_sink.on_packet(packet)
return
# Analyze the packet
try:
self.trace(hci_packet)
except Exception:
logger.exception('Exception while tracing packet')
# Bridge the packet
self.hci_sink.on_packet(packet)
def __init__(
self,
hci_host_source,
hci_host_sink,
hci_controller_source,
hci_controller_sink,
host_to_controller_filter=None,
controller_to_host_filter=None,
):
tracer = PacketTracer(emit_message=logger.info)
host_to_controller_forwarder = HCI_Bridge.Forwarder(
hci_controller_sink,
hci_host_sink,
host_to_controller_filter,
lambda packet: tracer.trace(packet, 0),
)
hci_host_source.set_packet_sink(host_to_controller_forwarder)
controller_to_host_forwarder = HCI_Bridge.Forwarder(
hci_host_sink,
hci_controller_sink,
controller_to_host_filter,
lambda packet: tracer.trace(packet, 1),
)
hci_controller_source.set_packet_sink(controller_to_host_forwarder)
|