bt/bt-broadcast-assistant: Support multi-broadcast and PA scanning - Implement PA scanning in EventStream using bt-gatt's PA traits to BASE endpoints - Refactor registry cache to key by (PeerId, AdvertisingSetId) to prevent collisions and natively support multi-broadcast sources. Bug: b/433285146 Change-Id: I7020bea1f25311347cbafc6ab2174d3e2e9b7e4a Reviewed-on: https://bluetooth-review.googlesource.com/c/bluetooth/+/2980
diff --git a/rust/bt-bass/src/client.rs b/rust/bt-bass/src/client.rs index fc86482..62531ac 100644 --- a/rust/bt-bass/src/client.rs +++ b/rust/bt-bass/src/client.rs
@@ -285,9 +285,14 @@ self.write_to_bascp(op).await } - /// Requests the BASS server to add or update Metadata for the Broadcast - /// Source, and/or to request the server to synchronize to, or to stop - /// synchronization to, a PA and/or a BIS. + /// Requests the Scan Delegator to modify a Broadcast Source's + /// synchronization state and/or metadata. + /// + /// This method writes a **Modify Source** operation to the Broadcast Audio + /// Scan Control Point. It reads the current state of the Broadcast + /// Source from the local cache (the BRS characteristic value), applies + /// the requested modifications, and sends the complete updated subgroup + /// list to the server. /// /// # Arguments /// @@ -308,48 +313,46 @@ bis_map: HashMap<SubgroupIndex, BisSync>, metadata_map: Option<HashMap<SubgroupIndex, Vec<Metadata>>>, ) -> Result<(), Error> { - let op = { - let mut state = self - .get_broadcast_source_state(&broadcast_id) - .ok_or(Error::UnknownBroadcastSource(broadcast_id))?; + let mut state = self + .get_broadcast_source_state(&broadcast_id) + .ok_or(Error::UnknownBroadcastSource(broadcast_id))?; - // Update BIS_Sync param for BIGs if applicable. + // Update BIS_Sync param for BIGs if applicable. + for (big_index, group) in state.subgroups.iter_mut().enumerate() { + if let Some(bis_sync) = bis_map.get(&(big_index as u8)) { + group.bis_sync = bis_sync.clone(); + } + } + + // Update metadata for BIGs if applicable. + if let Some(mut m) = metadata_map { for (big_index, group) in state.subgroups.iter_mut().enumerate() { - if let Some(bis_sync) = bis_map.get(&(big_index as u8)) { - group.bis_sync = bis_sync.clone(); + if let Some(metadata) = m.remove(&(big_index as u8)) { + group.metadata = metadata; } } - // Update metadata for BIGs if applicable. - if let Some(mut m) = metadata_map { - for (big_index, group) in state.subgroups.iter_mut().enumerate() { - if let Some(metadata) = m.remove(&(big_index as u8)) { - group.metadata = metadata; - } + // Left over metadata values are new subgroups that are to be added. New + // subgroups can only be added if the subgroup index is + // contiguous to existing subgroups. + let mut new_big_indices: Vec<&u8> = m.keys().collect(); + new_big_indices.sort(); + for big_index in new_big_indices { + if (*big_index as usize) != state.subgroups.len() { + warn!("cannot add new [{big_index}th] subgroup"); + break; } - - // Left over metadata values are new subgroups that are to be added. New - // subgroups can only be added if the subgroup index is - // contiguous to existing subgroups. - let mut new_big_indices: Vec<&u8> = m.keys().collect(); - new_big_indices.sort(); - for big_index in new_big_indices { - if (*big_index as usize) != state.subgroups.len() { - warn!("cannot add new [{big_index}th] subgroup"); - break; - } - let new_subgroup = BigSubgroup::new(None).with_metadata(m[big_index].clone()); - state.subgroups.push(new_subgroup); - } + let new_subgroup = BigSubgroup::new(None).with_metadata(m[big_index].clone()); + state.subgroups.push(new_subgroup); } + } - ModifySourceOperation::new( - state.source_id, - pa_sync, - pa_interval.unwrap_or(PeriodicAdvertisingInterval::unknown()), - state.subgroups, - ) - }; + let op = ModifySourceOperation::new( + state.source_id, + pa_sync, + pa_interval.unwrap_or(PeriodicAdvertisingInterval::unknown()), + state.subgroups, + ); self.write_to_bascp(op).await }
diff --git a/rust/bt-broadcast-assistant/Cargo.toml b/rust/bt-broadcast-assistant/Cargo.toml index b4d62fb..0876a54 100644 --- a/rust/bt-broadcast-assistant/Cargo.toml +++ b/rust/bt-broadcast-assistant/Cargo.toml
@@ -14,6 +14,7 @@ bt-common.workspace = true bt-gatt.workspace = true futures.workspace = true +log.workspace = true num.workspace = true parking_lot.workspace = true thiserror.workspace = true
diff --git a/rust/bt-broadcast-assistant/src/assistant.rs b/rust/bt-broadcast-assistant/src/assistant.rs index 495ed85..cde253a 100644 --- a/rust/bt-broadcast-assistant/src/assistant.rs +++ b/rust/bt-broadcast-assistant/src/assistant.rs
@@ -11,7 +11,7 @@ use bt_bap::types::BroadcastId; use bt_bass::client::error::Error as BassClientError; use bt_bass::client::BroadcastAudioScanServiceClient; -use bt_common::{PeerId, Uuid}; +use bt_common::{core::AdvertisingSetId, PeerId, Uuid}; use bt_gatt::central::*; use bt_gatt::client::PeerServiceHandle; use bt_gatt::types::Error as GattError; @@ -28,6 +28,8 @@ pub const BASIC_AUDIO_ANNOUNCEMENT_SERVICE: Uuid = Uuid::from_u16(0x1851); pub const BROADCAST_AUDIO_ANNOUNCEMENT_SERVICE: Uuid = Uuid::from_u16(0x1852); +pub type BroadcastSourceKey = (PeerId, AdvertisingSetId); + #[derive(Debug, Error)] pub enum Error { #[error("GATT operation error: {0:?}")] @@ -55,7 +57,7 @@ /// Contains information about the currently-known broadcast /// sources and the peers they were found on #[derive(Debug)] -pub(crate) struct DiscoveredBroadcastSources(Mutex<HashMap<PeerId, BroadcastSource>>); +pub(crate) struct DiscoveredBroadcastSources(Mutex<HashMap<BroadcastSourceKey, BroadcastSource>>); impl DiscoveredBroadcastSources { /// Creates a shareable instance of `DiscoveredBroadcastSources`. @@ -68,34 +70,36 @@ /// indicates whether it has changed from before or not. pub(crate) fn merge_broadcast_source_data( &self, - peer_id: &PeerId, + key: &BroadcastSourceKey, data: &BroadcastSource, ) -> (BroadcastSource, bool) { let mut lock = self.0.lock(); - let source = lock.entry(*peer_id).or_default(); + let source = lock.entry(*key).or_default(); let before = source.clone(); source.merge(data); + let after = source.clone(); let changed = before != after; (after, changed) } - /// Get a BroadcastSource from a peer id. - fn get_by_peer_id(&self, peer_id: &PeerId) -> Option<BroadcastSource> { + /// Get a BroadcastSource from a peer id and advertising SID. + pub(crate) fn get_by_key( + &self, + peer_id: PeerId, + advertising_sid: AdvertisingSetId, + ) -> Option<BroadcastSource> { let lock = self.0.lock(); - lock.get(&peer_id).clone().map(|source| source.clone()) + lock.get(&(peer_id, advertising_sid)).cloned() } /// Get a BroadcastSource from associated broadcast id. fn get_by_broadcast_id(&self, broadcast_id: &BroadcastId) -> Option<BroadcastSource> { let lock = self.0.lock(); - let info = lock.iter().find(|(&_k, &ref v)| v.broadcast_id == Some(*broadcast_id)); - match info { - Some((&_peer_id, &ref broadcast_source)) => Some(broadcast_source.clone()), - None => None, - } + let info = lock.values().find(|v| v.broadcast_id == Some(*broadcast_id)); + info.cloned() } } @@ -135,8 +139,23 @@ } let scan_result_stream = self.central.scan(&Self::scan_filters()); self.broadcast_source_scan_started.store(true, Ordering::Relaxed); + + let periodic_advertising = self + .central + .periodic_advertising() + .inspect_err(|e| { + // TODO(b/433285146): This should eventually fail the start operation. + // For now, log a warning and fallback to EA-only. + log::warn!( + "Periodic Advertising not supported by platform: {:?}. Falling back to EA-only scanning.", + e + ); + }) + .ok(); + Ok(EventStream::<T>::new( scan_result_stream, + periodic_advertising, self.broadcast_sources.clone(), self.broadcast_source_scan_started.clone(), )) @@ -194,13 +213,15 @@ let broadcast_source = BroadcastSource { address: Some(address), address_type: Some(address_type), - advertising_sid: Some(advertising_sid), broadcast_id: None, periodic_advertising_interval: None, endpoint: None, }; - Ok(self.broadcast_sources.merge_broadcast_source_data(&peer_id, &broadcast_source).0) + Ok(self + .broadcast_sources + .merge_broadcast_source_data(&(peer_id, advertising_sid), &broadcast_source) + .0) } // Manually adds broadcast source information for debugging purposes. @@ -208,6 +229,7 @@ pub fn force_discover_broadcast_source_metadata( &self, peer_id: PeerId, + advertising_sid: bt_common::core::AdvertisingSetId, big_metadata: Vec<Vec<bt_common::generic_audio::metadata_ltv::Metadata>>, ) -> Result<BroadcastSource, Error> { use bt_bap::types::{BroadcastAudioSourceEndpoint, BroadcastIsochronousGroup}; @@ -229,24 +251,23 @@ let broadcast_source = BroadcastSource { address: None, address_type: None, - advertising_sid: None, broadcast_id: None, periodic_advertising_interval: None, endpoint: Some(endpoint), }; - Ok(self.broadcast_sources.merge_broadcast_source_data(&peer_id, &broadcast_source).0) + Ok(self + .broadcast_sources + .merge_broadcast_source_data(&(peer_id, advertising_sid), &broadcast_source) + .0) } // Gets the broadcast sources currently known by the broadcast // assistant. - pub fn known_broadcast_sources(&self) -> std::collections::HashMap<PeerId, BroadcastSource> { - let lock = self.broadcast_sources.0.lock(); - let mut m = HashMap::new(); - for (pid, source) in lock.iter() { - m.insert(*pid, source.clone()); - } - m + pub fn known_broadcast_sources( + &self, + ) -> std::collections::HashMap<BroadcastSourceKey, BroadcastSource> { + self.broadcast_sources.0.lock().clone() } } @@ -258,7 +279,7 @@ use std::task::Poll; use bt_bap::types::*; - use bt_common::core::{AddressType, AdvertisingSetId}; + use bt_common::core::{AddressType, AdvertisingSetId, PeriodicAdvertisingInterval}; use bt_common::generic_audio::metadata_ltv::Metadata; use bt_gatt::test_utils::{FakeCentral, FakeClient, FakeTypes}; @@ -267,14 +288,16 @@ #[test] fn merge_broadcast_source() { let discovered = DiscoveredBroadcastSources::new(); - let bid = BroadcastId::try_from(1001).unwrap(); + let bid1 = BroadcastId::try_from(1001).unwrap(); + let key1 = (PeerId(1001), AdvertisingSetId(1)); + + // 1. Merge initial source data for SID 1. let (bs, changed) = discovered.merge_broadcast_source_data( - &PeerId(1001), + &key1, &BroadcastSource::default() .with_address([1, 2, 3, 4, 5, 6]) .with_address_type(AddressType::Public) - .with_advertising_sid(AdvertisingSetId(1)) - .with_broadcast_id(bid), + .with_broadcast_id(bid1), ); assert!(changed); assert_eq!( @@ -282,18 +305,22 @@ BroadcastSource { address: Some([1, 2, 3, 4, 5, 6]), address_type: Some(AddressType::Public), - advertising_sid: Some(AdvertisingSetId(1)), - broadcast_id: Some(bid), + broadcast_id: Some(bid1), periodic_advertising_interval: None, endpoint: None, } ); + // 2. Merge endpoint and PA interval for SID 1. let (bs, changed) = discovered.merge_broadcast_source_data( - &PeerId(1001), - &BroadcastSource::default().with_address_type(AddressType::Random).with_endpoint( - BroadcastAudioSourceEndpoint { presentation_delay_ms: 32, big: vec![] }, - ), + &key1, + &BroadcastSource::default() + .with_address_type(AddressType::Random) + .with_endpoint(BroadcastAudioSourceEndpoint { + presentation_delay_ms: 32, + big: vec![], + }) + .with_periodic_advertising_interval(PeriodicAdvertisingInterval(0x0100)), ); assert!(changed); assert_eq!( @@ -301,9 +328,8 @@ BroadcastSource { address: Some([1, 2, 3, 4, 5, 6]), address_type: Some(AddressType::Random), - advertising_sid: Some(AdvertisingSetId(1)), - broadcast_id: Some(bid), - periodic_advertising_interval: None, + broadcast_id: Some(bid1), + periodic_advertising_interval: Some(PeriodicAdvertisingInterval(0x0100)), endpoint: Some(BroadcastAudioSourceEndpoint { presentation_delay_ms: 32, big: vec![] @@ -311,13 +337,48 @@ } ); + // 3. Merge duplicate endpoint for SID 1 (no change). let (_, changed) = discovered.merge_broadcast_source_data( - &PeerId(1001), + &key1, &BroadcastSource::default().with_address_type(AddressType::Random).with_endpoint( BroadcastAudioSourceEndpoint { presentation_delay_ms: 32, big: vec![] }, ), ); assert!(!changed); + + // 4. Merge a new broadcast source with a different SID (SID 2) for the same + // peer. + let bid2 = BroadcastId::try_from(1002).unwrap(); + let key2 = (PeerId(1001), AdvertisingSetId(2)); + let (bs, changed) = discovered.merge_broadcast_source_data( + &key2, + &BroadcastSource::default() + .with_address([1, 2, 3, 4, 5, 6]) + .with_address_type(AddressType::Public) + .with_broadcast_id(bid2), + ); + assert!(changed); + assert_eq!( + bs, + BroadcastSource { + address: Some([1, 2, 3, 4, 5, 6]), + address_type: Some(AddressType::Public), + broadcast_id: Some(bid2), + periodic_advertising_interval: None, + endpoint: None, + } + ); + + // Verify both entries coexist in the registry + assert!(discovered.get_by_key(key1.0, key1.1).is_some()); + assert!(discovered.get_by_key(key2.0, key2.1).is_some()); + + // Verify get_by_broadcast_id works for both and maps to correct keys + let lock = discovered.0.lock(); + let entry1 = lock.iter().find(|(_, v)| v.broadcast_id == Some(bid1)).unwrap(); + assert_eq!(entry1.0 .1, AdvertisingSetId(1)); + let entry2 = lock.iter().find(|(_, v)| v.broadcast_id == Some(bid2)).unwrap(); + assert_eq!(entry2.0 .1, AdvertisingSetId(2)); } #[test] @@ -368,7 +429,10 @@ assert_eq!(source.address, Some(address)); assert_eq!(source.address_type, Some(address_type)); - assert_eq!(source.advertising_sid, Some(sid)); + + // Verify it is registered in the assistant under the correct key + let known = assistant.known_broadcast_sources(); + assert_eq!(known.get(&(peer_id, sid)).unwrap().address, Some(address)); } #[test] @@ -376,12 +440,18 @@ let assistant = BroadcastAssistant::<FakeTypes>::new(FakeCentral::new()); let peer_id = PeerId(1); let metadata = vec![vec![Metadata::BroadcastAudioImmediateRenderingFlag]]; + let sid = AdvertisingSetId(1); - let source = - assistant.force_discover_broadcast_source_metadata(peer_id, metadata.clone()).unwrap(); + let source = assistant + .force_discover_broadcast_source_metadata(peer_id, sid, metadata.clone()) + .unwrap(); let endpoint = source.endpoint.unwrap(); assert_eq!(endpoint.big.len(), 1); assert_eq!(endpoint.big[0].metadata, metadata[0]); + + // Verify it is registered in the assistant under the correct key + let known = assistant.known_broadcast_sources(); + assert!(known.contains_key(&(peer_id, sid))); } }
diff --git a/rust/bt-broadcast-assistant/src/assistant/event.rs b/rust/bt-broadcast-assistant/src/assistant/event.rs index 18a97ae..40c32bc 100644 --- a/rust/bt-broadcast-assistant/src/assistant/event.rs +++ b/rust/bt-broadcast-assistant/src/assistant/event.rs
@@ -3,7 +3,12 @@ // found in the LICENSE file. use core::pin::Pin; -use futures::stream::{FusedStream, Stream, StreamExt}; +use futures::stream::{ + abortable, AbortHandle, FusedStream, FuturesUnordered, SelectAll, Stream, StreamExt, +}; +use futures::FutureExt; +use std::collections::HashMap; +use std::future::Future; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::task::Poll; @@ -14,6 +19,8 @@ use bt_common::packet_encoding::Error as PacketError; use bt_common::PeerId; use bt_gatt::central::{AdvertisingDatum, ScanResult}; +use bt_gatt::periodic_advertising::{PeriodicAdvertising, SyncConfiguration, SyncReport}; +use bt_gatt::GattTypes; use crate::assistant::{ DiscoveredBroadcastSources, Error, BASIC_AUDIO_ANNOUNCEMENT_SERVICE, @@ -23,10 +30,28 @@ #[derive(Debug)] pub enum Event { - FoundBroadcastSource { peer: PeerId, source: BroadcastSource }, - CouldNotParseAdvertisingData { peer: PeerId, error: PacketError }, + FoundBroadcastSource { + peer: PeerId, + advertising_sid: AdvertisingSetId, + source: BroadcastSource, + }, + CouldNotParseAdvertisingData { + peer: PeerId, + error: PacketError, + }, } +type PeriodicAdvertisingSyncResult<T> = Result< + <<T as GattTypes>::PeriodicAdvertising as PeriodicAdvertising>::SyncStream, + bt_gatt::types::Error, +>; + +type PeriodicAdvertisingFuture<T> = + Pin<Box<dyn Future<Output = (PeerId, u8, PeriodicAdvertisingSyncResult<T>)>>>; + +type PeriodicAdvertisingStream = + Pin<Box<dyn Stream<Item = (PeerId, u8, Result<SyncReport, bt_gatt::types::Error>)>>>; + /// A stream of discovered broadcast sources. /// This stream polls the scan results from GATT client to discover /// available broadcast sources. @@ -36,11 +61,24 @@ broadcast_sources: Arc<DiscoveredBroadcastSources>, broadcast_source_scan_started: Arc<AtomicBool>, + + periodic_advertising: Option<T::PeriodicAdvertising>, + + establishing_periodic_advertising_syncs: FuturesUnordered<PeriodicAdvertisingFuture<T>>, + active_periodic_advertising_sync_streams: SelectAll<PeriodicAdvertisingStream>, + active_syncs: HashMap<(PeerId, u8), Option<AbortHandle>>, } -impl<T: bt_gatt::GattTypes> EventStream<T> { +impl<T: bt_gatt::GattTypes> Unpin for EventStream<T> {} + +impl<T: bt_gatt::GattTypes> EventStream<T> +where + <<T as GattTypes>::PeriodicAdvertising as PeriodicAdvertising>::SyncStream: 'static, + <<T as GattTypes>::PeriodicAdvertising as PeriodicAdvertising>::SyncFut: 'static, +{ pub(crate) fn new( scan_result_stream: T::ScanResultStream, + periodic_advertising: Option<T::PeriodicAdvertising>, broadcast_sources: Arc<DiscoveredBroadcastSources>, broadcast_source_scan_started: Arc<AtomicBool>, ) -> Self { @@ -49,6 +87,62 @@ terminated: false, broadcast_sources, broadcast_source_scan_started, + periodic_advertising, + establishing_periodic_advertising_syncs: FuturesUnordered::new(), + active_periodic_advertising_sync_streams: SelectAll::new(), + active_syncs: HashMap::new(), + } + } + + /// Polls the futures currently establishing periodic advertising syncs. + /// + /// Returns `Poll::Ready(())` if any future resolved (successfully + /// establishing a sync or failing), which indicates progress was made. + /// Returns `Poll::Pending` if no progress was made. + fn poll_establishing_syncs(&mut self, cx: &mut std::task::Context<'_>) -> Poll<()> { + if self.establishing_periodic_advertising_syncs.is_terminated() { + return Poll::Pending; + } + + match self.establishing_periodic_advertising_syncs.poll_next_unpin(cx) { + Poll::Ready(Some((peer_id, sid, Ok(stream)))) => { + self.handle_established_sync(peer_id, sid, stream); + Poll::Ready(()) + } + Poll::Ready(Some((peer_id, sid, Err(_)))) => { + self.active_syncs.remove(&(peer_id, sid)); + Poll::Ready(()) + } + Poll::Ready(None) | Poll::Pending => Poll::Pending, + } + } + + /// Polls the active periodic advertising sync report streams. + /// + /// Returns: + /// - `Poll::Ready(Some(event))` if progress was made and a completed + /// `FoundBroadcastSource` event is ready to be returned. + /// - `Poll::Ready(None)` if progress was made (e.g., a report was processed + /// but was incomplete, or a stream failed and was removed), but no event + /// is ready yet. + /// - `Poll::Pending` if no progress was made. + fn poll_active_syncs(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Option<Event>> { + if self.active_periodic_advertising_sync_streams.is_terminated() { + return Poll::Pending; + } + + match self.active_periodic_advertising_sync_streams.poll_next_unpin(cx) { + Poll::Ready(Some((peer_id, sid, Ok(report)))) => { + match self.handle_periodic_advertising_report(peer_id, sid, report) { + Some(event) => Poll::Ready(Some(event)), + None => Poll::Ready(None), // Progressed, but no event + } + } + Poll::Ready(Some((peer_id, sid, Err(_)))) => { + self.active_syncs.remove(&(peer_id, sid)); + Poll::Ready(None) // Progressed, but no event + } + Poll::Ready(None) | Poll::Pending => Poll::Pending, } } @@ -66,19 +160,134 @@ if *uuid == BROADCAST_AUDIO_ANNOUNCEMENT_SERVICE { let bid = BroadcastId::decode(data.as_slice()).0?; source.get_or_insert(BroadcastSource::default()).with_broadcast_id(bid); - } else if *uuid == BASIC_AUDIO_ANNOUNCEMENT_SERVICE { - // TODO(dayeonglee): revisit when we implement periodic advertisement. - let base = BroadcastAudioSourceEndpoint::decode(data.as_slice()).0?; - source.get_or_insert(BroadcastSource::default()).with_endpoint(base); } } if let Some(src) = &mut source { - src.advertising_sid = scan_result.advertising_sid.map(AdvertisingSetId); src.periodic_advertising_interval = scan_result.periodic_advertising_interval.map(PeriodicAdvertisingInterval); } Ok(source) } + + fn handle_established_sync( + &mut self, + peer_id: PeerId, + sid: u8, + stream: <<T as GattTypes>::PeriodicAdvertising as PeriodicAdvertising>::SyncStream, + ) { + let mapped_stream = stream.map(move |report| (peer_id, sid, report)); + let (abortable_stream, abort_handle) = abortable(mapped_stream); + + self.active_periodic_advertising_sync_streams.push(Box::pin(abortable_stream)); + self.active_syncs.insert((peer_id, sid), Some(abort_handle)); + } + + fn handle_periodic_advertising_report( + &mut self, + peer_id: PeerId, + sid: u8, + report: SyncReport, + ) -> Option<Event> { + let SyncReport::PeriodicAdvertisingReport(report) = report else { + return None; + }; + + let Some(base) = parse_base_from_advertising_data(&report.data) else { + return None; + }; + + let (broadcast_source, changed) = self.broadcast_sources.merge_broadcast_source_data( + &(peer_id, AdvertisingSetId(sid)), + &BroadcastSource::default().with_endpoint(base), + ); + + if broadcast_source.is_ready_to_add() && changed { + if let Some(Some(handle)) = self.active_syncs.remove(&(peer_id, sid)) { + handle.abort(); + } + return Some(Event::FoundBroadcastSource { + peer: peer_id, + advertising_sid: AdvertisingSetId(sid), + source: broadcast_source, + }); + } + None + } + + fn handle_scan_result(&mut self, scanned: ScanResult) -> Option<Event> { + let found_source = match Self::try_into_broadcast_source(&scanned) { + Ok(Some(src)) => src, + Ok(None) => return None, + Err(e) => { + return Some(Event::CouldNotParseAdvertisingData { peer: scanned.id, error: e }) + } + }; + + let Some(raw_sid) = scanned.advertising_sid else { + return None; + }; + let sid = AdvertisingSetId(raw_sid); + + let (broadcast_source, changed) = + self.broadcast_sources.merge_broadcast_source_data(&(scanned.id, sid), &found_source); + + if broadcast_source.is_ready_to_add() && changed { + return Some(Event::FoundBroadcastSource { + peer: scanned.id, + advertising_sid: sid, + source: broadcast_source, + }); + } + + // See if it's appropriate to establish PA sync for this broadcast source. + + // If we already have the endpoint data (BASE), we don't need to establish a + // sync. + if broadcast_source.endpoint.is_some() { + return None; + } + + // If the platform doesn't support periodic advertising, we cannot sync. + let Some(ref pa) = self.periodic_advertising else { + return None; + }; + + // If we are already actively syncing (or establishing a sync) for this + // peer/SID, don't start another one. + let key = (scanned.id, sid.0); + if self.active_syncs.contains_key(&key) { + return None; + } + + self.active_syncs.insert(key, None); + let fut = pa.sync_to_advertising_reports( + scanned.id, + sid.0, + SyncConfiguration { filter_duplicates: true }, + ); + let mapped_fut = fut.map(move |res| (scanned.id, sid.0, res)); + self.establishing_periodic_advertising_syncs.push(Box::pin(mapped_fut)); + + None + } +} + +fn parse_base_from_advertising_data( + data: &[AdvertisingDatum], +) -> Option<BroadcastAudioSourceEndpoint> { + for datum in data { + let AdvertisingDatum::ServiceData(uuid, service_data) = datum else { + continue; + }; + if *uuid != BASIC_AUDIO_ANNOUNCEMENT_SERVICE { + continue; + } + let (Ok(base), _) = BroadcastAudioSourceEndpoint::decode(service_data) else { + continue; + }; + return Some(base); + } + None } impl<T: bt_gatt::GattTypes> Drop for EventStream<T> { @@ -87,13 +296,21 @@ } } -impl<T: bt_gatt::GattTypes> FusedStream for EventStream<T> { +impl<T: bt_gatt::GattTypes> FusedStream for EventStream<T> +where + <<T as GattTypes>::PeriodicAdvertising as PeriodicAdvertising>::SyncStream: 'static, + <<T as GattTypes>::PeriodicAdvertising as PeriodicAdvertising>::SyncFut: 'static, +{ fn is_terminated(&self) -> bool { self.terminated } } -impl<T: bt_gatt::GattTypes> Stream for EventStream<T> { +impl<T: bt_gatt::GattTypes> Stream for EventStream<T> +where + <<T as GattTypes>::PeriodicAdvertising as PeriodicAdvertising>::SyncStream: 'static, + <<T as GattTypes>::PeriodicAdvertising as PeriodicAdvertising>::SyncFut: 'static, +{ type Item = Result<Event, Error>; fn poll_next( @@ -104,41 +321,40 @@ return Poll::Ready(None); } - // Poll scan result stream to check if there were any newly discovered peers - // that we're interested in. loop { - let Some(Ok(scanned)) = futures::ready!(self.scan_result_stream.poll_next_unpin(cx)) - else { - self.terminated = true; - self.broadcast_source_scan_started.store(false, Ordering::Relaxed); - return Poll::Ready(Some(Err(Error::CentralScanTerminated))); - }; + let mut progressed = false; - let found_source = match Self::try_into_broadcast_source(&scanned) { - Err(error) => { - return Poll::Ready(Some(Ok(Event::CouldNotParseAdvertisingData { - peer: scanned.id, - error, - }))); + if self.poll_establishing_syncs(cx).is_ready() { + progressed = true; + } + + match self.poll_active_syncs(cx) { + Poll::Ready(Some(event)) => return Poll::Ready(Some(Ok(event))), + Poll::Ready(None) => progressed = true, + Poll::Pending => {} + } + + match self.scan_result_stream.poll_next_unpin(cx) { + Poll::Ready(Some(Ok(scanned))) => { + progressed = true; + if let Some(event) = self.handle_scan_result(scanned) { + return Poll::Ready(Some(Ok(event))); + } } - Ok(None) => continue, - Ok(Some(source)) => source, - }; + Poll::Ready(None | Some(Err(_))) => { + self.terminated = true; + self.broadcast_source_scan_started.store(false, Ordering::Relaxed); + return Poll::Ready(Some(Err(Error::CentralScanTerminated))); + } + Poll::Pending => {} + } - // If we found a broadcast source, we add its information in the - // internal records. - let (broadcast_source, changed) = - self.broadcast_sources.merge_broadcast_source_data(&scanned.id, &found_source); - - // Broadcast found event is relayed to the client iff complete - // information has been gathered. - if broadcast_source.into_add_source() && changed { - return Poll::Ready(Some(Ok(Event::FoundBroadcastSource { - peer: scanned.id, - source: broadcast_source, - }))); + if !progressed { + break; } } + + Poll::Pending } } @@ -150,29 +366,35 @@ use bt_common::core::{AddressType, AdvertisingSetId}; use bt_gatt::central::{AdvertisingDatum, PeerName}; - use bt_gatt::test_utils::{FakeTypes, ScannedResultStream, ScannedResultStreamController}; + use bt_gatt::test_utils::{ + FakePeriodicAdvertising, FakeTypes, ScannedResultStream, ScannedResultStreamController, + }; use bt_gatt::types::Error as BtGattError; use bt_gatt::types::GattError; - fn setup_stream() -> (EventStream<FakeTypes>, ScannedResultStreamController) { + fn setup_stream( + ) -> (EventStream<FakeTypes>, ScannedResultStreamController, FakePeriodicAdvertising) { let fake_scan_result_stream = ScannedResultStream::new(); let controller = fake_scan_result_stream.controller(); let broadcast_sources = DiscoveredBroadcastSources::new(); let broadcast_source_scan_started = Arc::new(AtomicBool::new(false)); + let pa = FakePeriodicAdvertising::default(); ( EventStream::<FakeTypes>::new( fake_scan_result_stream, + Some(pa.clone()), broadcast_sources, broadcast_source_scan_started, ), controller, + pa, ) } #[test] fn poll_found_broadcast_source_events() { - let (mut stream, scan_result_controller) = setup_stream(); + let (mut stream, scan_result_controller, pa) = setup_stream(); // Scanned a broadcast source and its broadcast id. let broadcast_source_pid = PeerId(1005); @@ -185,8 +407,8 @@ BROADCAST_AUDIO_ANNOUNCEMENT_SERVICE, vec![0x01, 0x02, 0x03], )], - advertising_sid: Some(0), - periodic_advertising_interval: None, + advertising_sid: Some(1), + periodic_advertising_interval: Some(0x0100), })); // Found broadcast source event shouldn't have been sent since braodcast source @@ -194,81 +416,76 @@ let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref()); assert!(stream.poll_next_unpin(&mut noop_cx).is_pending()); - // Pretend somehow address, address type, and advertising sid were - // filled out. This completes the broadcast source information. - // TODO(b/308481381): replace this block with sending a central scan result that - // contains the data. + // Poll again to let the PA sync transition from Establishing to Established. + assert!(stream.poll_next_unpin(&mut noop_cx).is_pending()); + + // Pretend somehow address, address type were filled out. let _ = stream.broadcast_sources.merge_broadcast_source_data( - &broadcast_source_pid, + &(broadcast_source_pid, AdvertisingSetId(1)), &BroadcastSource::default() .with_address([1, 2, 3, 4, 5, 6]) - .with_address_type(AddressType::Public) - .with_advertising_sid(AdvertisingSetId(1)), + .with_address_type(AddressType::Public), ); - // Scanned broadcast source's BASE data. - // TODO(b/308481381): replace this block sending data through PA train instead. + // Scanned broadcast source's BASE data: #[rustfmt::skip] let base_data = vec![ - 0x10, 0x20, 0x30, 0x02, // presentation delay, num of subgroups - 0x01, 0x03, 0x00, 0x00, 0x00, 0x00, // num of bis, codec id (big #1) - 0x00, // codec specific config len - 0x00, // metadata len, - 0x01, 0x00, // bis index, codec specific config len (big #1 / bis #1) - 0x01, 0x02, 0x00, 0x00, 0x00, 0x00, // num of bis, codec id (big #2) - 0x00, // codec specific config len - 0x00, // metadata len, - 0x01, 0x03, 0x02, 0x05, - 0x08, /* bis index, codec specific config len, codec frame blocks LTV - * (big #2 / bis #2) */ + 0x10, 0x20, 0x30, 0x02, // presentation delay, num of subgroups + 0x01, 0x03, 0x00, 0x00, 0x00, 0x00, // num of bis, codec id (big #1) + 0x00, // codec specific config len + 0x00, // metadata len, + 0x01, 0x00, // bis index, codec specific config len (big #1 / bis #1) + 0x01, 0x02, 0x00, 0x00, 0x00, 0x00, // num of bis, codec id (big #2) + 0x00, // codec specific config len + 0x00, // metadata len, + 0x01, 0x03, 0x02, 0x05, 0x08, // bis index, codec specific config len, codec frame blocks LTV (big #2 / bis #2) ]; - scan_result_controller.add_scanned_result(Ok(ScanResult { - id: broadcast_source_pid, - connectable: true, - name: PeerName::Unknown, - advertised: vec![AdvertisingDatum::ServiceData( - BASIC_AUDIO_ANNOUNCEMENT_SERVICE, - base_data.clone(), - )], - advertising_sid: Some(1), - periodic_advertising_interval: Some(0x0100), - })); + let advertising_data = + vec![AdvertisingDatum::ServiceData(BASIC_AUDIO_ANNOUNCEMENT_SERVICE, base_data)]; + + // Get the fake periodic advertising sync sender that was registered when we + // polled the scan result + let periodic_advertising_sender = pa + .get_sender(broadcast_source_pid) + .expect("should have registered periodic advertising sync sender"); + + // Send the PA report containing the BASE data + periodic_advertising_sender + .unbounded_send(Ok(SyncReport::PeriodicAdvertisingReport( + bt_gatt::periodic_advertising::PeriodicAdvertisingReport { + rssi: -50, + data: advertising_data, + event_counter: None, + subevent: None, + timestamp: 0, + }, + ))) + .unwrap(); // Expect the stream to send out broadcast source found event since information // is complete. let Poll::Ready(Some(Ok(event))) = stream.poll_next_unpin(&mut noop_cx) else { panic!("should have received event"); }; - assert_matches!(event, Event::FoundBroadcastSource{peer, source} => { + assert_matches!(event, Event::FoundBroadcastSource { peer, advertising_sid, source } => { assert_eq!(peer, broadcast_source_pid); - assert_eq!(source.advertising_sid, Some(AdvertisingSetId(1))); + assert_eq!(advertising_sid, AdvertisingSetId(1)); assert_eq!(source.periodic_advertising_interval, Some(PeriodicAdvertisingInterval(0x0100))); + assert_eq!(source.address, Some([1, 2, 3, 4, 5, 6])); }); - assert!(stream.poll_next_unpin(&mut noop_cx).is_pending()); + // Verify that the PA sync was stopped (removed from active_syncs) to conserve + // resources + assert!(stream.active_syncs.is_empty()); - // Scanned the same broadcast source's BASE data. - scan_result_controller.add_scanned_result(Ok(ScanResult { - id: broadcast_source_pid, - connectable: true, - name: PeerName::Unknown, - advertised: vec![AdvertisingDatum::ServiceData( - BASIC_AUDIO_ANNOUNCEMENT_SERVICE, - base_data.clone(), - )], - advertising_sid: Some(1), - periodic_advertising_interval: Some(0x0100), - })); - - // Shouldn't have gotten the event again since the information remained the - // same. + // Subsequent polls should be pending assert!(stream.poll_next_unpin(&mut noop_cx).is_pending()); } #[test] fn central_scan_stream_terminates() { - let (mut stream, scan_result_controller) = setup_stream(); + let (mut stream, scan_result_controller, _pa) = setup_stream(); // Mimick scan error. scan_result_controller.add_scanned_result(Err(BtGattError::Gatt(GattError::InvalidPdu))); @@ -283,68 +500,4 @@ assert_matches!(stream.poll_next_unpin(&mut noop_cx), Poll::Ready(None)); assert_matches!(stream.is_terminated(), true); } - - #[test] - fn poll_processes_multiple_results_eagerly() { - let (mut stream, scan_result_controller) = setup_stream(); - let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref()); - - let broadcast_source_pid = PeerId(1005); - - #[rustfmt::skip] - let base_data = vec![ - 0x10, 0x20, 0x30, 0x01, // presentation delay, num of subgroups - 0x01, 0x03, 0x00, 0x00, 0x00, 0x00, // num of bis, codec id - 0x00, // codec specific config len - 0x00, // metadata len, - 0x01, 0x00, // bis index, codec specific config len - ]; - - // 1. An irrelevant peer that should be ignored. - scan_result_controller.add_scanned_result(Ok(ScanResult { - id: PeerId(1), - connectable: true, - name: PeerName::Unknown, - advertised: vec![], - advertising_sid: Some(0), - periodic_advertising_interval: None, - })); - // 2. A broadcast source, but incomplete data (only broadcast ID). - scan_result_controller.add_scanned_result(Ok(ScanResult { - id: broadcast_source_pid, - connectable: true, - name: PeerName::Unknown, - advertised: vec![AdvertisingDatum::ServiceData( - BROADCAST_AUDIO_ANNOUNCEMENT_SERVICE, - vec![0x01, 0x02, 0x03], - )], - advertising_sid: Some(1), - periodic_advertising_interval: None, - })); - // 3. The same broadcast source, with BASE data, which completes it. - scan_result_controller.add_scanned_result(Ok(ScanResult { - id: broadcast_source_pid, - connectable: true, - name: PeerName::Unknown, - advertised: vec![AdvertisingDatum::ServiceData( - BASIC_AUDIO_ANNOUNCEMENT_SERVICE, - base_data.clone(), - )], - advertising_sid: Some(1), - periodic_advertising_interval: None, - })); - - // The stream should eagerly process all three items and emit the - // FoundBroadcastSource event. - let poll_result = stream.poll_next_unpin(&mut noop_cx); - let Poll::Ready(Some(Ok(event))) = poll_result else { - panic!("should have received event, but got {:?}", poll_result); - }; - assert_matches!(event, Event::FoundBroadcastSource{peer, ..} => { - assert_eq!(peer, broadcast_source_pid); - }); - - // The underlying stream is now empty, so the next poll should be pending. - assert!(stream.poll_next_unpin(&mut noop_cx).is_pending()); - } }
diff --git a/rust/bt-broadcast-assistant/src/assistant/peer.rs b/rust/bt-broadcast-assistant/src/assistant/peer.rs index a901b21..4c8e6e4 100644 --- a/rust/bt-broadcast-assistant/src/assistant/peer.rs +++ b/rust/bt-broadcast-assistant/src/assistant/peer.rs
@@ -15,7 +15,7 @@ #[cfg(any(test, feature = "debug"))] use bt_bass::types::BroadcastReceiveState; use bt_bass::types::{BisSync, PaSync}; -use bt_common::core::PeriodicAdvertisingInterval; +use bt_common::core::{AdvertisingSetId, PeriodicAdvertisingInterval}; use bt_common::packet_encoding::Error as PacketError; use bt_common::PeerId; #[cfg(any(test, feature = "debug"))] @@ -103,13 +103,14 @@ pub async fn add_broadcast_source( &self, source_peer_id: PeerId, + advertising_sid: AdvertisingSetId, address_lookup: &impl GetPeerAddr, pa_sync: PaSync, bis_sync: HashMap<u8, BisSync>, ) -> Result<(), Error> { let mut broadcast_source = self .broadcast_sources - .get_by_peer_id(&source_peer_id) + .get_by_key(source_peer_id, advertising_sid) .ok_or(Error::DoesNotExist(source_peer_id))?; let (broadcast_addr, broadcast_addr_type) = address_lookup @@ -118,7 +119,7 @@ .map_err(|err| Error::AddressLookupError(source_peer_id, err))?; broadcast_source.with_address(broadcast_addr).with_address_type(broadcast_addr_type); - if !broadcast_source.into_add_source() { + if !broadcast_source.is_ready_to_add() { return Err(Error::NotEnoughInfo(source_peer_id)); } @@ -127,7 +128,7 @@ broadcast_source.broadcast_id.unwrap(), broadcast_source.address_type.unwrap(), broadcast_source.address.unwrap(), - broadcast_source.advertising_sid.unwrap(), + advertising_sid, pa_sync, broadcast_source .periodic_advertising_interval @@ -286,6 +287,7 @@ { let fut = peer.add_broadcast_source( PeerId(1001), + AdvertisingSetId(1), &FakeGetPeerAddr, PaSync::SyncPastUnavailable, HashMap::new(), @@ -296,10 +298,8 @@ } let _ = broadcast_source.merge_broadcast_source_data( - &PeerId(1001), - &BroadcastSource::default() - .with_advertising_sid(AdvertisingSetId(1)) - .with_broadcast_id(BroadcastId::try_from(1001).unwrap()), + &(PeerId(1001), AdvertisingSetId(1)), + &BroadcastSource::default().with_broadcast_id(BroadcastId::try_from(1001).unwrap()), ); // Should fail because peer address couldn't be looked up. @@ -308,6 +308,7 @@ StaticPeerAddr::new_for_peer(PeerId(1002), [1, 2, 3, 4, 5, 6], AddressType::Public); let fut = peer.add_broadcast_source( PeerId(1001), + AdvertisingSetId(1), &address_lookup, PaSync::SyncPastUnavailable, HashMap::new(), @@ -323,6 +324,7 @@ StaticPeerAddr::new_for_peer(PeerId(1001), [1, 2, 3, 4, 5, 6], AddressType::Public); let fut = peer.add_broadcast_source( PeerId(1001), + AdvertisingSetId(1), &address_lookup, PaSync::SyncPastUnavailable, HashMap::new(),
diff --git a/rust/bt-broadcast-assistant/src/debug.rs b/rust/bt-broadcast-assistant/src/debug.rs index d8d26d8..abd545f 100644 --- a/rust/bt-broadcast-assistant/src/debug.rs +++ b/rust/bt-broadcast-assistant/src/debug.rs
@@ -10,7 +10,8 @@ #[cfg(any(test, feature = "debug"))] use bt_common::core::ltv::LtValue; #[cfg(any(test, feature = "debug"))] -use bt_common::core::{AddressType, AdvertisingSetId}; +use bt_common::core::AddressType; +use bt_common::core::AdvertisingSetId; use bt_common::debug_command::CommandRunner; use bt_common::debug_command::CommandSet; use bt_common::gen_commandset; @@ -39,15 +40,15 @@ Connect = ("connect", [], ["peer_id"], "Attempt connection to scan delegator"), Disconnect = ("disconnect", [], [], "Disconnect from connected scan delegator"), SendBroadcastCode = ("set-broadcast-code", [], ["broadcast_id", "broadcast_code"], "Attempt to send decryption key for a particular broadcast source to the scan delegator"), - AddBroadcastSource = ("add-broadcast-source", [], ["broadcast_source_pid", "PaSyncOff|PaSyncPast|PaSyncNoPast", "[bis_sync]"], "Attempt to add a particular broadcast source to the scan delegator"), + AddBroadcastSource = ("add-broadcast-source", [], ["source_peer_id", "advertising_sid", "PaSyncOff|PaSyncPast|PaSyncNoPast", "[bis_sync]"], "Attempt to add a particular broadcast source to the scan delegator"), UpdatePaSync = ("update-pa-sync", [], ["broadcast_id", "PaSyncOff|PaSyncPast|PaSyncNoPast", "[bis_sync]"], "Attempt to update the scan delegator's desired pa sync to a particular broadcast source"), RemoveBroadcastSource = ("remove-broadcast-source", [], ["broadcast_id"], "Attempt to remove a particular broadcast source to the scan delegator"), RemoteScanStarted = ("inform-scan-started", [], [], "Inform the scan delegator that we have started scanning on behalf of it"), RemoteScanStopped = ("inform-scan-stopped", [], [], "Inform the scan delegator that we have stopped scanning on behalf of it"), // TODO(http://b/433285146): Once PA scanning is implemented, remove bottom 3 commands. - ForceDiscoverBroadcastSource = ("force-discover-broadcast-source", [], ["broadcast_source_pid", "address", "Public|Random", "advertising_sid"], "Force the broadcast assistant to become aware of the provided broadcast source"), - ForceDiscoverSourceMetadata = ("force-discover-source-metadata", [], ["broadcast_source_pid", "comma_separated_raw_metadata"], "Force the broadcast assistant to become aware of the provided metadata, each BIG's metadata is comma separated"), - ForceDiscoverEmptySourceMetadata = ("force-discover-empty-source-metadata", [], ["broadcast_source_pid", "num_big"], "Force the broadcast assistant to become aware of the provided empty metadata, as many as # BIGs specified"), + ForceDiscoverBroadcastSource = ("force-discover-broadcast-source", [], ["source_peer_id", "address", "Public|Random", "advertising_sid"], "Force the broadcast assistant to become aware of the provided broadcast source"), + ForceDiscoverSourceMetadata = ("force-discover-source-metadata", [], ["source_peer_id", "advertising_sid", "metadata_big1", "[metadata_big_n]..."], "Force the broadcast assistant to become aware of the provided metadata, each BIG's metadata is comma separated"), + ForceDiscoverEmptySourceMetadata = ("force-discover-empty-source-metadata", [], ["source_peer_id", "advertising_sid", "num_big"], "Force the broadcast assistant to become aware of the provided empty metadata, as many as # BIGs specified"), } } @@ -225,7 +226,7 @@ let known = self.assistant.known_broadcast_sources(); println!("Known Broadcast Sources:"); for (id, s) in known { - println!("PeerId ({id}), source: {s:?}"); + println!("({id:?}): {s:?}"); } } AssistantCmd::Connect => { @@ -286,17 +287,23 @@ .await; } AssistantCmd::AddBroadcastSource => { - if args.len() < 2 { + if args.len() < 3 { eprintln!("usage: {}", AssistantCmd::AddBroadcastSource.help_simple()); return Ok(()); } - let Ok(broadcast_source_pid) = parse_peer_id(&args[0]) else { - eprintln!("invalid broadcast id: {}", args[0]); + let Ok(source_peer_id) = parse_peer_id(&args[0]) else { + eprintln!("invalid peer id: {}", args[0]); return Ok(()); }; - let pa_sync: PaSync = match args[1].parse() { + let Ok(sid_val) = parse_int::<u8>(&args[1]) else { + eprintln!("invalid advertising sid: {}", args[1]); + return Ok(()); + }; + let advertising_sid = AdvertisingSetId(sid_val); + + let pa_sync: PaSync = match args[2].parse() { Ok(sync) => sync, Err(e) => { eprintln!("invalid pa_sync: {e:?}"); @@ -305,11 +312,12 @@ }; let bis_sync = - if args.len() == 3 { parse_bis_sync(&args[2]) } else { HashMap::new() }; + if args.len() == 4 { parse_bis_sync(&args[3]) } else { HashMap::new() }; self.with_peer(|peer| async move { peer.add_broadcast_source( - broadcast_source_pid, + source_peer_id, + advertising_sid, &self.peer_addr_getter, pa_sync, bis_sync, @@ -381,7 +389,7 @@ return Ok(()); } - let Ok(peer_id) = parse_peer_id(&args[0]) else { + let Ok(source_peer_id) = parse_peer_id(&args[0]) else { eprintln!("invalid peer id: {}", args[0]); return Ok(()); }; @@ -406,7 +414,7 @@ let advertising_sid = AdvertisingSetId(raw_ad_sid); match self.assistant.force_discover_broadcast_source( - peer_id, + source_peer_id, address, address_type, advertising_sid, @@ -421,7 +429,7 @@ } #[cfg(feature = "debug")] AssistantCmd::ForceDiscoverSourceMetadata => { - if args.len() < 2 { + if args.len() < 3 { eprintln!( "usage: {}", AssistantCmd::ForceDiscoverSourceMetadata.help_simple() @@ -429,13 +437,19 @@ return Ok(()); } - let Ok(peer_id) = parse_peer_id(&args[0]) else { + let Ok(source_peer_id) = parse_peer_id(&args[0]) else { eprintln!("invalid peer id: {}", args[0]); return Ok(()); }; + let Ok(raw_ad_sid) = parse_int::<u8>(&args[1]) else { + eprintln!("invalid advertising sid: {}", args[1]); + return Ok(()); + }; + let advertising_sid = AdvertisingSetId(raw_ad_sid); + let mut all_big_metadata = Vec::new(); - for i in 1..args.len() { + for i in 2..args.len() { let raw_metadata: Vec<u8> = args[i] .split(',') .map(|t| parse_int(t)) @@ -457,17 +471,18 @@ } } - match self - .assistant - .force_discover_broadcast_source_metadata(peer_id, all_big_metadata) - { + match self.assistant.force_discover_broadcast_source_metadata( + source_peer_id, + advertising_sid, + all_big_metadata, + ) { Ok(source) => println!("broadcast source with metadata: {source:?}"), Err(e) => eprintln!("failed to enter in broadcast source metadata: {e:?}"), } } #[cfg(feature = "debug")] AssistantCmd::ForceDiscoverEmptySourceMetadata => { - if args.len() != 2 { + if args.len() != 3 { eprintln!( "usage: {}", AssistantCmd::ForceDiscoverEmptySourceMetadata.help_simple() @@ -475,13 +490,19 @@ return Ok(()); } - let Ok(peer_id) = parse_peer_id(&args[0]) else { + let Ok(source_peer_id) = parse_peer_id(&args[0]) else { eprintln!("invalid peer id: {}", args[0]); return Ok(()); }; - let Ok(num_big) = parse_int::<usize>(&args[1]) else { - eprintln!("invalid # of bigs: {}", args[1]); + let Ok(raw_ad_sid) = parse_int::<u8>(&args[1]) else { + eprintln!("invalid advertising sid: {}", args[1]); + return Ok(()); + }; + let advertising_sid = AdvertisingSetId(raw_ad_sid); + + let Ok(num_big) = parse_int::<usize>(&args[2]) else { + eprintln!("invalid # of bigs: {}", args[2]); return Ok(()); }; @@ -490,10 +511,11 @@ all_big_metadata.push(vec![]); } - match self - .assistant - .force_discover_broadcast_source_metadata(peer_id, all_big_metadata) - { + match self.assistant.force_discover_broadcast_source_metadata( + source_peer_id, + advertising_sid, + all_big_metadata, + ) { Ok(source) => println!("broadcast source with metadata: {source:?}"), Err(e) => { eprintln!("failed to enter in empty broadcast source metadata: {e:?}")
diff --git a/rust/bt-broadcast-assistant/src/types.rs b/rust/bt-broadcast-assistant/src/types.rs index e1b6312..93f9715 100644 --- a/rust/bt-broadcast-assistant/src/types.rs +++ b/rust/bt-broadcast-assistant/src/types.rs
@@ -4,8 +4,8 @@ use bt_bap::types::*; use bt_bass::types::{BigSubgroup, BisSync}; +use bt_common::core::PeriodicAdvertisingInterval; use bt_common::core::{Address, AddressType}; -use bt_common::core::{AdvertisingSetId, PeriodicAdvertisingInterval}; use bt_common::packet_encoding::Error as PacketError; use std::collections::HashMap; @@ -17,7 +17,6 @@ pub struct BroadcastSource { pub(crate) address: Option<Address>, pub(crate) address_type: Option<AddressType>, - pub(crate) advertising_sid: Option<AdvertisingSetId>, pub(crate) broadcast_id: Option<BroadcastId>, pub(crate) periodic_advertising_interval: Option<PeriodicAdvertisingInterval>, pub(crate) endpoint: Option<BroadcastAudioSourceEndpoint>, @@ -26,11 +25,11 @@ impl BroadcastSource { /// Returns whether or not this BroadcastSource has enough information /// to be added by the Broadcast Assistant. - pub(crate) fn into_add_source(&self) -> bool { + pub(crate) fn is_ready_to_add(&self) -> bool { // Address and PA interval information are not necessary since // default value can be used for PA interval and Address is looked up // when the add source operation is triggered. - self.advertising_sid.is_some() && self.broadcast_id.is_some() && self.endpoint.is_some() + self.broadcast_id.is_some() && self.endpoint.is_some() } pub fn with_address(&mut self, address: [u8; 6]) -> &mut Self { @@ -48,11 +47,6 @@ self } - pub fn with_advertising_sid(&mut self, sid: AdvertisingSetId) -> &mut Self { - self.advertising_sid = Some(sid); - self - } - pub fn with_periodic_advertising_interval( &mut self, interval: PeriodicAdvertisingInterval, @@ -77,9 +71,6 @@ if let Some(address_type) = other.address_type { self.address_type = Some(address_type); } - if let Some(advertising_sid) = other.advertising_sid { - self.advertising_sid = Some(advertising_sid); - } if let Some(broadcast_id) = other.broadcast_id { self.broadcast_id = Some(broadcast_id); } @@ -131,13 +122,10 @@ #[test] fn broadcast_source() { let mut b = BroadcastSource::default(); - assert!(!b.into_add_source()); - - b.with_advertising_sid(AdvertisingSetId(0x1)); - assert!(!b.into_add_source()); + assert!(!b.is_ready_to_add()); b.with_broadcast_id(BroadcastId::try_from(0x010203).unwrap()); - assert!(!b.into_add_source()); + assert!(!b.is_ready_to_add()); b.endpoint_to_big_subgroups(HashMap::from([(0, BisSync::sync(vec![1]).unwrap())])) .expect_err("should fail no endpoint data"); @@ -155,7 +143,7 @@ }], }); - assert!(b.into_add_source()); + assert!(b.is_ready_to_add()); let subgroups = b .endpoint_to_big_subgroups(HashMap::from([ (0, BisSync::sync(vec![1]).unwrap()),
diff --git a/rust/bt-common/src/core.rs b/rust/bt-common/src/core.rs index 9549747..d9050d5 100644 --- a/rust/bt-common/src/core.rs +++ b/rust/bt-common/src/core.rs
@@ -50,7 +50,7 @@ } /// Advertising Set ID which is 1 byte long. -#[derive(Debug, Clone, Copy, PartialEq)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub struct AdvertisingSetId(pub u8); impl AdvertisingSetId {
diff --git a/rust/bt-gatt/src/periodic_advertising.rs b/rust/bt-gatt/src/periodic_advertising.rs index c0c227f..7cfd726 100644 --- a/rust/bt-gatt/src/periodic_advertising.rs +++ b/rust/bt-gatt/src/periodic_advertising.rs
@@ -11,6 +11,8 @@ use bt_common::core::Phy; use futures::{Future, Stream}; + +use crate::central::AdvertisingDatum; use thiserror::Error; use bt_common::PeerId; @@ -35,6 +37,7 @@ /// On success, returns the SyncStream which can be used to receive /// SyncReports. fn sync_to_advertising_reports( + &self, peer_id: PeerId, advertising_sid: u8, config: SyncConfiguration, @@ -54,7 +57,7 @@ #[derive(Debug, Clone)] pub struct PeriodicAdvertisingReport { pub rssi: i8, - pub data: Vec<u8>, + pub data: Vec<AdvertisingDatum>, /// The event counter of the event that the advertising packet was received /// in. pub event_counter: Option<u16>,
diff --git a/rust/bt-gatt/src/test_utils.rs b/rust/bt-gatt/src/test_utils.rs index 148e666..e8e7de9 100644 --- a/rust/bt-gatt/src/test_utils.rs +++ b/rust/bt-gatt/src/test_utils.rs
@@ -335,24 +335,46 @@ type IndicateConfirmationStream = UnboundedReceiver<Result<server::ConfirmationEvent>>; } -pub struct FakePeriodicAdvertising; +#[derive(Default)] +struct FakePeriodicAdvertisingInner { + sync_registry: HashMap<PeerId, UnboundedSender<Result<SyncReport>>>, +} + +#[derive(Clone, Default)] +pub struct FakePeriodicAdvertising { + inner: Arc<Mutex<FakePeriodicAdvertisingInner>>, +} + +impl FakePeriodicAdvertising { + pub fn new() -> Self { + Self::default() + } + + pub fn get_sender(&self, peer_id: PeerId) -> Option<UnboundedSender<Result<SyncReport>>> { + self.inner.lock().sync_registry.get(&peer_id).cloned() + } +} impl PeriodicAdvertising for FakePeriodicAdvertising { type SyncFut = Ready<Result<Self::SyncStream>>; - type SyncStream = futures::stream::Empty<Result<SyncReport>>; + type SyncStream = UnboundedReceiver<Result<SyncReport>>; fn sync_to_advertising_reports( - _peer_id: PeerId, + &self, + peer_id: PeerId, _advertising_sid: u8, _config: crate::periodic_advertising::SyncConfiguration, ) -> Self::SyncFut { - unimplemented!() + let (tx, rx) = unbounded(); + self.inner.lock().sync_registry.insert(peer_id, tx); + ready(Ok(rx)) } } #[derive(Default)] pub struct FakeCentralInner { clients: HashMap<PeerId, FakeClient>, + pub(crate) periodic_advertising: FakePeriodicAdvertising, } #[derive(Clone)] @@ -385,7 +407,7 @@ } fn periodic_advertising(&self) -> Result<<FakeTypes as GattTypes>::PeriodicAdvertising> { - unimplemented!() + Ok(self.inner.lock().periodic_advertising.clone()) } }