rust/bt-broadcast-assistant: Initial implementation Functionalities are: - continuously scan and collate information about broadcast sources - connect to peers and add/update/remove sources Change-Id: I0628dd8035bfb6d3c569e16e438b9d221959762b Reviewed-on: https://bluetooth-review.git.corp.google.com/c/bluetooth/+/1580 Reviewed-by: Marie Janssen <jamuraa@google.com> Reviewed-by: Ani Ramakrishnan <aniramakri@google.com>
diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 60e0918..f4da3a7 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml
@@ -12,6 +12,7 @@ ## Local path dependencies (keep sorted) bt-bass = { path = "bt-bass" } +bt-broadcast-assistant = { path = "bt-broadcast-assistant" } bt-common = { path = "bt-common" } bt-gatt = { path = "bt-gatt" } bt-pacs = { path = "bt-pacs" }
diff --git a/rust/bt-bass/src/client.rs b/rust/bt-bass/src/client.rs index 3aa0b18..3479c66 100644 --- a/rust/bt-bass/src/client.rs +++ b/rust/bt-bass/src/client.rs
@@ -13,7 +13,7 @@ use parking_lot::Mutex; use tracing::warn; -use bt_common::core::{AdvertisingSetId, PaInterval}; +use bt_common::core::{AddressType, AdvertisingSetId, PaInterval}; use bt_common::generic_audio::metadata_ltv::Metadata; use bt_common::packet_encoding::Decodable; use bt_gatt::client::{CharacteristicNotification, PeerService, ServiceCharacteristic}; @@ -123,7 +123,7 @@ audio_scan_control_point, broadcast_sources: Default::default(), broadcast_codes: HashMap::new(), - notification_streams: None, + notification_streams: Some(SelectAll::new()), } } @@ -407,6 +407,11 @@ } brs } + + #[cfg(any(test, feature = "test-utils"))] + pub fn insert_broadcast_receive_state(&mut self, handle: Handle, brs: BroadcastReceiveState) { + self.broadcast_sources.lock().update_state(handle, brs); + } } #[cfg(test)]
diff --git a/rust/bt-bass/src/client/event.rs b/rust/bt-bass/src/client/event.rs index 2c7ac2f..4077373 100644 --- a/rust/bt-bass/src/client/event.rs +++ b/rust/bt-bass/src/client/event.rs
@@ -218,6 +218,7 @@ use assert_matches::assert_matches; use futures::channel::mpsc::unbounded; + use bt_common::core::AddressType; use bt_gatt::types::Handle; #[test]
diff --git a/rust/bt-bass/src/types.rs b/rust/bt-bass/src/types.rs index f810a3d..31c8b2d 100644 --- a/rust/bt-bass/src/types.rs +++ b/rust/bt-bass/src/types.rs
@@ -3,7 +3,7 @@ // found in the LICENSE file. use bt_common::core::ltv::LtValue; -use bt_common::core::{AdvertisingSetId, PaInterval}; +use bt_common::core::{AddressType, AdvertisingSetId, PaInterval}; use bt_common::generic_audio::metadata_ltv::*; use bt_common::packet_encoding::{Decodable, Encodable, Error as PacketError}; use bt_common::{decodable_enum, Uuid}; @@ -676,32 +676,6 @@ } } -/// See Broadcast Audio Scan Service spec v1.0 Table 3.5 for details. -#[repr(u8)] -#[derive(Clone, Copy, Debug, PartialEq)] -pub enum AddressType { - // Public Device Address or Public Identity Address. - Public = 0x00, - // Random Device Address or Random (static) Identity Address. - Random = 0x01, -} - -impl AddressType { - const BYTE_SIZE: usize = 1; -} - -impl TryFrom<u8> for AddressType { - type Error = PacketError; - - fn try_from(value: u8) -> Result<Self, Self::Error> { - match value { - 0x00 => Ok(Self::Public), - 0x01 => Ok(Self::Random), - _ => Err(PacketError::OutOfRange), - } - } -} - /// Broadcast Receive State characteristic as defined in /// Broadcast Audio Scan Service spec v1.0 Section 3.2. /// The Broadcast Receive State characteristic is used by the server to expose @@ -791,7 +765,7 @@ + EncryptionStatus::MIN_PACKET_SIZE + NUM_SUBGROUPS_BYTE_SIZE; - #[cfg(test)] + #[cfg(any(test, feature = "test-utils"))] pub fn new( source_id: u8, source_address_type: AddressType,
diff --git a/rust/bt-broadcast-assistant/.gitignore b/rust/bt-broadcast-assistant/.gitignore new file mode 100644 index 0000000..75e03aa --- /dev/null +++ b/rust/bt-broadcast-assistant/.gitignore
@@ -0,0 +1,17 @@ +# Generated by Cargo +# will have compiled files and executables +debug/ +target/ + +# Remove Cargo.lock from gitignore if creating an executable, leave it for libraries +# More information here https://doc.rust-lang.org/cargo/guide/cargo-toml-vs-cargo-lock.html +Cargo.lock + +# These are backup files generated by rustfmt +**/*.rs.bk + +# MSVC Windows builds of rustc generate these, which store debugging information +*.pdb + +# Vim swap files +*.swp
diff --git a/rust/bt-broadcast-assistant/Cargo.toml b/rust/bt-broadcast-assistant/Cargo.toml new file mode 100644 index 0000000..a3ccfa6 --- /dev/null +++ b/rust/bt-broadcast-assistant/Cargo.toml
@@ -0,0 +1,17 @@ +[package] +name = "bt-broadcast-assistant" +version = "0.0.1" +edition.workspace = true +license.workspace = true + +[dependencies] +bt-bass.workspace = true +bt-common.workspace = true +bt-gatt.workspace = true +futures.workspace = true +parking_lot.workspace = true +thiserror.workspace = true + +[dev-dependencies] +assert_matches.workspace = true +bt-bass = { workspace = true, features = ["test-utils"] }
diff --git a/rust/bt-broadcast-assistant/src/assistant.rs b/rust/bt-broadcast-assistant/src/assistant.rs new file mode 100644 index 0000000..3d3e61e --- /dev/null +++ b/rust/bt-broadcast-assistant/src/assistant.rs
@@ -0,0 +1,272 @@ +// Copyright 2023 The Fuchsia Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +use parking_lot::Mutex; +use std::collections::HashMap; +use std::sync::Arc; +use thiserror::Error; + +use bt_bass::client::error::Error as BassClientError; +use bt_bass::client::BroadcastAudioScanServiceClient; +use bt_bass::types::BroadcastId; +use bt_common::{PeerId, Uuid}; +use bt_gatt::central::*; +use bt_gatt::client::PeerServiceHandle; +use bt_gatt::types::Error as GattError; +use bt_gatt::Client; + +pub mod event; +use event::*; +pub mod peer; +pub use peer::Peer; + +use crate::types::*; + +pub const BROADCAST_AUDIO_SCAN_SERVICE: Uuid = Uuid::from_u16(0x184F); +pub const BASIC_AUDIO_ANNOUNCEMENT_SERVICE: Uuid = Uuid::from_u16(0x1851); +pub const BROADCAST_AUDIO_ANNOUNCEMENT_SERVICE: Uuid = Uuid::from_u16(0x1852); + +#[derive(Debug, Error)] +pub enum Error { + #[error("GATT operation error: {0:?}")] + Gatt(#[from] GattError), + + #[error("Broadcast Audio Scan Service client error at peer ({0}): {1:?}")] + BassClient(PeerId, BassClientError), + + #[error("Not connected to Broadcast Audio Scan Service at peer ({0})")] + NotConnectedToBass(PeerId), + + #[error("Central scanning terminated unexpectedly")] + CentralScanTerminated, + + #[error("Failed to connect to service ({1}) at peer ({0})")] + ConnectionFailure(PeerId, Uuid), + + #[error("Broadcast Assistant was already started previously. It cannot be started twice")] + AlreadyStarted, + + #[error("Failed due to error: {0}")] + Generic(String), +} + +/// Contains information about the currently-known broadcast +/// sources and the peers they were found on +#[derive(Debug)] +pub(crate) struct DiscoveredBroadcastSources(Mutex<HashMap<PeerId, BroadcastSource>>); + +impl DiscoveredBroadcastSources { + /// Creates a shareable instance of `DiscoveredBroadcastSources`. + pub fn new() -> Arc<Self> { + Arc::new(Self(Mutex::new(HashMap::new()))) + } + + /// Merges the broadcast source data with existing broadcast source data. + /// Returns the copy of the broadcast source data after the merge and + /// indicates whether it has changed from before or not. + pub(crate) fn merge_broadcast_source_data( + &self, + peer_id: &PeerId, + data: &BroadcastSource, + ) -> (BroadcastSource, bool) { + let mut lock = self.0.lock(); + let source = lock.entry(*peer_id).or_default(); + let before = source.clone(); + + source.merge(data); + let after = source.clone(); + let changed = before != after; + + (after, changed) + } + + /// Get a BroadcastSource from a peer id. + fn get_by_peer_id(&self, peer_id: &PeerId) -> Option<BroadcastSource> { + let lock = self.0.lock(); + lock.get(&peer_id).clone().map(|source| source.clone()) + } + + /// Get a BroadcastSource from associated broadcast id. + fn get_by_broadcast_id(&self, broadcast_id: &BroadcastId) -> Option<BroadcastSource> { + let lock = self.0.lock(); + let info = lock.iter().find(|(&_k, &ref v)| v.broadcast_id == Some(*broadcast_id)); + match info { + Some((&_peer_id, &ref broadcast_source)) => Some(broadcast_source.clone()), + None => None, + } + } +} + +pub struct BroadcastAssistant<T: bt_gatt::GattTypes> { + central: T::Central, + broadcast_sources: Arc<DiscoveredBroadcastSources>, + scan_stream: Option<T::ScanResultStream>, +} + +impl<T: bt_gatt::GattTypes + 'static> BroadcastAssistant<T> { + // Creates a broadcast assistant and sets it up to be ready + // for broadcast source scanning. Clients must use the `start` + // method to poll the event stream for scan results. + pub fn new(central: T::Central) -> Self + where + <T as bt_gatt::GattTypes>::ScanResultStream: std::marker::Send, + { + let scan_result_stream = central.scan(&Self::scan_filters()); + Self { + central, + broadcast_sources: DiscoveredBroadcastSources::new(), + scan_stream: Some(scan_result_stream), + } + } + + /// List of scan filters for advertisement data Broadcast Assistant should + /// look for, which are: + /// - Service data with Broadcast Audio Announcement Service UUID from + /// Broadcast Sources (see BAP spec v1.0.1 Section 3.7.2.1 for details) + // TODO(b/308481381): define filter for finding broadcast sink. + fn scan_filters() -> Vec<ScanFilter> { + vec![Filter::HasServiceData(BROADCAST_AUDIO_ANNOUNCEMENT_SERVICE).into()] + } + + /// Start broadcast assistant. Returns EventStream that the upper layer can + /// poll. Upper layer can call methods on BroadcastAssistant based on the + /// events it sees. + pub fn start(&mut self) -> Result<EventStream, Error> + where + <T as bt_gatt::GattTypes>::ScanResultStream: std::marker::Send, + { + if self.scan_stream.is_none() { + return Err(Error::AlreadyStarted); + } + Ok(EventStream::new(self.scan_stream.take().unwrap(), self.broadcast_sources.clone())) + } + + pub fn scan_for_scan_delegators(&mut self) -> T::ScanResultStream { + // Scan for service data with Broadcast Audio Scan Service UUID to look + // for Broadcast Sink collocated with the Scan Delegator (see BAP spec v1.0.1 + // Section 3.9.2 for details). + self.central.scan(&vec![Filter::HasServiceData(BROADCAST_AUDIO_SCAN_SERVICE).into()]) + } + + pub async fn connect_to_scan_delegator(&mut self, peer_id: PeerId) -> Result<Peer<T>, Error> + where + <T as bt_gatt::GattTypes>::NotificationStream: std::marker::Send, + { + let client = self.central.connect(peer_id).await?; + let service_handles = client.find_service(BROADCAST_AUDIO_SCAN_SERVICE).await?; + + for handle in service_handles { + if handle.uuid() != BROADCAST_AUDIO_SCAN_SERVICE || !handle.is_primary() { + continue; + } + let service = handle.connect().await?; + let bass = BroadcastAudioScanServiceClient::<T>::create(service) + .await + .map_err(|e| Error::BassClient(peer_id, e))?; + + let connected_peer = + Peer::<T>::new(peer_id, client, bass, self.broadcast_sources.clone()); + return Ok(connected_peer); + } + Err(Error::ConnectionFailure(peer_id, BROADCAST_AUDIO_SCAN_SERVICE)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + use futures::{pin_mut, FutureExt}; + use std::task::Poll; + + use bt_common::core::{AddressType, AdvertisingSetId}; + use bt_gatt::test_utils::{FakeCentral, FakeClient, FakeTypes}; + + use crate::assistant::peer::tests::fake_bass_service; + + #[test] + fn merge_broadcast_source() { + let discovered = DiscoveredBroadcastSources::new(); + + let (bs, changed) = discovered.merge_broadcast_source_data( + &PeerId(1001), + &BroadcastSource::default() + .with_address([1, 2, 3, 4, 5, 6]) + .with_address_type(AddressType::Public) + .with_advertising_sid(AdvertisingSetId(1)) + .with_broadcast_id(BroadcastId(1001)), + ); + assert!(changed); + assert_eq!( + bs, + BroadcastSource { + address: Some([1, 2, 3, 4, 5, 6]), + address_type: Some(AddressType::Public), + advertising_sid: Some(AdvertisingSetId(1)), + broadcast_id: Some(BroadcastId(1001)), + pa_interval: None, + endpoint: None, + } + ); + + let (bs, changed) = discovered.merge_broadcast_source_data( + &PeerId(1001), + &BroadcastSource::default().with_address_type(AddressType::Random).with_endpoint( + BroadcastAudioSourceEndpoint { presentation_delay_ms: 32, big: vec![] }, + ), + ); + assert!(changed); + assert_eq!( + bs, + BroadcastSource { + address: Some([1, 2, 3, 4, 5, 6]), + address_type: Some(AddressType::Random), + advertising_sid: Some(AdvertisingSetId(1)), + broadcast_id: Some(BroadcastId(1001)), + pa_interval: None, + endpoint: Some(BroadcastAudioSourceEndpoint { + presentation_delay_ms: 32, + big: vec![] + }), + } + ); + + let (_, changed) = discovered.merge_broadcast_source_data( + &PeerId(1001), + &BroadcastSource::default().with_address_type(AddressType::Random).with_endpoint( + BroadcastAudioSourceEndpoint { presentation_delay_ms: 32, big: vec![] }, + ), + ); + assert!(!changed); + } + + #[test] + fn start_stream() { + let mut assistant = BroadcastAssistant::<FakeTypes>::new(FakeCentral::new()); + let _ = assistant.start().expect("can start stream"); + + // Stream can only be started once. + assert!(assistant.start().is_err()); + } + + #[test] + fn connect_to_scan_delegator() { + // Set up fake GATT related objects. + let mut central = FakeCentral::new(); + let mut client = FakeClient::new(); + central.add_client(PeerId(1004), client.clone()); + let service = fake_bass_service(); + client.add_service(BROADCAST_AUDIO_SCAN_SERVICE, true, service.clone()); + + let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref()); + let mut assistant = BroadcastAssistant::<FakeTypes>::new(central); + let conn_fut = assistant.connect_to_scan_delegator(PeerId(1004)); + pin_mut!(conn_fut); + let polled = conn_fut.poll_unpin(&mut noop_cx); + let Poll::Ready(res) = polled else { + panic!("should be ready"); + }; + let _ = res.expect("should be ok"); + } +}
diff --git a/rust/bt-broadcast-assistant/src/assistant/event.rs b/rust/bt-broadcast-assistant/src/assistant/event.rs new file mode 100644 index 0000000..993ba6f --- /dev/null +++ b/rust/bt-broadcast-assistant/src/assistant/event.rs
@@ -0,0 +1,259 @@ +// Copyright 2023 The Fuchsia Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +use futures::stream::{FusedStream, Stream, StreamExt}; +use std::sync::Arc; +use std::task::Poll; + +use bt_bass::types::BroadcastId; +use bt_common::packet_encoding::Decodable; +use bt_common::packet_encoding::Error as PacketError; +use bt_common::PeerId; +use bt_gatt::central::{AdvertisingDatum, ScanResult}; +use core::pin::Pin; + +use crate::assistant::{ + DiscoveredBroadcastSources, Error, BASIC_AUDIO_ANNOUNCEMENT_SERVICE, + BROADCAST_AUDIO_ANNOUNCEMENT_SERVICE, +}; +use crate::types::{BroadcastAudioSourceEndpoint, BroadcastSource}; + +#[derive(Debug)] +pub enum Event { + FoundBroadcastSource { peer: PeerId, source: BroadcastSource }, + CouldNotParseAdvertisingData { peer: PeerId, error: PacketError }, +} + +/// A stream of discovered broadcast sources. +/// This stream polls the scan results from GATT client to discover +/// available broadcast sources. +pub struct EventStream { + // Central scan result stream for finding broadcast sources. + scan_result_stream: Pin<Box<dyn Stream<Item = Result<ScanResult, bt_gatt::types::Error>>>>, + terminated: bool, + + broadcast_sources: Arc<DiscoveredBroadcastSources>, +} + +impl EventStream { + pub(crate) fn new( + scan_result_stream: impl Stream<Item = Result<ScanResult, bt_gatt::types::Error>> + 'static, + broadcast_sources: Arc<DiscoveredBroadcastSources>, + ) -> Self { + Self { + scan_result_stream: Box::pin(scan_result_stream), + terminated: false, + broadcast_sources, + } + } + + /// Returns the broadcast source if the scanned peer is a broadcast source. + /// Returns an error if parsing of the scan result data fails and None if + /// the scanned peer is not a broadcast source. + fn try_into_broadcast_source( + scan_result: &ScanResult, + ) -> Result<Option<BroadcastSource>, PacketError> { + let mut source = None; + for datum in &scan_result.advertised { + let AdvertisingDatum::ServiceData(uuid, data) = datum else { + continue; + }; + if *uuid == BROADCAST_AUDIO_ANNOUNCEMENT_SERVICE { + let (bid, _) = BroadcastId::decode(data.as_slice())?; + source.get_or_insert(BroadcastSource::default()).with_broadcast_id(bid); + } else if *uuid == BASIC_AUDIO_ANNOUNCEMENT_SERVICE { + // TODO(dayeonglee): revisit when we implement periodic advertisement. + let (base, _) = BroadcastAudioSourceEndpoint::decode(data.as_slice())?; + source.get_or_insert(BroadcastSource::default()).with_endpoint(base); + } + } + Ok(source) + } +} + +impl FusedStream for EventStream { + fn is_terminated(&self) -> bool { + self.terminated + } +} + +impl Stream for EventStream { + type Item = Result<Event, Error>; + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll<Option<Self::Item>> { + if self.terminated { + return Poll::Ready(None); + } + + // Poll scan result stream to check if there were any newly discovered peers + // that we're interested in. + match futures::ready!(self.scan_result_stream.poll_next_unpin(cx)) { + Some(Ok(scanned)) => { + match Self::try_into_broadcast_source(&scanned) { + Err(e) => { + return Poll::Ready(Some(Ok(Event::CouldNotParseAdvertisingData { + peer: scanned.id, + error: e, + }))); + } + Ok(Some(found_source)) => { + // If we found a broadcast source, we add its information in the + // internal records. + let (broadcast_source, changed) = self + .broadcast_sources + .merge_broadcast_source_data(&scanned.id, &found_source); + + // Broadcast found event is relayed to the client iff complete + // information has been gathered. + if broadcast_source.into_add_source() && changed { + return Poll::Ready(Some(Ok(Event::FoundBroadcastSource { + peer: scanned.id, + source: broadcast_source, + }))); + } + + Poll::Pending + } + Ok(None) => Poll::Pending, + } + } + None | Some(Err(_)) => { + self.terminated = true; + Poll::Ready(Some(Err(Error::CentralScanTerminated))) + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + use assert_matches::assert_matches; + + use bt_common::core::{AddressType, AdvertisingSetId}; + use bt_gatt::central::{AdvertisingDatum, PeerName}; + use bt_gatt::test_utils::ScannedResultStream; + use bt_gatt::types::Error as BtGattError; + use bt_gatt::types::GattError; + + fn setup_stream() -> (EventStream, ScannedResultStream) { + let fake_scan_result_stream = ScannedResultStream::new(); + let broadcast_sources = DiscoveredBroadcastSources::new(); + + ( + EventStream::new(fake_scan_result_stream.clone(), broadcast_sources), + fake_scan_result_stream, + ) + } + + #[test] + fn poll_found_broadcast_source_events() { + let (mut stream, mut scan_result_stream) = setup_stream(); + + // Scanned a broadcast source and its broadcast id. + let broadcast_source_pid = PeerId(1005); + + scan_result_stream.set_scanned_result(Ok(ScanResult { + id: broadcast_source_pid, + connectable: true, + name: PeerName::Unknown, + advertised: vec![AdvertisingDatum::ServiceData( + BROADCAST_AUDIO_ANNOUNCEMENT_SERVICE, + vec![0x01, 0x02, 0x03], + )], + })); + + // Found broadcast source event shouldn't have been sent since braodcast source + // information isn't complete. + let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref()); + assert!(stream.poll_next_unpin(&mut noop_cx).is_pending()); + + // Pretend somehow address, address type, and advertising sid were + // filled out. This completes the broadcast source information. + // TODO(b/308481381): replace this block with sending a central scan result that + // contains the data. + let _ = stream.broadcast_sources.merge_broadcast_source_data( + &broadcast_source_pid, + &BroadcastSource::default() + .with_address([1, 2, 3, 4, 5, 6]) + .with_address_type(AddressType::Public) + .with_advertising_sid(AdvertisingSetId(1)), + ); + + // Scanned broadcast source's BASE data. + // TODO(b/308481381): replace this block sending data through PA train instead. + #[rustfmt::skip] + let base_data = vec![ + 0x10, 0x20, 0x30, 0x02, // presentation delay, num of subgroups + 0x01, 0x03, 0x00, 0x00, 0x00, 0x00, // num of bis, codec id (big #1) + 0x00, // codec specific config len + 0x00, // metadata len, + 0x01, 0x00, // bis index, codec specific config len (big #1 / bis #1) + 0x01, 0x02, 0x00, 0x00, 0x00, 0x00, // num of bis, codec id (big #2) + 0x00, // codec specific config len + 0x00, // metadata len, + 0x01, 0x03, 0x02, 0x05, + 0x08, /* bis index, codec specific config len, codec frame blocks LTV + * (big #2 / bis #2) */ + ]; + + scan_result_stream.set_scanned_result(Ok(ScanResult { + id: broadcast_source_pid, + connectable: true, + name: PeerName::Unknown, + advertised: vec![AdvertisingDatum::ServiceData( + BASIC_AUDIO_ANNOUNCEMENT_SERVICE, + base_data.clone(), + )], + })); + + // Expect the stream to send out broadcast source found event since information + // is complete. + let Poll::Ready(Some(Ok(event))) = stream.poll_next_unpin(&mut noop_cx) else { + panic!("should have received event"); + }; + assert_matches!(event, Event::FoundBroadcastSource{peer, ..} => { + assert_eq!(peer, broadcast_source_pid) + }); + + assert!(stream.poll_next_unpin(&mut noop_cx).is_pending()); + + // Scanned the same broadcast source's BASE data. + scan_result_stream.set_scanned_result(Ok(ScanResult { + id: broadcast_source_pid, + connectable: true, + name: PeerName::Unknown, + advertised: vec![AdvertisingDatum::ServiceData( + BASIC_AUDIO_ANNOUNCEMENT_SERVICE, + base_data.clone(), + )], + })); + + // Shouldn't have gotten the event again since the information remained the + // same. + assert!(stream.poll_next_unpin(&mut noop_cx).is_pending()); + } + + #[test] + fn central_scan_stream_terminates() { + let (mut stream, mut scan_result_stream) = setup_stream(); + + // Mimick scan error. + scan_result_stream.set_scanned_result(Err(BtGattError::Gatt(GattError::InvalidPdu))); + + let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref()); + match stream.poll_next_unpin(&mut noop_cx) { + Poll::Ready(Some(Err(e))) => assert_matches!(e, Error::CentralScanTerminated), + _ => panic!("should have received central scan terminated error"), + } + + // Entire stream should have terminated. + assert_matches!(stream.poll_next_unpin(&mut noop_cx), Poll::Ready(None)); + assert_matches!(stream.is_terminated(), true); + } +}
diff --git a/rust/bt-broadcast-assistant/src/assistant/peer.rs b/rust/bt-broadcast-assistant/src/assistant/peer.rs new file mode 100644 index 0000000..3369a4d --- /dev/null +++ b/rust/bt-broadcast-assistant/src/assistant/peer.rs
@@ -0,0 +1,294 @@ +// Copyright 2023 The Fuchsia Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +use futures::stream::FusedStream; +use futures::Stream; +use std::sync::Arc; +use thiserror::Error; + +use bt_bass::client::error::Error as BassClientError; +use bt_bass::client::event::Event as BassEvent; +use bt_bass::client::{BigToBisSync, BroadcastAudioScanServiceClient}; +use bt_bass::types::{BroadcastId, PaSync}; +use bt_common::core::PaInterval; +use bt_common::packet_encoding::Error as PacketError; +use bt_common::PeerId; + +use crate::assistant::DiscoveredBroadcastSources; + +#[derive(Debug, Error)] +pub enum Error { + #[error("Failed to take event stream at Broadcast Audio Scan Service client")] + UnavailableBassEventStream, + + #[error("Broadcast Audio Scan Service client error: {0:?}")] + BassClient(#[from] BassClientError), + + #[error("Incomplete information for broadcast source with peer id ({0})")] + NotEnoughInfo(PeerId), + + #[error("Broadcast source with peer id ({0}) does not exist")] + DoesNotExist(PeerId), + + #[error("Packet error: {0}")] + PacketError(#[from] PacketError), +} + +/// Connected scan delegator peer. Clients can use this +/// object to perform Broadcast Audio Scan Service operations on the +/// scan delegator peer. +/// Not thread-safe and only one operation must be done at a time. +pub struct Peer<T: bt_gatt::GattTypes> { + peer_id: PeerId, + // Keep for peer connection. + _client: T::Client, + bass: BroadcastAudioScanServiceClient<T>, + // TODO(b/309015071): add a field for pacs. + broadcast_sources: Arc<DiscoveredBroadcastSources>, +} + +impl<T: bt_gatt::GattTypes> Peer<T> { + pub(crate) fn new( + peer_id: PeerId, + client: T::Client, + bass: BroadcastAudioScanServiceClient<T>, + broadcast_sources: Arc<DiscoveredBroadcastSources>, + ) -> Self { + Peer { peer_id, _client: client, bass, broadcast_sources } + } + + pub fn peer_id(&self) -> PeerId { + self.peer_id + } + + /// Takes event stream for BASS events from this scan delegator peer. + /// Clients can call this method to start subscribing to BASS events. + pub fn take_event_stream( + &mut self, + ) -> Result<impl Stream<Item = Result<BassEvent, BassClientError>> + FusedStream, Error> { + self.bass.take_event_stream().ok_or(Error::UnavailableBassEventStream) + } + + /// Send broadcast code for a particular broadcast. + pub async fn send_broadcast_code( + &mut self, + broadcast_id: BroadcastId, + broadcast_code: [u8; 16], + ) -> Result<(), Error> { + self.bass.set_broadcast_code(broadcast_id, broadcast_code).await.map_err(Into::into) + } + + /// Sends a command to add a particular broadcast source. + /// + /// # Arguments + /// + /// * `broadcast_source_pid` - peer id of the braodcast source that's to be + /// added to this scan delegator peer + /// * `pa_sync` - pa sync mode the peer should attempt to be in + /// * `bis_sync` - desired BIG to BIS synchronization information. If the + /// set is empty, no preference value is used for all the BIGs + pub async fn add_broadcast_source( + &mut self, + source_peer_id: PeerId, + pa_sync: PaSync, + bis_sync: BigToBisSync, + ) -> Result<(), Error> { + let broadcast_source = self + .broadcast_sources + .get_by_peer_id(&source_peer_id) + .ok_or(Error::DoesNotExist(source_peer_id))?; + if !broadcast_source.into_add_source() { + return Err(Error::NotEnoughInfo(source_peer_id)); + } + + self.bass + .add_broadcast_source( + broadcast_source.broadcast_id.unwrap(), + broadcast_source.address_type.unwrap(), + broadcast_source.address.unwrap(), + broadcast_source.advertising_sid.unwrap(), + pa_sync, + broadcast_source.pa_interval.unwrap_or(PaInterval::unknown()), + broadcast_source + .endpoint + .unwrap() + .get_bass_subgroups(bis_sync) + .map_err(Error::PacketError)?, + ) + .await + .map_err(Into::into) + } + + /// Sends a command to to update a particular broadcast source's PA sync. + /// + /// # Arguments + /// + /// * `broadcast_id` - broadcast id of the broadcast source that's to be + /// updated + /// * `pa_sync` - pa sync mode the scan delegator peer should attempt to be + /// in. + /// * `bis_sync` - desired BIG to BIS synchronization information + pub async fn update_broadcast_source_sync( + &mut self, + broadcast_id: BroadcastId, + pa_sync: PaSync, + bis_sync: BigToBisSync, + ) -> Result<(), Error> { + let pa_interval = self + .broadcast_sources + .get_by_broadcast_id(&broadcast_id) + .map(|bs| bs.pa_interval) + .unwrap_or(None); + + self.bass + .modify_broadcast_source(broadcast_id, pa_sync, pa_interval, Some(bis_sync), None) + .await + .map_err(Into::into) + } + + /// Sends a command to remove a particular broadcast source. + /// + /// # Arguments + /// + /// * `broadcast_id` - broadcast id of the braodcast source that's to be + /// removed from the scan delegator + pub async fn remove_broadcast_source( + &mut self, + broadcast_id: BroadcastId, + ) -> Result<(), Error> { + self.bass.remove_broadcast_source(broadcast_id).await.map_err(Into::into) + } + + /// Sends a command to inform the scan delegator peer that we have + /// started scanning for broadcast sources on behalf of it. + pub async fn inform_remote_scan_started(&mut self) -> Result<(), Error> { + self.bass.remote_scan_started().await.map_err(Into::into) + } + + /// Sends a command to inform the scan delegator peer that we have + /// stopped scanning for broadcast sources on behalf of it. + pub async fn inform_remote_scan_stopped(&mut self) -> Result<(), Error> { + self.bass.remote_scan_stopped().await.map_err(Into::into) + } +} + +#[cfg(test)] +pub(crate) mod tests { + use super::*; + + use assert_matches::assert_matches; + use futures::{pin_mut, FutureExt}; + use std::collections::HashSet; + use std::task::Poll; + + use bt_common::core::{AddressType, AdvertisingSetId}; + use bt_gatt::test_utils::{FakeClient, FakePeerService, FakeTypes}; + use bt_gatt::types::{ + AttributePermissions, CharacteristicProperties, CharacteristicProperty, Handle, + }; + use bt_gatt::Characteristic; + + use crate::types::BroadcastSource; + + const RECEIVE_STATE_HANDLE: Handle = Handle(0x11); + const AUDIO_SCAN_CONTROL_POINT_HANDLE: Handle = Handle(0x12); + + pub(crate) fn fake_bass_service() -> FakePeerService { + let mut peer_service = FakePeerService::new(); + // One broadcast receive state and one broadcast audio scan control + // point characteristic handles. + peer_service.add_characteristic( + Characteristic { + handle: RECEIVE_STATE_HANDLE, + uuid: bt_bass::types::BROADCAST_RECEIVE_STATE_UUID, + properties: CharacteristicProperties(vec![ + CharacteristicProperty::Broadcast, + CharacteristicProperty::Notify, + ]), + permissions: AttributePermissions::default(), + descriptors: vec![], + }, + vec![], + ); + peer_service.add_characteristic( + Characteristic { + handle: AUDIO_SCAN_CONTROL_POINT_HANDLE, + uuid: bt_bass::types::BROADCAST_AUDIO_SCAN_CONTROL_POINT_UUID, + properties: CharacteristicProperties(vec![CharacteristicProperty::Broadcast]), + permissions: AttributePermissions::default(), + descriptors: vec![], + }, + vec![], + ); + peer_service + } + + fn setup() -> (Peer<FakeTypes>, FakePeerService, Arc<DiscoveredBroadcastSources>) { + let peer_service = fake_bass_service(); + + let broadcast_sources = DiscoveredBroadcastSources::new(); + ( + Peer { + peer_id: PeerId(0x1), + _client: FakeClient::new(), + bass: BroadcastAudioScanServiceClient::<FakeTypes>::create_for_test( + peer_service.clone(), + Handle(0x1), + ), + broadcast_sources: broadcast_sources.clone(), + }, + peer_service, + broadcast_sources, + ) + } + + #[test] + fn take_event_stream() { + let (mut peer, _peer_service, _broadcast_source) = setup(); + let _event_stream = peer.take_event_stream().expect("should succeed"); + + // If we try to take the event stream the second time, it should fail. + assert!(peer.take_event_stream().is_err()); + } + + #[test] + fn add_broadcast_source_fail() { + let (mut peer, _peer_service, broadcast_source) = setup(); + + let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref()); + + // Should fail because broadcast source doesn't exist. + { + let fut = peer.add_broadcast_source( + PeerId(1001), + PaSync::SyncPastUnavailable, + HashSet::new(), + ); + pin_mut!(fut); + let polled = fut.poll_unpin(&mut noop_cx); + assert_matches!(polled, Poll::Ready(Err(_))); + } + + let _ = broadcast_source.merge_broadcast_source_data( + &PeerId(1001), + &BroadcastSource::default() + .with_address([1, 2, 3, 4, 5, 6]) + .with_address_type(AddressType::Public) + .with_advertising_sid(AdvertisingSetId(1)) + .with_broadcast_id(BroadcastId(1001)), + ); + + // Should fail because not enough information. + { + let fut = peer.add_broadcast_source( + PeerId(1001), + PaSync::SyncPastUnavailable, + HashSet::new(), + ); + pin_mut!(fut); + let polled = fut.poll_unpin(&mut noop_cx); + assert_matches!(polled, Poll::Ready(Err(_))); + } + } +}
diff --git a/rust/bt-broadcast-assistant/src/lib.rs b/rust/bt-broadcast-assistant/src/lib.rs new file mode 100644 index 0000000..7d7af50 --- /dev/null +++ b/rust/bt-broadcast-assistant/src/lib.rs
@@ -0,0 +1,7 @@ +// Copyright 2023 The Fuchsia Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +pub mod assistant; +pub use assistant::BroadcastAssistant; +pub mod types;
diff --git a/rust/bt-broadcast-assistant/src/types.rs b/rust/bt-broadcast-assistant/src/types.rs new file mode 100644 index 0000000..8c6074e --- /dev/null +++ b/rust/bt-broadcast-assistant/src/types.rs
@@ -0,0 +1,435 @@ +// Copyright 2023 The Fuchsia Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +use bt_bass::client::BigToBisSync; +use bt_bass::types::{BigSubgroup, BisSync, BroadcastId}; +use bt_common::core::ltv::LtValue; +use bt_common::core::{Address, AddressType, CodecId}; +use bt_common::core::{AdvertisingSetId, PaInterval}; +use bt_common::generic_audio::codec_configuration::CodecConfiguration; +use bt_common::generic_audio::metadata_ltv::Metadata; +use bt_common::packet_encoding::{Decodable, Error as PacketError}; + +/// Broadcast source data as advertised through Basic Audio Announcement +/// PA and Broadcast Audio Announcement. +/// See BAP spec v1.0.1 Section 3.7.2.1 and Section 3.7.2.2 for details. +// TODO(b/308481381): fill out endpoint from basic audio announcement from PA trains. +#[derive(Clone, Default, Debug, PartialEq)] +pub struct BroadcastSource { + pub(crate) address: Option<Address>, + pub(crate) address_type: Option<AddressType>, + pub(crate) advertising_sid: Option<AdvertisingSetId>, + pub(crate) broadcast_id: Option<BroadcastId>, + pub(crate) pa_interval: Option<PaInterval>, + pub(crate) endpoint: Option<BroadcastAudioSourceEndpoint>, +} + +impl BroadcastSource { + /// Returns whether or not this BroadcastSource has enough information + /// to be added by the Broadcast Assistant. + pub(crate) fn into_add_source(&self) -> bool { + // PA interval is not necessary since default value can be used. + self.address.is_some() + && self.address_type.is_some() + && self.advertising_sid.is_some() + && self.broadcast_id.is_some() + && self.endpoint.is_some() + } + + pub fn with_address(&mut self, address: [u8; 6]) -> &mut Self { + self.address = Some(address); + self + } + + pub fn with_address_type(&mut self, type_: AddressType) -> &mut Self { + self.address_type = Some(type_); + self + } + + pub fn with_broadcast_id(&mut self, bid: BroadcastId) -> &mut Self { + self.broadcast_id = Some(bid); + self + } + + pub fn with_advertising_sid(&mut self, sid: AdvertisingSetId) -> &mut Self { + self.advertising_sid = Some(sid); + self + } + + pub fn with_endpoint(&mut self, endpoint: BroadcastAudioSourceEndpoint) -> &mut Self { + self.endpoint = Some(endpoint); + self + } + + /// Merge fields from other broadcast source into this broadcast source. + /// Set fields in other source take priority over this source. + /// If a field in the other broadcast source is none, it's ignored and + /// the existing values are kept. + pub(crate) fn merge(&mut self, other: &BroadcastSource) { + if let Some(address) = other.address { + self.address = Some(address); + } + if let Some(address_type) = other.address_type { + self.address_type = Some(address_type); + } + if let Some(advertising_sid) = other.advertising_sid { + self.advertising_sid = Some(advertising_sid); + } + if let Some(broadcast_id) = other.broadcast_id { + self.broadcast_id = Some(broadcast_id); + } + if let Some(pa_interval) = other.pa_interval { + self.pa_interval = Some(pa_interval); + } + if let Some(endpoint) = &other.endpoint { + self.endpoint = Some(endpoint.clone()); + } + } +} + +/// Parameters exposed as part of Basic Audio Announcement from Broadcast +/// Sources. See BAP spec v1.0.1 Section 3.7.2.2 for more details. +// TODO(b/308481381): Fill out the struct. +#[derive(Clone, Debug, PartialEq)] +pub struct BroadcastAudioSourceEndpoint { + // Delay is 3 bytes. + pub(crate) presentation_delay_ms: u32, + pub(crate) big: Vec<BroadcastIsochronousGroup>, +} + +impl BroadcastAudioSourceEndpoint { + // Should contain presentation delay, num BIG, and at least one BIG praram. + const MIN_PACKET_SIZE: usize = 3 + 1 + BroadcastIsochronousGroup::MIN_PACKET_SIZE; + + /// Returns the representation of this object's broadcast isochronous groups + /// that's usable with Broadcast Audio Scan Service operations. + /// + /// # Arguments + /// + /// * `bis_sync` - BIG to BIS sync information. If the set is empty, no + /// preference value is used for all the BIGs + pub(crate) fn get_bass_subgroups( + &self, + bis_sync: BigToBisSync, + ) -> Result<Vec<BigSubgroup>, PacketError> { + let mut subgroups = Vec::new(); + let sync_map = bt_bass::client::big_to_bis_sync_indices(&bis_sync); + + for (big_index, group) in self.big.iter().enumerate() { + let bis_sync = match sync_map.get(&(big_index as u8)) { + Some(bis_indices) => { + let mut bis_sync: BisSync = BisSync(0); + bis_sync.set_sync(bis_indices)?; + bis_sync + } + _ => BisSync::default(), + }; + subgroups.push(BigSubgroup::new(Some(bis_sync)).with_metadata(group.metadata.clone())); + } + Ok(subgroups) + } +} + +impl Decodable for BroadcastAudioSourceEndpoint { + type Error = PacketError; + + fn decode(buf: &[u8]) -> core::result::Result<(Self, usize), Self::Error> { + if buf.len() < Self::MIN_PACKET_SIZE { + return Err(PacketError::UnexpectedDataLength); + } + + let mut idx = 0 as usize; + let presentation_delay = u32::from_le_bytes([buf[idx], buf[idx + 1], buf[idx + 2], 0x00]); + idx += 3; + + let num_big: usize = buf[idx] as usize; + idx += 1; + if num_big < 1 { + return Err(PacketError::InvalidParameter(format!( + "num of subgroups shall be at least 1 got {num_big}" + ))); + } + + let mut big = Vec::new(); + while big.len() < num_big { + let (group, len) = BroadcastIsochronousGroup::decode(&buf[idx..]) + .map_err(|e| PacketError::InvalidParameter(format!("{e}")))?; + big.push(group); + idx += len; + } + + Ok((Self { presentation_delay_ms: presentation_delay, big }, idx)) + } +} + +#[derive(Clone, Debug, PartialEq)] +pub struct BroadcastIsochronousGroup { + pub(crate) codec_id: CodecId, + pub(crate) codec_specific_config: Vec<CodecConfiguration>, + pub(crate) metadata: Vec<Metadata>, + pub(crate) bis: Vec<BroadcastIsochronousStream>, +} + +impl BroadcastIsochronousGroup { + // Should contain num BIS, codec id, codec specific config len, metadata len, + // and at least one BIS praram. + const MIN_PACKET_SIZE: usize = + 1 + CodecId::BYTE_SIZE + 1 + 1 + BroadcastIsochronousStream::MIN_PACKET_SIZE; +} + +impl Decodable for BroadcastIsochronousGroup { + type Error = PacketError; + + fn decode(buf: &[u8]) -> core::result::Result<(Self, usize), Self::Error> { + if buf.len() < BroadcastIsochronousGroup::MIN_PACKET_SIZE { + return Err(PacketError::UnexpectedDataLength); + } + + let mut idx = 0; + let num_bis = buf[idx] as usize; + idx += 1; + if num_bis < 1 { + return Err(PacketError::InvalidParameter(format!( + "num of BIS shall be at least 1 got {num_bis}" + ))); + } + + let (codec_id, read_bytes) = CodecId::decode(&buf[idx..])?; + idx += read_bytes; + + let codec_config_len = buf[idx] as usize; + idx += 1; + + let (results, consumed) = CodecConfiguration::decode_all(&buf[idx..idx + codec_config_len]); + if consumed != codec_config_len { + return Err(bt_common::packet_encoding::Error::UnexpectedDataLength); + } + + let codec_specific_configs = results.into_iter().filter_map(Result::ok).collect(); + idx += codec_config_len; + + let metadata_len = buf[idx] as usize; + idx += 1; + + let (results_metadata, consumed_len) = Metadata::decode_all(&buf[idx..idx + metadata_len]); + if consumed_len != metadata_len { + return Err(PacketError::UnexpectedDataLength); + } + // Ignore any undecodable metadata types + let metadata = results_metadata.into_iter().filter_map(Result::ok).collect(); + idx += consumed_len; + + let mut bis = Vec::new(); + while bis.len() < num_bis { + let (stream, len) = BroadcastIsochronousStream::decode(&buf[idx..]) + .map_err(|e| PacketError::InvalidParameter(e.to_string()))?; + bis.push(stream); + idx += len; + } + + Ok(( + BroadcastIsochronousGroup { + codec_id, + codec_specific_config: codec_specific_configs, + metadata, + bis, + }, + idx, + )) + } +} + +#[derive(Clone, Debug, PartialEq)] +pub struct BroadcastIsochronousStream { + pub(crate) bis_index: u8, + pub(crate) codec_specific_config: Vec<CodecConfiguration>, +} + +impl BroadcastIsochronousStream { + const MIN_PACKET_SIZE: usize = 1 + 1; +} + +impl Decodable for BroadcastIsochronousStream { + type Error = PacketError; + + fn decode(buf: &[u8]) -> core::result::Result<(Self, usize), Self::Error> { + if buf.len() < BroadcastIsochronousStream::MIN_PACKET_SIZE { + return Err(PacketError::UnexpectedDataLength); + } + + let mut idx = 0; + + let bis_index = buf[idx]; + idx += 1; + + let codec_config_len = buf[idx] as usize; + idx += 1; + + let (results, consumed) = CodecConfiguration::decode_all(&buf[idx..idx + codec_config_len]); + if consumed != codec_config_len { + return Err(bt_common::packet_encoding::Error::UnexpectedDataLength); + } + let codec_specific_configs = results.into_iter().filter_map(Result::ok).collect(); + idx += codec_config_len; + + Ok(( + BroadcastIsochronousStream { bis_index, codec_specific_config: codec_specific_configs }, + idx, + )) + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashSet; + + use super::*; + + use bt_common::generic_audio::codec_configuration::{FrameDuration, SamplingFrequency}; + use bt_common::generic_audio::AudioLocation; + + #[test] + fn decode_bis() { + #[rustfmt::skip] + let buf = [ + 0x01, 0x09, // bis index and codec specific config len + 0x02, 0x01, 0x06, // sampling frequency LTV + 0x05, 0x03, 0x03, 0x00, 0x00, 0x0C, // audio location LTV + ]; + + let (bis, _read_bytes) = + BroadcastIsochronousStream::decode(&buf[..]).expect("should not fail"); + assert_eq!( + bis, + BroadcastIsochronousStream { + bis_index: 0x01, + codec_specific_config: vec![ + CodecConfiguration::SamplingFrequency(SamplingFrequency::F32000Hz), + CodecConfiguration::AudioChannelAllocation(HashSet::from([ + AudioLocation::FrontLeft, + AudioLocation::FrontRight, + AudioLocation::LeftSurround, + AudioLocation::RightSurround + ])), + ], + } + ); + } + + #[test] + fn decode_big() { + #[rustfmt::skip] + let buf = [ + 0x02, 0x03, 0x00, 0x00, 0x00, 0x00, // num of bis, codec id + 0x04, 0x03, 0x04, 0x04, 0x10, // codec specific config len, octets per codec frame LTV + 0x03, 0x02, 0x08, 0x01, // metadata len, audio active state LTV + 0x01, 0x00, // bis index, codec specific config len (bis #1) + 0x02, 0x03, 0x02, 0x02, 0x01, // bis index, codec specific config len, frame duration LTV (bis #2) + ]; + + let (big, _read_bytes) = + BroadcastIsochronousGroup::decode(&buf[..]).expect("should not fail"); + assert_eq!( + big, + BroadcastIsochronousGroup { + codec_id: CodecId::Assigned(bt_common::core::CodingFormat::Transparent), + codec_specific_config: vec![CodecConfiguration::OctetsPerCodecFrame(0x1004),], + metadata: vec![Metadata::AudioActiveState(true)], + bis: vec![ + BroadcastIsochronousStream { bis_index: 0x01, codec_specific_config: vec![] }, + BroadcastIsochronousStream { + bis_index: 0x02, + codec_specific_config: vec![CodecConfiguration::FrameDuration( + FrameDuration::TenMs + )], + }, + ], + } + ); + } + + #[test] + fn decode_base() { + #[rustfmt::skip] + let buf = [ + 0x10, 0x20, 0x30, 0x02, // presentation delay, num of subgroups + 0x01, 0x03, 0x00, 0x00, 0x00, 0x00, // num of bis, codec id (big #1) + 0x00, // codec specific config len + 0x00, // metadata len, + 0x01, 0x00, // bis index, codec specific config len (big #1 / bis #1) + 0x01, 0x02, 0x00, 0x00, 0x00, 0x00, // num of bis, codec id (big #2) + 0x00, // codec specific config len + 0x00, // metadata len, + 0x01, 0x03, 0x02, 0x05, 0x08, // bis index, codec specific config len, codec frame blocks LTV (big #2 / bis #2) + ]; + + let (base, _read_bytes) = + BroadcastAudioSourceEndpoint::decode(&buf[..]).expect("should not fail"); + assert_eq!(base.presentation_delay_ms, 0x00302010); + assert_eq!(base.big.len(), 2); + assert_eq!( + base.big[0], + BroadcastIsochronousGroup { + codec_id: CodecId::Assigned(bt_common::core::CodingFormat::Transparent), + codec_specific_config: vec![], + metadata: vec![], + bis: vec![BroadcastIsochronousStream { + bis_index: 0x01, + codec_specific_config: vec![], + },], + } + ); + assert_eq!( + base.big[1], + BroadcastIsochronousGroup { + codec_id: CodecId::Assigned(bt_common::core::CodingFormat::Cvsd), + codec_specific_config: vec![], + metadata: vec![], + bis: vec![BroadcastIsochronousStream { + bis_index: 0x01, + codec_specific_config: vec![CodecConfiguration::CodecFramesPerSdu(0x08)], + },], + } + ); + } + + #[test] + fn decode_base_complex() { + // BroadcastAudioSourceEndpoint { presentation_delay_ms: 20000, big: + // [BroadcastIsochronousGroup { codec_id: Assigned(Lc3), codec_specific_config: + // [SamplingFrequency(F24000Hz), FrameDuration(TenMs), + // AudioChannelAllocation({FrontLeft, FrontRight}), OctetsPerCodecFrame(60)], + // metadata: [StreamingAudioContexts([Media])], bis: + // [BroadcastIsochronousStream { bis_index: 1, codec_specific_config: + // [AudioChannelAllocation({FrontLeft})] }, BroadcastIsochronousStream { + // bis_index: 2, codec_specific_con$ + // ig: [AudioChannelAllocation({FrontRight})] }] }] } + #[rustfmt::skip] + let buf = [ + 0x20, 0x4e, 0x00, 0x01, 0x02, 0x06, 0x00, 0x00, 0x00, 0x00, 0x10, 0x02, 0x01, 0x05, 0x02, 0x02, 0x01, 0x05, + 0x03, 0x03, 0x00, 0x00, 0x00, 0x03, 0x04, 0x3c, 0x00, 0x04, 0x03, 0x02, 0x04, 0x00, 0x01, 0x06, 0x05, 0x03, 0x01, + 0x00, 0x00, 0x00, 0x02, 0x06, 0x05, 0x03, 0x02, 0x00, 0x00, 0x00 + ]; + let (base, _read_bytes) = + BroadcastAudioSourceEndpoint::decode(&buf[..]).expect("should not fail"); + println!("{base:?}"); + + // BroadcastAudioSourceEndpoint { presentation_delay_ms: 20000, big: + // [BroadcastIsochronousGroup { codec_id: Assigned(Lc3), codec_specific_config: + // [SamplingFrequency(F24000Hz), FrameDuration(TenMs), + // AudioChannelAllocation({FrontLeft}), OctetsPerCodecFrame(60)], + // metadata: [BroadcastAudioImmediateRenderingFlag], bis: + // [BroadcastIsochronousStream { bis_index: 1, codec_specific_config: + // [SamplingFrequency(F24000Hz)] }] }] } + #[rustfmt::skip] + let buf = [ + 0x20, 0x4e, 0x00, 0x01, 0x01, 0x06, 0x00, 0x00, 0x00, 0x00, 0x10, 0x02, 0x01, 0x05, + 0x02, 0x02, 0x01, 0x05, 0x03, 0x01, 0x00, 0x00, 0x00, 0x03, 0x04, 0x3c, 0x00, 0x02, + 0x01, 0x09, 0x01, 0x03, 0x02, 0x01, 0x05, + ]; + let (base, _read_bytes) = + BroadcastAudioSourceEndpoint::decode(&buf[..]).expect("should not fail"); + println!("{base:?}"); + } +}
diff --git a/rust/bt-common/src/core.rs b/rust/bt-common/src/core.rs index fd02299..9ee3fa0 100644 --- a/rust/bt-common/src/core.rs +++ b/rust/bt-common/src/core.rs
@@ -7,8 +7,36 @@ use crate::packet_encoding::{Encodable, Error as PacketError}; -/// Represents the Advertising Set ID which is 1 byte long. -/// Follows little endian encoding. +/// Bluetooth Device Address that uniquely identifies the device +/// to another Bluetooth device. +/// See Core spec v5.3 Vol 2, Part B section 1.2. +pub type Address = [u8; 6]; + +/// See Core spec v5.3 Vol 3, Part C section 15.1.1. +#[repr(u8)] +#[derive(Clone, Copy, Debug, PartialEq)] +pub enum AddressType { + Public = 0x00, + Random = 0x01, +} + +impl AddressType { + pub const BYTE_SIZE: usize = 1; +} + +impl TryFrom<u8> for AddressType { + type Error = PacketError; + + fn try_from(value: u8) -> Result<Self, Self::Error> { + match value { + 0x00 => Ok(Self::Public), + 0x01 => Ok(Self::Random), + _ => Err(PacketError::OutOfRange), + } + } +} + +/// Advertising Set ID which is 1 byte long. #[derive(Debug, Clone, Copy, PartialEq)] pub struct AdvertisingSetId(pub u8); @@ -17,8 +45,7 @@ pub const BYTE_SIZE: usize = 1; } -/// Represents the SyncInfo Interval value which is 2 bytes long. -/// Follows little endian encoding. +/// SyncInfo Interval value which is 2 bytes long. #[derive(Debug, Clone, Copy, PartialEq)] pub struct PaInterval(pub u16);
diff --git a/rust/bt-gatt/src/test_utils.rs b/rust/bt-gatt/src/test_utils.rs index 879a0fe..7a3578f 100644 --- a/rust/bt-gatt/src/test_utils.rs +++ b/rust/bt-gatt/src/test_utils.rs
@@ -12,7 +12,7 @@ use bt_common::{PeerId, Uuid}; -use crate::central::{AdvertisingDatum, PeerName, ScanResult}; +use crate::central::ScanResult; use crate::client::CharacteristicNotification; use crate::{types::*, GattTypes}; @@ -142,47 +142,97 @@ } } -pub struct FakeServiceHandle {} +#[derive(Clone)] +pub struct FakeServiceHandle { + pub uuid: Uuid, + pub is_primary: bool, + pub fake_service: FakePeerService, +} impl crate::client::PeerServiceHandle<FakeTypes> for FakeServiceHandle { fn uuid(&self) -> Uuid { - todo!() + self.uuid } fn is_primary(&self) -> bool { - todo!() + self.is_primary } fn connect(&self) -> <FakeTypes as GattTypes>::ServiceConnectFut { - futures::future::ready(Ok(FakePeerService::new())) + futures::future::ready(Ok(self.fake_service.clone())) } } -pub struct FakeClient {} +#[derive(Default)] +struct FakeClientInner { + fake_services: Vec<FakeServiceHandle>, +} + +#[derive(Clone)] +pub struct FakeClient { + inner: Arc<Mutex<FakeClientInner>>, +} + +impl FakeClient { + pub fn new() -> Self { + FakeClient { inner: Arc::new(Mutex::new(FakeClientInner::default())) } + } + + /// Add a fake peer service to this client. + pub fn add_service(&mut self, uuid: Uuid, is_primary: bool, fake_service: FakePeerService) { + self.inner.lock().fake_services.push(FakeServiceHandle { uuid, is_primary, fake_service }); + } +} impl crate::Client<FakeTypes> for FakeClient { fn peer_id(&self) -> PeerId { todo!() } - fn find_service(&self, _uuid: Uuid) -> <FakeTypes as GattTypes>::FindServicesFut { - futures::future::ready(Ok::<Vec<FakeServiceHandle>, Error>(vec![FakeServiceHandle {}])) + fn find_service(&self, uuid: Uuid) -> <FakeTypes as GattTypes>::FindServicesFut { + let fake_services = &self.inner.lock().fake_services; + let mut filtered_services = Vec::new(); + for handle in fake_services { + if handle.uuid == uuid { + filtered_services.push(handle.clone()); + } + } + + futures::future::ready(Ok(filtered_services)) } } -pub struct SingleResultStream { - result: Option<Result<crate::central::ScanResult>>, +#[derive(Clone)] +pub struct ScannedResultStream { + inner: Arc<Mutex<ScannedResultStreamInner>>, } -impl Stream for SingleResultStream { +#[derive(Default)] +pub struct ScannedResultStreamInner { + result: Option<Result<ScanResult>>, +} + +impl ScannedResultStream { + pub fn new() -> Self { + Self { inner: Arc::new(Mutex::new(ScannedResultStreamInner::default())) } + } + + /// Set scanned result item to output from the stream. + pub fn set_scanned_result(&mut self, item: Result<ScanResult>) { + self.inner.lock().result = Some(item); + } +} + +impl Stream for ScannedResultStream { type Item = Result<ScanResult>; fn poll_next( self: std::pin::Pin<&mut Self>, _cx: &mut std::task::Context<'_>, ) -> Poll<Option<Self::Item>> { - if self.result.is_some() { - Poll::Ready(self.get_mut().result.take()) + let mut lock = self.inner.lock(); + if lock.result.is_some() { + Poll::Ready(lock.result.take()) } else { // Never wake up, as if we never find another result Poll::Pending @@ -194,7 +244,7 @@ impl GattTypes for FakeTypes { type Central = FakeCentral; - type ScanResultStream = SingleResultStream; + type ScanResultStream = ScannedResultStream; type Client = FakeClient; type ConnectFuture = Ready<Result<FakeClient>>; type PeerServiceHandle = FakeServiceHandle; @@ -208,21 +258,36 @@ } #[derive(Default)] -pub struct FakeCentral {} +pub struct FakeCentralInner { + clients: HashMap<PeerId, FakeClient>, +} -impl crate::Central<FakeTypes> for FakeCentral { - fn scan(&self, _filters: &[crate::central::ScanFilter]) -> SingleResultStream { - SingleResultStream { - result: Some(Ok(ScanResult { - id: PeerId(1), - connectable: true, - name: PeerName::CompleteName("Marie's Pixel 7 Pro".to_owned()), - advertised: vec![AdvertisingDatum::Services(vec![Uuid::from_u16(0x1844)])], - })), - } +#[derive(Clone)] +pub struct FakeCentral { + inner: Arc<Mutex<FakeCentralInner>>, +} + +impl FakeCentral { + pub fn new() -> Self { + Self { inner: Arc::new(Mutex::new(FakeCentralInner::default())) } } - fn connect(&self, _peer_id: PeerId) -> <FakeTypes as GattTypes>::ConnectFuture { - futures::future::ready(Ok(FakeClient {})) + pub fn add_client(&mut self, peer_id: PeerId, client: FakeClient) { + let _ = self.inner.lock().clients.insert(peer_id, client); + } +} + +impl crate::Central<FakeTypes> for FakeCentral { + fn scan(&self, _filters: &[crate::central::ScanFilter]) -> ScannedResultStream { + ScannedResultStream::new() + } + + fn connect(&self, peer_id: PeerId) -> <FakeTypes as GattTypes>::ConnectFuture { + let clients = &self.inner.lock().clients; + let res = match clients.get(&peer_id) { + Some(client) => Ok(client.clone()), + None => Err(Error::PeerDisconnected(peer_id)), + }; + futures::future::ready(res) } }
diff --git a/rust/bt-gatt/src/tests.rs b/rust/bt-gatt/src/tests.rs index d300f0f..e753706 100644 --- a/rust/bt-gatt/src/tests.rs +++ b/rust/bt-gatt/src/tests.rs
@@ -9,6 +9,7 @@ use bt_common::{PeerId, Uuid}; +use crate::central::{AdvertisingDatum, PeerName, ScanResult}; use crate::test_utils::*; use crate::types::*; use crate::{central::Filter, client::PeerService, Central}; @@ -18,7 +19,7 @@ const TEST_UUID_3: Uuid = Uuid::from_u16(0x3456); // Sets up a fake peer service with some characteristics. -fn set_up() -> FakePeerService { +fn setup_peer_service() -> FakePeerService { let mut fake_peer_service = FakePeerService::new(); fake_peer_service.add_characteristic( Characteristic { @@ -86,7 +87,7 @@ fn peer_service_discover_characteristics_works() { let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref()); - let fake_peer_service = set_up(); + let fake_peer_service = setup_peer_service(); let mut discover_results = fake_peer_service.discover_characteristics(Some(TEST_UUID_1)); let polled = discover_results.poll_unpin(&mut noop_cx); @@ -110,7 +111,7 @@ fn peer_service_read_characteristic() { let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref()); - let mut fake_peer_service = set_up(); + let mut fake_peer_service = setup_peer_service(); let mut buf = vec![0; 255]; // For characteristic that was added, value is returned @@ -152,7 +153,7 @@ fn peer_service_write_characteristic() { let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref()); - let mut fake_peer_service: FakePeerService = set_up(); + let mut fake_peer_service: FakePeerService = setup_peer_service(); // Set expected characteristic value before calling `write_characteristic`. fake_peer_service.expect_characteristic_value(&Handle(1), vec![0, 1, 2, 3]); @@ -174,7 +175,7 @@ fn peer_service_write_characteristic_fail() { let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref()); - let fake_peer_service = set_up(); + let fake_peer_service = setup_peer_service(); // Write some random value without calling `expect_characteristic_value` first. let mut write_result = fake_peer_service.write_characteristic( @@ -194,7 +195,7 @@ fn peer_service_subsribe() { let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref()); - let mut fake_peer_service = set_up(); + let mut fake_peer_service = setup_peer_service(); let mut notification_stream = fake_peer_service.subscribe(&Handle(0x1)); // Stream is empty unless we add an item through the FakeNotificationStream @@ -230,9 +231,19 @@ #[test] fn central_search_works() { let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref()); - let central = FakeCentral::default(); + let central = FakeCentral::new(); let mut scan_results = central.scan(&[Filter::ServiceUuid(Uuid::from_u16(0x1844)).into()]); + let polled = scan_results.poll_next_unpin(&mut noop_cx); + assert_matches!(polled, Poll::Pending); + + let scanned_result = ScanResult { + id: PeerId(1), + connectable: true, + name: PeerName::CompleteName("Marie's Pixel 7 Pro".to_owned()), + advertised: vec![AdvertisingDatum::Services(vec![Uuid::from_u16(0x1844)])], + }; + let _ = scan_results.set_scanned_result(Ok(scanned_result)); let polled = scan_results.poll_next_unpin(&mut noop_cx); assert_matches!(polled, Poll::Ready(Some(Ok(_)))); @@ -243,8 +254,8 @@ let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref()); let _stream = central.scan(&[]); - let connect_fut = central.connect(PeerId(1)); + let connect_fut = central.connect(PeerId(1)); futures::pin_mut!(connect_fut); let Poll::Ready(Ok(client)) = connect_fut.poll(&mut noop_cx) else { panic!("Connect should be ready Ok"); @@ -273,7 +284,12 @@ #[test] fn central_dynamic_usage() { - let central: Box<dyn crate::Central<FakeTypes>> = Box::new(FakeCentral::default()); + let mut central = FakeCentral::new(); + let mut fake_client = FakeClient::new(); + fake_client.add_service(Uuid::from_u16(0), true, FakePeerService::new()); + central.add_client(PeerId(1), fake_client); - boxed_generic_usage(central); + let boxed: Box<dyn crate::Central<FakeTypes>> = Box::new(central); + + boxed_generic_usage(boxed); }