bt/bt-broadcast-assistant: Support multi-broadcast and PA scanning

- Implement PA scanning in EventStream using bt-gatt's PA traits to
  BASE endpoints
- Refactor registry cache to key by (PeerId, AdvertisingSetId) to
  prevent collisions and natively support multi-broadcast sources.

Bug: b/433285146
Change-Id: I7020bea1f25311347cbafc6ab2174d3e2e9b7e4a
Reviewed-on: https://bluetooth-review.googlesource.com/c/bluetooth/+/2980
diff --git a/rust/bt-bass/src/client.rs b/rust/bt-bass/src/client.rs
index fc86482..62531ac 100644
--- a/rust/bt-bass/src/client.rs
+++ b/rust/bt-bass/src/client.rs
@@ -285,9 +285,14 @@
         self.write_to_bascp(op).await
     }
 
-    /// Requests the BASS server to add or update Metadata for the Broadcast
-    /// Source, and/or to request the server to synchronize to, or to stop
-    /// synchronization to, a PA and/or a BIS.
+    /// Requests the Scan Delegator to modify a Broadcast Source's
+    /// synchronization state and/or metadata.
+    ///
+    /// This method writes a **Modify Source** operation to the Broadcast Audio
+    /// Scan Control Point. It reads the current state of the Broadcast
+    /// Source from the local cache (the BRS characteristic value), applies
+    /// the requested modifications, and sends the complete updated subgroup
+    /// list to the server.
     ///
     /// # Arguments
     ///
@@ -308,48 +313,46 @@
         bis_map: HashMap<SubgroupIndex, BisSync>,
         metadata_map: Option<HashMap<SubgroupIndex, Vec<Metadata>>>,
     ) -> Result<(), Error> {
-        let op = {
-            let mut state = self
-                .get_broadcast_source_state(&broadcast_id)
-                .ok_or(Error::UnknownBroadcastSource(broadcast_id))?;
+        let mut state = self
+            .get_broadcast_source_state(&broadcast_id)
+            .ok_or(Error::UnknownBroadcastSource(broadcast_id))?;
 
-            // Update BIS_Sync param for BIGs if applicable.
+        // Update BIS_Sync param for BIGs if applicable.
+        for (big_index, group) in state.subgroups.iter_mut().enumerate() {
+            if let Some(bis_sync) = bis_map.get(&(big_index as u8)) {
+                group.bis_sync = bis_sync.clone();
+            }
+        }
+
+        // Update metadata for BIGs if applicable.
+        if let Some(mut m) = metadata_map {
             for (big_index, group) in state.subgroups.iter_mut().enumerate() {
-                if let Some(bis_sync) = bis_map.get(&(big_index as u8)) {
-                    group.bis_sync = bis_sync.clone();
+                if let Some(metadata) = m.remove(&(big_index as u8)) {
+                    group.metadata = metadata;
                 }
             }
 
-            // Update metadata for BIGs if applicable.
-            if let Some(mut m) = metadata_map {
-                for (big_index, group) in state.subgroups.iter_mut().enumerate() {
-                    if let Some(metadata) = m.remove(&(big_index as u8)) {
-                        group.metadata = metadata;
-                    }
+            // Left over metadata values are new subgroups that are to be added. New
+            // subgroups can only be added if the subgroup index is
+            // contiguous to existing subgroups.
+            let mut new_big_indices: Vec<&u8> = m.keys().collect();
+            new_big_indices.sort();
+            for big_index in new_big_indices {
+                if (*big_index as usize) != state.subgroups.len() {
+                    warn!("cannot add new [{big_index}th] subgroup");
+                    break;
                 }
-
-                // Left over metadata values are new subgroups that are to be added. New
-                // subgroups can only be added if the subgroup index is
-                // contiguous to existing subgroups.
-                let mut new_big_indices: Vec<&u8> = m.keys().collect();
-                new_big_indices.sort();
-                for big_index in new_big_indices {
-                    if (*big_index as usize) != state.subgroups.len() {
-                        warn!("cannot add new [{big_index}th] subgroup");
-                        break;
-                    }
-                    let new_subgroup = BigSubgroup::new(None).with_metadata(m[big_index].clone());
-                    state.subgroups.push(new_subgroup);
-                }
+                let new_subgroup = BigSubgroup::new(None).with_metadata(m[big_index].clone());
+                state.subgroups.push(new_subgroup);
             }
+        }
 
-            ModifySourceOperation::new(
-                state.source_id,
-                pa_sync,
-                pa_interval.unwrap_or(PeriodicAdvertisingInterval::unknown()),
-                state.subgroups,
-            )
-        };
+        let op = ModifySourceOperation::new(
+            state.source_id,
+            pa_sync,
+            pa_interval.unwrap_or(PeriodicAdvertisingInterval::unknown()),
+            state.subgroups,
+        );
         self.write_to_bascp(op).await
     }
 
diff --git a/rust/bt-broadcast-assistant/Cargo.toml b/rust/bt-broadcast-assistant/Cargo.toml
index b4d62fb..0876a54 100644
--- a/rust/bt-broadcast-assistant/Cargo.toml
+++ b/rust/bt-broadcast-assistant/Cargo.toml
@@ -14,6 +14,7 @@
 bt-common.workspace = true
 bt-gatt.workspace = true
 futures.workspace = true
+log.workspace = true
 num.workspace = true
 parking_lot.workspace = true
 thiserror.workspace = true
diff --git a/rust/bt-broadcast-assistant/src/assistant.rs b/rust/bt-broadcast-assistant/src/assistant.rs
index 495ed85..cde253a 100644
--- a/rust/bt-broadcast-assistant/src/assistant.rs
+++ b/rust/bt-broadcast-assistant/src/assistant.rs
@@ -11,7 +11,7 @@
 use bt_bap::types::BroadcastId;
 use bt_bass::client::error::Error as BassClientError;
 use bt_bass::client::BroadcastAudioScanServiceClient;
-use bt_common::{PeerId, Uuid};
+use bt_common::{core::AdvertisingSetId, PeerId, Uuid};
 use bt_gatt::central::*;
 use bt_gatt::client::PeerServiceHandle;
 use bt_gatt::types::Error as GattError;
@@ -28,6 +28,8 @@
 pub const BASIC_AUDIO_ANNOUNCEMENT_SERVICE: Uuid = Uuid::from_u16(0x1851);
 pub const BROADCAST_AUDIO_ANNOUNCEMENT_SERVICE: Uuid = Uuid::from_u16(0x1852);
 
+pub type BroadcastSourceKey = (PeerId, AdvertisingSetId);
+
 #[derive(Debug, Error)]
 pub enum Error {
     #[error("GATT operation error: {0:?}")]
@@ -55,7 +57,7 @@
 /// Contains information about the currently-known broadcast
 /// sources and the peers they were found on
 #[derive(Debug)]
-pub(crate) struct DiscoveredBroadcastSources(Mutex<HashMap<PeerId, BroadcastSource>>);
+pub(crate) struct DiscoveredBroadcastSources(Mutex<HashMap<BroadcastSourceKey, BroadcastSource>>);
 
 impl DiscoveredBroadcastSources {
     /// Creates a shareable instance of `DiscoveredBroadcastSources`.
@@ -68,34 +70,36 @@
     /// indicates whether it has changed from before or not.
     pub(crate) fn merge_broadcast_source_data(
         &self,
-        peer_id: &PeerId,
+        key: &BroadcastSourceKey,
         data: &BroadcastSource,
     ) -> (BroadcastSource, bool) {
         let mut lock = self.0.lock();
-        let source = lock.entry(*peer_id).or_default();
+        let source = lock.entry(*key).or_default();
         let before = source.clone();
 
         source.merge(data);
+
         let after = source.clone();
         let changed = before != after;
 
         (after, changed)
     }
 
-    /// Get a BroadcastSource from a peer id.
-    fn get_by_peer_id(&self, peer_id: &PeerId) -> Option<BroadcastSource> {
+    /// Get a BroadcastSource from a peer id and advertising SID.
+    pub(crate) fn get_by_key(
+        &self,
+        peer_id: PeerId,
+        advertising_sid: AdvertisingSetId,
+    ) -> Option<BroadcastSource> {
         let lock = self.0.lock();
-        lock.get(&peer_id).clone().map(|source| source.clone())
+        lock.get(&(peer_id, advertising_sid)).cloned()
     }
 
     /// Get a BroadcastSource from associated broadcast id.
     fn get_by_broadcast_id(&self, broadcast_id: &BroadcastId) -> Option<BroadcastSource> {
         let lock = self.0.lock();
-        let info = lock.iter().find(|(&_k, &ref v)| v.broadcast_id == Some(*broadcast_id));
-        match info {
-            Some((&_peer_id, &ref broadcast_source)) => Some(broadcast_source.clone()),
-            None => None,
-        }
+        let info = lock.values().find(|v| v.broadcast_id == Some(*broadcast_id));
+        info.cloned()
     }
 }
 
@@ -135,8 +139,23 @@
         }
         let scan_result_stream = self.central.scan(&Self::scan_filters());
         self.broadcast_source_scan_started.store(true, Ordering::Relaxed);
+
+        let periodic_advertising = self
+            .central
+            .periodic_advertising()
+            .inspect_err(|e| {
+                // TODO(b/433285146): This should eventually fail the start operation.
+                // For now, log a warning and fallback to EA-only.
+                log::warn!(
+                    "Periodic Advertising not supported by platform: {:?}. Falling back to EA-only scanning.",
+                    e
+                );
+            })
+            .ok();
+
         Ok(EventStream::<T>::new(
             scan_result_stream,
+            periodic_advertising,
             self.broadcast_sources.clone(),
             self.broadcast_source_scan_started.clone(),
         ))
@@ -194,13 +213,15 @@
         let broadcast_source = BroadcastSource {
             address: Some(address),
             address_type: Some(address_type),
-            advertising_sid: Some(advertising_sid),
             broadcast_id: None,
             periodic_advertising_interval: None,
             endpoint: None,
         };
 
-        Ok(self.broadcast_sources.merge_broadcast_source_data(&peer_id, &broadcast_source).0)
+        Ok(self
+            .broadcast_sources
+            .merge_broadcast_source_data(&(peer_id, advertising_sid), &broadcast_source)
+            .0)
     }
 
     // Manually adds broadcast source information for debugging purposes.
@@ -208,6 +229,7 @@
     pub fn force_discover_broadcast_source_metadata(
         &self,
         peer_id: PeerId,
+        advertising_sid: bt_common::core::AdvertisingSetId,
         big_metadata: Vec<Vec<bt_common::generic_audio::metadata_ltv::Metadata>>,
     ) -> Result<BroadcastSource, Error> {
         use bt_bap::types::{BroadcastAudioSourceEndpoint, BroadcastIsochronousGroup};
@@ -229,24 +251,23 @@
         let broadcast_source = BroadcastSource {
             address: None,
             address_type: None,
-            advertising_sid: None,
             broadcast_id: None,
             periodic_advertising_interval: None,
             endpoint: Some(endpoint),
         };
 
-        Ok(self.broadcast_sources.merge_broadcast_source_data(&peer_id, &broadcast_source).0)
+        Ok(self
+            .broadcast_sources
+            .merge_broadcast_source_data(&(peer_id, advertising_sid), &broadcast_source)
+            .0)
     }
 
     // Gets the broadcast sources currently known by the broadcast
     // assistant.
-    pub fn known_broadcast_sources(&self) -> std::collections::HashMap<PeerId, BroadcastSource> {
-        let lock = self.broadcast_sources.0.lock();
-        let mut m = HashMap::new();
-        for (pid, source) in lock.iter() {
-            m.insert(*pid, source.clone());
-        }
-        m
+    pub fn known_broadcast_sources(
+        &self,
+    ) -> std::collections::HashMap<BroadcastSourceKey, BroadcastSource> {
+        self.broadcast_sources.0.lock().clone()
     }
 }
 
@@ -258,7 +279,7 @@
     use std::task::Poll;
 
     use bt_bap::types::*;
-    use bt_common::core::{AddressType, AdvertisingSetId};
+    use bt_common::core::{AddressType, AdvertisingSetId, PeriodicAdvertisingInterval};
     use bt_common::generic_audio::metadata_ltv::Metadata;
     use bt_gatt::test_utils::{FakeCentral, FakeClient, FakeTypes};
 
@@ -267,14 +288,16 @@
     #[test]
     fn merge_broadcast_source() {
         let discovered = DiscoveredBroadcastSources::new();
-        let bid = BroadcastId::try_from(1001).unwrap();
+        let bid1 = BroadcastId::try_from(1001).unwrap();
+        let key1 = (PeerId(1001), AdvertisingSetId(1));
+
+        // 1. Merge initial source data for SID 1.
         let (bs, changed) = discovered.merge_broadcast_source_data(
-            &PeerId(1001),
+            &key1,
             &BroadcastSource::default()
                 .with_address([1, 2, 3, 4, 5, 6])
                 .with_address_type(AddressType::Public)
-                .with_advertising_sid(AdvertisingSetId(1))
-                .with_broadcast_id(bid),
+                .with_broadcast_id(bid1),
         );
         assert!(changed);
         assert_eq!(
@@ -282,18 +305,22 @@
             BroadcastSource {
                 address: Some([1, 2, 3, 4, 5, 6]),
                 address_type: Some(AddressType::Public),
-                advertising_sid: Some(AdvertisingSetId(1)),
-                broadcast_id: Some(bid),
+                broadcast_id: Some(bid1),
                 periodic_advertising_interval: None,
                 endpoint: None,
             }
         );
 
+        // 2. Merge endpoint and PA interval for SID 1.
         let (bs, changed) = discovered.merge_broadcast_source_data(
-            &PeerId(1001),
-            &BroadcastSource::default().with_address_type(AddressType::Random).with_endpoint(
-                BroadcastAudioSourceEndpoint { presentation_delay_ms: 32, big: vec![] },
-            ),
+            &key1,
+            &BroadcastSource::default()
+                .with_address_type(AddressType::Random)
+                .with_endpoint(BroadcastAudioSourceEndpoint {
+                    presentation_delay_ms: 32,
+                    big: vec![],
+                })
+                .with_periodic_advertising_interval(PeriodicAdvertisingInterval(0x0100)),
         );
         assert!(changed);
         assert_eq!(
@@ -301,9 +328,8 @@
             BroadcastSource {
                 address: Some([1, 2, 3, 4, 5, 6]),
                 address_type: Some(AddressType::Random),
-                advertising_sid: Some(AdvertisingSetId(1)),
-                broadcast_id: Some(bid),
-                periodic_advertising_interval: None,
+                broadcast_id: Some(bid1),
+                periodic_advertising_interval: Some(PeriodicAdvertisingInterval(0x0100)),
                 endpoint: Some(BroadcastAudioSourceEndpoint {
                     presentation_delay_ms: 32,
                     big: vec![]
@@ -311,13 +337,48 @@
             }
         );
 
+        // 3. Merge duplicate endpoint for SID 1 (no change).
         let (_, changed) = discovered.merge_broadcast_source_data(
-            &PeerId(1001),
+            &key1,
             &BroadcastSource::default().with_address_type(AddressType::Random).with_endpoint(
                 BroadcastAudioSourceEndpoint { presentation_delay_ms: 32, big: vec![] },
             ),
         );
         assert!(!changed);
+
+        // 4. Merge a new broadcast source with a different SID (SID 2) for the same
+        //    peer.
+        let bid2 = BroadcastId::try_from(1002).unwrap();
+        let key2 = (PeerId(1001), AdvertisingSetId(2));
+        let (bs, changed) = discovered.merge_broadcast_source_data(
+            &key2,
+            &BroadcastSource::default()
+                .with_address([1, 2, 3, 4, 5, 6])
+                .with_address_type(AddressType::Public)
+                .with_broadcast_id(bid2),
+        );
+        assert!(changed);
+        assert_eq!(
+            bs,
+            BroadcastSource {
+                address: Some([1, 2, 3, 4, 5, 6]),
+                address_type: Some(AddressType::Public),
+                broadcast_id: Some(bid2),
+                periodic_advertising_interval: None,
+                endpoint: None,
+            }
+        );
+
+        // Verify both entries coexist in the registry
+        assert!(discovered.get_by_key(key1.0, key1.1).is_some());
+        assert!(discovered.get_by_key(key2.0, key2.1).is_some());
+
+        // Verify get_by_broadcast_id works for both and maps to correct keys
+        let lock = discovered.0.lock();
+        let entry1 = lock.iter().find(|(_, v)| v.broadcast_id == Some(bid1)).unwrap();
+        assert_eq!(entry1.0 .1, AdvertisingSetId(1));
+        let entry2 = lock.iter().find(|(_, v)| v.broadcast_id == Some(bid2)).unwrap();
+        assert_eq!(entry2.0 .1, AdvertisingSetId(2));
     }
 
     #[test]
@@ -368,7 +429,10 @@
 
         assert_eq!(source.address, Some(address));
         assert_eq!(source.address_type, Some(address_type));
-        assert_eq!(source.advertising_sid, Some(sid));
+
+        // Verify it is registered in the assistant under the correct key
+        let known = assistant.known_broadcast_sources();
+        assert_eq!(known.get(&(peer_id, sid)).unwrap().address, Some(address));
     }
 
     #[test]
@@ -376,12 +440,18 @@
         let assistant = BroadcastAssistant::<FakeTypes>::new(FakeCentral::new());
         let peer_id = PeerId(1);
         let metadata = vec![vec![Metadata::BroadcastAudioImmediateRenderingFlag]];
+        let sid = AdvertisingSetId(1);
 
-        let source =
-            assistant.force_discover_broadcast_source_metadata(peer_id, metadata.clone()).unwrap();
+        let source = assistant
+            .force_discover_broadcast_source_metadata(peer_id, sid, metadata.clone())
+            .unwrap();
 
         let endpoint = source.endpoint.unwrap();
         assert_eq!(endpoint.big.len(), 1);
         assert_eq!(endpoint.big[0].metadata, metadata[0]);
+
+        // Verify it is registered in the assistant under the correct key
+        let known = assistant.known_broadcast_sources();
+        assert!(known.contains_key(&(peer_id, sid)));
     }
 }
diff --git a/rust/bt-broadcast-assistant/src/assistant/event.rs b/rust/bt-broadcast-assistant/src/assistant/event.rs
index 18a97ae..40c32bc 100644
--- a/rust/bt-broadcast-assistant/src/assistant/event.rs
+++ b/rust/bt-broadcast-assistant/src/assistant/event.rs
@@ -3,7 +3,12 @@
 // found in the LICENSE file.
 
 use core::pin::Pin;
-use futures::stream::{FusedStream, Stream, StreamExt};
+use futures::stream::{
+    abortable, AbortHandle, FusedStream, FuturesUnordered, SelectAll, Stream, StreamExt,
+};
+use futures::FutureExt;
+use std::collections::HashMap;
+use std::future::Future;
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::Arc;
 use std::task::Poll;
@@ -14,6 +19,8 @@
 use bt_common::packet_encoding::Error as PacketError;
 use bt_common::PeerId;
 use bt_gatt::central::{AdvertisingDatum, ScanResult};
+use bt_gatt::periodic_advertising::{PeriodicAdvertising, SyncConfiguration, SyncReport};
+use bt_gatt::GattTypes;
 
 use crate::assistant::{
     DiscoveredBroadcastSources, Error, BASIC_AUDIO_ANNOUNCEMENT_SERVICE,
@@ -23,10 +30,28 @@
 
 #[derive(Debug)]
 pub enum Event {
-    FoundBroadcastSource { peer: PeerId, source: BroadcastSource },
-    CouldNotParseAdvertisingData { peer: PeerId, error: PacketError },
+    FoundBroadcastSource {
+        peer: PeerId,
+        advertising_sid: AdvertisingSetId,
+        source: BroadcastSource,
+    },
+    CouldNotParseAdvertisingData {
+        peer: PeerId,
+        error: PacketError,
+    },
 }
 
+type PeriodicAdvertisingSyncResult<T> = Result<
+    <<T as GattTypes>::PeriodicAdvertising as PeriodicAdvertising>::SyncStream,
+    bt_gatt::types::Error,
+>;
+
+type PeriodicAdvertisingFuture<T> =
+    Pin<Box<dyn Future<Output = (PeerId, u8, PeriodicAdvertisingSyncResult<T>)>>>;
+
+type PeriodicAdvertisingStream =
+    Pin<Box<dyn Stream<Item = (PeerId, u8, Result<SyncReport, bt_gatt::types::Error>)>>>;
+
 /// A stream of discovered broadcast sources.
 /// This stream polls the scan results from GATT client to discover
 /// available broadcast sources.
@@ -36,11 +61,24 @@
 
     broadcast_sources: Arc<DiscoveredBroadcastSources>,
     broadcast_source_scan_started: Arc<AtomicBool>,
+
+    periodic_advertising: Option<T::PeriodicAdvertising>,
+
+    establishing_periodic_advertising_syncs: FuturesUnordered<PeriodicAdvertisingFuture<T>>,
+    active_periodic_advertising_sync_streams: SelectAll<PeriodicAdvertisingStream>,
+    active_syncs: HashMap<(PeerId, u8), Option<AbortHandle>>,
 }
 
-impl<T: bt_gatt::GattTypes> EventStream<T> {
+impl<T: bt_gatt::GattTypes> Unpin for EventStream<T> {}
+
+impl<T: bt_gatt::GattTypes> EventStream<T>
+where
+    <<T as GattTypes>::PeriodicAdvertising as PeriodicAdvertising>::SyncStream: 'static,
+    <<T as GattTypes>::PeriodicAdvertising as PeriodicAdvertising>::SyncFut: 'static,
+{
     pub(crate) fn new(
         scan_result_stream: T::ScanResultStream,
+        periodic_advertising: Option<T::PeriodicAdvertising>,
         broadcast_sources: Arc<DiscoveredBroadcastSources>,
         broadcast_source_scan_started: Arc<AtomicBool>,
     ) -> Self {
@@ -49,6 +87,62 @@
             terminated: false,
             broadcast_sources,
             broadcast_source_scan_started,
+            periodic_advertising,
+            establishing_periodic_advertising_syncs: FuturesUnordered::new(),
+            active_periodic_advertising_sync_streams: SelectAll::new(),
+            active_syncs: HashMap::new(),
+        }
+    }
+
+    /// Polls the futures currently establishing periodic advertising syncs.
+    ///
+    /// Returns `Poll::Ready(())` if any future resolved (successfully
+    /// establishing a sync or failing), which indicates progress was made.
+    /// Returns `Poll::Pending` if no progress was made.
+    fn poll_establishing_syncs(&mut self, cx: &mut std::task::Context<'_>) -> Poll<()> {
+        if self.establishing_periodic_advertising_syncs.is_terminated() {
+            return Poll::Pending;
+        }
+
+        match self.establishing_periodic_advertising_syncs.poll_next_unpin(cx) {
+            Poll::Ready(Some((peer_id, sid, Ok(stream)))) => {
+                self.handle_established_sync(peer_id, sid, stream);
+                Poll::Ready(())
+            }
+            Poll::Ready(Some((peer_id, sid, Err(_)))) => {
+                self.active_syncs.remove(&(peer_id, sid));
+                Poll::Ready(())
+            }
+            Poll::Ready(None) | Poll::Pending => Poll::Pending,
+        }
+    }
+
+    /// Polls the active periodic advertising sync report streams.
+    ///
+    /// Returns:
+    /// - `Poll::Ready(Some(event))` if progress was made and a completed
+    ///   `FoundBroadcastSource` event is ready to be returned.
+    /// - `Poll::Ready(None)` if progress was made (e.g., a report was processed
+    ///   but was incomplete, or a stream failed and was removed), but no event
+    ///   is ready yet.
+    /// - `Poll::Pending` if no progress was made.
+    fn poll_active_syncs(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Option<Event>> {
+        if self.active_periodic_advertising_sync_streams.is_terminated() {
+            return Poll::Pending;
+        }
+
+        match self.active_periodic_advertising_sync_streams.poll_next_unpin(cx) {
+            Poll::Ready(Some((peer_id, sid, Ok(report)))) => {
+                match self.handle_periodic_advertising_report(peer_id, sid, report) {
+                    Some(event) => Poll::Ready(Some(event)),
+                    None => Poll::Ready(None), // Progressed, but no event
+                }
+            }
+            Poll::Ready(Some((peer_id, sid, Err(_)))) => {
+                self.active_syncs.remove(&(peer_id, sid));
+                Poll::Ready(None) // Progressed, but no event
+            }
+            Poll::Ready(None) | Poll::Pending => Poll::Pending,
         }
     }
 
@@ -66,19 +160,134 @@
             if *uuid == BROADCAST_AUDIO_ANNOUNCEMENT_SERVICE {
                 let bid = BroadcastId::decode(data.as_slice()).0?;
                 source.get_or_insert(BroadcastSource::default()).with_broadcast_id(bid);
-            } else if *uuid == BASIC_AUDIO_ANNOUNCEMENT_SERVICE {
-                // TODO(dayeonglee): revisit when we implement periodic advertisement.
-                let base = BroadcastAudioSourceEndpoint::decode(data.as_slice()).0?;
-                source.get_or_insert(BroadcastSource::default()).with_endpoint(base);
             }
         }
         if let Some(src) = &mut source {
-            src.advertising_sid = scan_result.advertising_sid.map(AdvertisingSetId);
             src.periodic_advertising_interval =
                 scan_result.periodic_advertising_interval.map(PeriodicAdvertisingInterval);
         }
         Ok(source)
     }
+
+    fn handle_established_sync(
+        &mut self,
+        peer_id: PeerId,
+        sid: u8,
+        stream: <<T as GattTypes>::PeriodicAdvertising as PeriodicAdvertising>::SyncStream,
+    ) {
+        let mapped_stream = stream.map(move |report| (peer_id, sid, report));
+        let (abortable_stream, abort_handle) = abortable(mapped_stream);
+
+        self.active_periodic_advertising_sync_streams.push(Box::pin(abortable_stream));
+        self.active_syncs.insert((peer_id, sid), Some(abort_handle));
+    }
+
+    fn handle_periodic_advertising_report(
+        &mut self,
+        peer_id: PeerId,
+        sid: u8,
+        report: SyncReport,
+    ) -> Option<Event> {
+        let SyncReport::PeriodicAdvertisingReport(report) = report else {
+            return None;
+        };
+
+        let Some(base) = parse_base_from_advertising_data(&report.data) else {
+            return None;
+        };
+
+        let (broadcast_source, changed) = self.broadcast_sources.merge_broadcast_source_data(
+            &(peer_id, AdvertisingSetId(sid)),
+            &BroadcastSource::default().with_endpoint(base),
+        );
+
+        if broadcast_source.is_ready_to_add() && changed {
+            if let Some(Some(handle)) = self.active_syncs.remove(&(peer_id, sid)) {
+                handle.abort();
+            }
+            return Some(Event::FoundBroadcastSource {
+                peer: peer_id,
+                advertising_sid: AdvertisingSetId(sid),
+                source: broadcast_source,
+            });
+        }
+        None
+    }
+
+    fn handle_scan_result(&mut self, scanned: ScanResult) -> Option<Event> {
+        let found_source = match Self::try_into_broadcast_source(&scanned) {
+            Ok(Some(src)) => src,
+            Ok(None) => return None,
+            Err(e) => {
+                return Some(Event::CouldNotParseAdvertisingData { peer: scanned.id, error: e })
+            }
+        };
+
+        let Some(raw_sid) = scanned.advertising_sid else {
+            return None;
+        };
+        let sid = AdvertisingSetId(raw_sid);
+
+        let (broadcast_source, changed) =
+            self.broadcast_sources.merge_broadcast_source_data(&(scanned.id, sid), &found_source);
+
+        if broadcast_source.is_ready_to_add() && changed {
+            return Some(Event::FoundBroadcastSource {
+                peer: scanned.id,
+                advertising_sid: sid,
+                source: broadcast_source,
+            });
+        }
+
+        // See if it's appropriate to establish PA sync for this broadcast source.
+
+        // If we already have the endpoint data (BASE), we don't need to establish a
+        // sync.
+        if broadcast_source.endpoint.is_some() {
+            return None;
+        }
+
+        // If the platform doesn't support periodic advertising, we cannot sync.
+        let Some(ref pa) = self.periodic_advertising else {
+            return None;
+        };
+
+        // If we are already actively syncing (or establishing a sync) for this
+        // peer/SID, don't start another one.
+        let key = (scanned.id, sid.0);
+        if self.active_syncs.contains_key(&key) {
+            return None;
+        }
+
+        self.active_syncs.insert(key, None);
+        let fut = pa.sync_to_advertising_reports(
+            scanned.id,
+            sid.0,
+            SyncConfiguration { filter_duplicates: true },
+        );
+        let mapped_fut = fut.map(move |res| (scanned.id, sid.0, res));
+        self.establishing_periodic_advertising_syncs.push(Box::pin(mapped_fut));
+
+        None
+    }
+}
+
+fn parse_base_from_advertising_data(
+    data: &[AdvertisingDatum],
+) -> Option<BroadcastAudioSourceEndpoint> {
+    for datum in data {
+        let AdvertisingDatum::ServiceData(uuid, service_data) = datum else {
+            continue;
+        };
+        if *uuid != BASIC_AUDIO_ANNOUNCEMENT_SERVICE {
+            continue;
+        }
+        let (Ok(base), _) = BroadcastAudioSourceEndpoint::decode(service_data) else {
+            continue;
+        };
+        return Some(base);
+    }
+    None
 }
 
 impl<T: bt_gatt::GattTypes> Drop for EventStream<T> {
@@ -87,13 +296,21 @@
     }
 }
 
-impl<T: bt_gatt::GattTypes> FusedStream for EventStream<T> {
+impl<T: bt_gatt::GattTypes> FusedStream for EventStream<T>
+where
+    <<T as GattTypes>::PeriodicAdvertising as PeriodicAdvertising>::SyncStream: 'static,
+    <<T as GattTypes>::PeriodicAdvertising as PeriodicAdvertising>::SyncFut: 'static,
+{
     fn is_terminated(&self) -> bool {
         self.terminated
     }
 }
 
-impl<T: bt_gatt::GattTypes> Stream for EventStream<T> {
+impl<T: bt_gatt::GattTypes> Stream for EventStream<T>
+where
+    <<T as GattTypes>::PeriodicAdvertising as PeriodicAdvertising>::SyncStream: 'static,
+    <<T as GattTypes>::PeriodicAdvertising as PeriodicAdvertising>::SyncFut: 'static,
+{
     type Item = Result<Event, Error>;
 
     fn poll_next(
@@ -104,41 +321,40 @@
             return Poll::Ready(None);
         }
 
-        // Poll scan result stream to check if there were any newly discovered peers
-        // that we're interested in.
         loop {
-            let Some(Ok(scanned)) = futures::ready!(self.scan_result_stream.poll_next_unpin(cx))
-            else {
-                self.terminated = true;
-                self.broadcast_source_scan_started.store(false, Ordering::Relaxed);
-                return Poll::Ready(Some(Err(Error::CentralScanTerminated)));
-            };
+            let mut progressed = false;
 
-            let found_source = match Self::try_into_broadcast_source(&scanned) {
-                Err(error) => {
-                    return Poll::Ready(Some(Ok(Event::CouldNotParseAdvertisingData {
-                        peer: scanned.id,
-                        error,
-                    })));
+            if self.poll_establishing_syncs(cx).is_ready() {
+                progressed = true;
+            }
+
+            match self.poll_active_syncs(cx) {
+                Poll::Ready(Some(event)) => return Poll::Ready(Some(Ok(event))),
+                Poll::Ready(None) => progressed = true,
+                Poll::Pending => {}
+            }
+
+            match self.scan_result_stream.poll_next_unpin(cx) {
+                Poll::Ready(Some(Ok(scanned))) => {
+                    progressed = true;
+                    if let Some(event) = self.handle_scan_result(scanned) {
+                        return Poll::Ready(Some(Ok(event)));
+                    }
                 }
-                Ok(None) => continue,
-                Ok(Some(source)) => source,
-            };
+                Poll::Ready(None | Some(Err(_))) => {
+                    self.terminated = true;
+                    self.broadcast_source_scan_started.store(false, Ordering::Relaxed);
+                    return Poll::Ready(Some(Err(Error::CentralScanTerminated)));
+                }
+                Poll::Pending => {}
+            }
 
-            // If we found a broadcast source, we add its information in the
-            // internal records.
-            let (broadcast_source, changed) =
-                self.broadcast_sources.merge_broadcast_source_data(&scanned.id, &found_source);
-
-            // Broadcast found event is relayed to the client iff complete
-            // information has been gathered.
-            if broadcast_source.into_add_source() && changed {
-                return Poll::Ready(Some(Ok(Event::FoundBroadcastSource {
-                    peer: scanned.id,
-                    source: broadcast_source,
-                })));
+            if !progressed {
+                break;
             }
         }
+
+        Poll::Pending
     }
 }
 
@@ -150,29 +366,35 @@
 
     use bt_common::core::{AddressType, AdvertisingSetId};
     use bt_gatt::central::{AdvertisingDatum, PeerName};
-    use bt_gatt::test_utils::{FakeTypes, ScannedResultStream, ScannedResultStreamController};
+    use bt_gatt::test_utils::{
+        FakePeriodicAdvertising, FakeTypes, ScannedResultStream, ScannedResultStreamController,
+    };
     use bt_gatt::types::Error as BtGattError;
     use bt_gatt::types::GattError;
 
-    fn setup_stream() -> (EventStream<FakeTypes>, ScannedResultStreamController) {
+    fn setup_stream(
+    ) -> (EventStream<FakeTypes>, ScannedResultStreamController, FakePeriodicAdvertising) {
         let fake_scan_result_stream = ScannedResultStream::new();
         let controller = fake_scan_result_stream.controller();
         let broadcast_sources = DiscoveredBroadcastSources::new();
         let broadcast_source_scan_started = Arc::new(AtomicBool::new(false));
+        let pa = FakePeriodicAdvertising::default();
 
         (
             EventStream::<FakeTypes>::new(
                 fake_scan_result_stream,
+                Some(pa.clone()),
                 broadcast_sources,
                 broadcast_source_scan_started,
             ),
             controller,
+            pa,
         )
     }
 
     #[test]
     fn poll_found_broadcast_source_events() {
-        let (mut stream, scan_result_controller) = setup_stream();
+        let (mut stream, scan_result_controller, pa) = setup_stream();
 
         // Scanned a broadcast source and its broadcast id.
         let broadcast_source_pid = PeerId(1005);
@@ -185,8 +407,8 @@
                 BROADCAST_AUDIO_ANNOUNCEMENT_SERVICE,
                 vec![0x01, 0x02, 0x03],
             )],
-            advertising_sid: Some(0),
-            periodic_advertising_interval: None,
+            advertising_sid: Some(1),
+            periodic_advertising_interval: Some(0x0100),
         }));
 
         // Found broadcast source event shouldn't have been sent since braodcast source
@@ -194,81 +416,76 @@
         let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
         assert!(stream.poll_next_unpin(&mut noop_cx).is_pending());
 
-        // Pretend somehow address, address type, and advertising sid were
-        // filled out. This completes the broadcast source information.
-        // TODO(b/308481381): replace this block with sending a central scan result that
-        // contains the data.
+        // Poll again to let the PA sync transition from Establishing to Established.
+        assert!(stream.poll_next_unpin(&mut noop_cx).is_pending());
+
+        // Pretend somehow address, address type were filled out.
         let _ = stream.broadcast_sources.merge_broadcast_source_data(
-            &broadcast_source_pid,
+            &(broadcast_source_pid, AdvertisingSetId(1)),
             &BroadcastSource::default()
                 .with_address([1, 2, 3, 4, 5, 6])
-                .with_address_type(AddressType::Public)
-                .with_advertising_sid(AdvertisingSetId(1)),
+                .with_address_type(AddressType::Public),
         );
 
-        // Scanned broadcast source's BASE data.
-        // TODO(b/308481381): replace this block sending data through PA train instead.
+        // Scanned broadcast source's BASE data:
         #[rustfmt::skip]
         let base_data = vec![
-            0x10, 0x20, 0x30, 0x02, // presentation delay, num of subgroups
-            0x01, 0x03, 0x00, 0x00, 0x00, 0x00, // num of bis, codec id (big #1)
-            0x00, // codec specific config len
-            0x00, // metadata len,
-            0x01, 0x00, // bis index, codec specific config len (big #1 / bis #1)
-            0x01, 0x02, 0x00, 0x00, 0x00, 0x00, // num of bis, codec id (big #2)
-            0x00, // codec specific config len
-            0x00, // metadata len,
-            0x01, 0x03, 0x02, 0x05,
-            0x08, /* bis index, codec specific config len, codec frame blocks LTV
-                    * (big #2 / bis #2) */
+            0x10, 0x20, 0x30, 0x02,               // presentation delay, num of subgroups
+            0x01, 0x03, 0x00, 0x00, 0x00, 0x00,   // num of bis, codec id (big #1)
+            0x00,                                 // codec specific config len
+            0x00,                                 // metadata len,
+            0x01, 0x00,                           // bis index, codec specific config len (big #1 / bis #1)
+            0x01, 0x02, 0x00, 0x00, 0x00, 0x00,   // num of bis, codec id (big #2)
+            0x00,                                 // codec specific config len
+            0x00,                                 // metadata len,
+            0x01, 0x03, 0x02, 0x05, 0x08,         // bis index, codec specific config len, codec frame blocks LTV (big #2 / bis #2)
         ];
 
-        scan_result_controller.add_scanned_result(Ok(ScanResult {
-            id: broadcast_source_pid,
-            connectable: true,
-            name: PeerName::Unknown,
-            advertised: vec![AdvertisingDatum::ServiceData(
-                BASIC_AUDIO_ANNOUNCEMENT_SERVICE,
-                base_data.clone(),
-            )],
-            advertising_sid: Some(1),
-            periodic_advertising_interval: Some(0x0100),
-        }));
+        let advertising_data =
+            vec![AdvertisingDatum::ServiceData(BASIC_AUDIO_ANNOUNCEMENT_SERVICE, base_data)];
+
+        // Get the fake periodic advertising sync sender that was registered when we
+        // polled the scan result
+        let periodic_advertising_sender = pa
+            .get_sender(broadcast_source_pid)
+            .expect("should have registered periodic advertising sync sender");
+
+        // Send the PA report containing the BASE data
+        periodic_advertising_sender
+            .unbounded_send(Ok(SyncReport::PeriodicAdvertisingReport(
+                bt_gatt::periodic_advertising::PeriodicAdvertisingReport {
+                    rssi: -50,
+                    data: advertising_data,
+                    event_counter: None,
+                    subevent: None,
+                    timestamp: 0,
+                },
+            )))
+            .unwrap();
 
         // Expect the stream to send out broadcast source found event since information
         // is complete.
         let Poll::Ready(Some(Ok(event))) = stream.poll_next_unpin(&mut noop_cx) else {
             panic!("should have received event");
         };
-        assert_matches!(event, Event::FoundBroadcastSource{peer, source} => {
+        assert_matches!(event, Event::FoundBroadcastSource { peer, advertising_sid, source } => {
             assert_eq!(peer, broadcast_source_pid);
-            assert_eq!(source.advertising_sid, Some(AdvertisingSetId(1)));
+            assert_eq!(advertising_sid, AdvertisingSetId(1));
             assert_eq!(source.periodic_advertising_interval, Some(PeriodicAdvertisingInterval(0x0100)));
+            assert_eq!(source.address, Some([1, 2, 3, 4, 5, 6]));
         });
 
-        assert!(stream.poll_next_unpin(&mut noop_cx).is_pending());
+        // Verify that the PA sync was stopped (removed from active_syncs) to conserve
+        // resources
+        assert!(stream.active_syncs.is_empty());
 
-        // Scanned the same broadcast source's BASE data.
-        scan_result_controller.add_scanned_result(Ok(ScanResult {
-            id: broadcast_source_pid,
-            connectable: true,
-            name: PeerName::Unknown,
-            advertised: vec![AdvertisingDatum::ServiceData(
-                BASIC_AUDIO_ANNOUNCEMENT_SERVICE,
-                base_data.clone(),
-            )],
-            advertising_sid: Some(1),
-            periodic_advertising_interval: Some(0x0100),
-        }));
-
-        // Shouldn't have gotten the event again since the information remained the
-        // same.
+        // Subsequent polls should be pending
         assert!(stream.poll_next_unpin(&mut noop_cx).is_pending());
     }
 
     #[test]
     fn central_scan_stream_terminates() {
-        let (mut stream, scan_result_controller) = setup_stream();
+        let (mut stream, scan_result_controller, _pa) = setup_stream();
 
         // Mimick scan error.
         scan_result_controller.add_scanned_result(Err(BtGattError::Gatt(GattError::InvalidPdu)));
@@ -283,68 +500,4 @@
         assert_matches!(stream.poll_next_unpin(&mut noop_cx), Poll::Ready(None));
         assert_matches!(stream.is_terminated(), true);
     }
-
-    #[test]
-    fn poll_processes_multiple_results_eagerly() {
-        let (mut stream, scan_result_controller) = setup_stream();
-        let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
-
-        let broadcast_source_pid = PeerId(1005);
-
-        #[rustfmt::skip]
-        let base_data = vec![
-            0x10, 0x20, 0x30, 0x01, // presentation delay, num of subgroups
-            0x01, 0x03, 0x00, 0x00, 0x00, 0x00, // num of bis, codec id
-            0x00, // codec specific config len
-            0x00, // metadata len,
-            0x01, 0x00, // bis index, codec specific config len
-        ];
-
-        // 1. An irrelevant peer that should be ignored.
-        scan_result_controller.add_scanned_result(Ok(ScanResult {
-            id: PeerId(1),
-            connectable: true,
-            name: PeerName::Unknown,
-            advertised: vec![],
-            advertising_sid: Some(0),
-            periodic_advertising_interval: None,
-        }));
-        // 2. A broadcast source, but incomplete data (only broadcast ID).
-        scan_result_controller.add_scanned_result(Ok(ScanResult {
-            id: broadcast_source_pid,
-            connectable: true,
-            name: PeerName::Unknown,
-            advertised: vec![AdvertisingDatum::ServiceData(
-                BROADCAST_AUDIO_ANNOUNCEMENT_SERVICE,
-                vec![0x01, 0x02, 0x03],
-            )],
-            advertising_sid: Some(1),
-            periodic_advertising_interval: None,
-        }));
-        // 3. The same broadcast source, with BASE data, which completes it.
-        scan_result_controller.add_scanned_result(Ok(ScanResult {
-            id: broadcast_source_pid,
-            connectable: true,
-            name: PeerName::Unknown,
-            advertised: vec![AdvertisingDatum::ServiceData(
-                BASIC_AUDIO_ANNOUNCEMENT_SERVICE,
-                base_data.clone(),
-            )],
-            advertising_sid: Some(1),
-            periodic_advertising_interval: None,
-        }));
-
-        // The stream should eagerly process all three items and emit the
-        // FoundBroadcastSource event.
-        let poll_result = stream.poll_next_unpin(&mut noop_cx);
-        let Poll::Ready(Some(Ok(event))) = poll_result else {
-            panic!("should have received event, but got {:?}", poll_result);
-        };
-        assert_matches!(event, Event::FoundBroadcastSource{peer, ..} => {
-            assert_eq!(peer, broadcast_source_pid);
-        });
-
-        // The underlying stream is now empty, so the next poll should be pending.
-        assert!(stream.poll_next_unpin(&mut noop_cx).is_pending());
-    }
 }
diff --git a/rust/bt-broadcast-assistant/src/assistant/peer.rs b/rust/bt-broadcast-assistant/src/assistant/peer.rs
index a901b21..4c8e6e4 100644
--- a/rust/bt-broadcast-assistant/src/assistant/peer.rs
+++ b/rust/bt-broadcast-assistant/src/assistant/peer.rs
@@ -15,7 +15,7 @@
 #[cfg(any(test, feature = "debug"))]
 use bt_bass::types::BroadcastReceiveState;
 use bt_bass::types::{BisSync, PaSync};
-use bt_common::core::PeriodicAdvertisingInterval;
+use bt_common::core::{AdvertisingSetId, PeriodicAdvertisingInterval};
 use bt_common::packet_encoding::Error as PacketError;
 use bt_common::PeerId;
 #[cfg(any(test, feature = "debug"))]
@@ -103,13 +103,14 @@
     pub async fn add_broadcast_source(
         &self,
         source_peer_id: PeerId,
+        advertising_sid: AdvertisingSetId,
         address_lookup: &impl GetPeerAddr,
         pa_sync: PaSync,
         bis_sync: HashMap<u8, BisSync>,
     ) -> Result<(), Error> {
         let mut broadcast_source = self
             .broadcast_sources
-            .get_by_peer_id(&source_peer_id)
+            .get_by_key(source_peer_id, advertising_sid)
             .ok_or(Error::DoesNotExist(source_peer_id))?;
 
         let (broadcast_addr, broadcast_addr_type) = address_lookup
@@ -118,7 +119,7 @@
             .map_err(|err| Error::AddressLookupError(source_peer_id, err))?;
         broadcast_source.with_address(broadcast_addr).with_address_type(broadcast_addr_type);
 
-        if !broadcast_source.into_add_source() {
+        if !broadcast_source.is_ready_to_add() {
             return Err(Error::NotEnoughInfo(source_peer_id));
         }
 
@@ -127,7 +128,7 @@
                 broadcast_source.broadcast_id.unwrap(),
                 broadcast_source.address_type.unwrap(),
                 broadcast_source.address.unwrap(),
-                broadcast_source.advertising_sid.unwrap(),
+                advertising_sid,
                 pa_sync,
                 broadcast_source
                     .periodic_advertising_interval
@@ -286,6 +287,7 @@
         {
             let fut = peer.add_broadcast_source(
                 PeerId(1001),
+                AdvertisingSetId(1),
                 &FakeGetPeerAddr,
                 PaSync::SyncPastUnavailable,
                 HashMap::new(),
@@ -296,10 +298,8 @@
         }
 
         let _ = broadcast_source.merge_broadcast_source_data(
-            &PeerId(1001),
-            &BroadcastSource::default()
-                .with_advertising_sid(AdvertisingSetId(1))
-                .with_broadcast_id(BroadcastId::try_from(1001).unwrap()),
+            &(PeerId(1001), AdvertisingSetId(1)),
+            &BroadcastSource::default().with_broadcast_id(BroadcastId::try_from(1001).unwrap()),
         );
 
         // Should fail because peer address couldn't be looked up.
@@ -308,6 +308,7 @@
                 StaticPeerAddr::new_for_peer(PeerId(1002), [1, 2, 3, 4, 5, 6], AddressType::Public);
             let fut = peer.add_broadcast_source(
                 PeerId(1001),
+                AdvertisingSetId(1),
                 &address_lookup,
                 PaSync::SyncPastUnavailable,
                 HashMap::new(),
@@ -323,6 +324,7 @@
                 StaticPeerAddr::new_for_peer(PeerId(1001), [1, 2, 3, 4, 5, 6], AddressType::Public);
             let fut = peer.add_broadcast_source(
                 PeerId(1001),
+                AdvertisingSetId(1),
                 &address_lookup,
                 PaSync::SyncPastUnavailable,
                 HashMap::new(),
diff --git a/rust/bt-broadcast-assistant/src/debug.rs b/rust/bt-broadcast-assistant/src/debug.rs
index d8d26d8..abd545f 100644
--- a/rust/bt-broadcast-assistant/src/debug.rs
+++ b/rust/bt-broadcast-assistant/src/debug.rs
@@ -10,7 +10,8 @@
 #[cfg(any(test, feature = "debug"))]
 use bt_common::core::ltv::LtValue;
 #[cfg(any(test, feature = "debug"))]
-use bt_common::core::{AddressType, AdvertisingSetId};
+use bt_common::core::AddressType;
+use bt_common::core::AdvertisingSetId;
 use bt_common::debug_command::CommandRunner;
 use bt_common::debug_command::CommandSet;
 use bt_common::gen_commandset;
@@ -39,15 +40,15 @@
         Connect = ("connect", [], ["peer_id"], "Attempt connection to scan delegator"),
         Disconnect = ("disconnect", [], [], "Disconnect from connected scan delegator"),
         SendBroadcastCode = ("set-broadcast-code", [], ["broadcast_id", "broadcast_code"], "Attempt to send decryption key for a particular broadcast source to the scan delegator"),
-        AddBroadcastSource = ("add-broadcast-source", [], ["broadcast_source_pid", "PaSyncOff|PaSyncPast|PaSyncNoPast", "[bis_sync]"], "Attempt to add a particular broadcast source to the scan delegator"),
+        AddBroadcastSource = ("add-broadcast-source", [], ["source_peer_id", "advertising_sid", "PaSyncOff|PaSyncPast|PaSyncNoPast", "[bis_sync]"], "Attempt to add a particular broadcast source to the scan delegator"),
         UpdatePaSync = ("update-pa-sync", [], ["broadcast_id", "PaSyncOff|PaSyncPast|PaSyncNoPast", "[bis_sync]"], "Attempt to update the scan delegator's desired pa sync to a particular broadcast source"),
         RemoveBroadcastSource = ("remove-broadcast-source", [], ["broadcast_id"], "Attempt to remove a particular broadcast source to the scan delegator"),
         RemoteScanStarted = ("inform-scan-started", [], [], "Inform the scan delegator that we have started scanning on behalf of it"),
         RemoteScanStopped = ("inform-scan-stopped", [], [], "Inform the scan delegator that we have stopped scanning on behalf of it"),
         // TODO(http://b/433285146): Once PA scanning is implemented, remove bottom 3 commands.
-        ForceDiscoverBroadcastSource = ("force-discover-broadcast-source", [], ["broadcast_source_pid", "address", "Public|Random", "advertising_sid"], "Force the broadcast assistant to become aware of the provided broadcast source"),
-        ForceDiscoverSourceMetadata = ("force-discover-source-metadata", [], ["broadcast_source_pid", "comma_separated_raw_metadata"], "Force the broadcast assistant to become aware of the provided metadata, each BIG's metadata is comma separated"),
-        ForceDiscoverEmptySourceMetadata = ("force-discover-empty-source-metadata", [], ["broadcast_source_pid", "num_big"], "Force the broadcast assistant to become aware of the provided empty metadata, as many as # BIGs specified"),
+        ForceDiscoverBroadcastSource = ("force-discover-broadcast-source", [], ["source_peer_id", "address", "Public|Random", "advertising_sid"], "Force the broadcast assistant to become aware of the provided broadcast source"),
+        ForceDiscoverSourceMetadata = ("force-discover-source-metadata", [], ["source_peer_id", "advertising_sid", "metadata_big1", "[metadata_big_n]..."], "Force the broadcast assistant to become aware of the provided metadata, each BIG's metadata is comma separated"),
+        ForceDiscoverEmptySourceMetadata = ("force-discover-empty-source-metadata", [], ["source_peer_id", "advertising_sid", "num_big"], "Force the broadcast assistant to become aware of the provided empty metadata, as many as # BIGs specified"),
     }
 }
 
@@ -225,7 +226,7 @@
                     let known = self.assistant.known_broadcast_sources();
                     println!("Known Broadcast Sources:");
                     for (id, s) in known {
-                        println!("PeerId ({id}), source: {s:?}");
+                        println!("({id:?}): {s:?}");
                     }
                 }
                 AssistantCmd::Connect => {
@@ -286,17 +287,23 @@
                     .await;
                 }
                 AssistantCmd::AddBroadcastSource => {
-                    if args.len() < 2 {
+                    if args.len() < 3 {
                         eprintln!("usage: {}", AssistantCmd::AddBroadcastSource.help_simple());
                         return Ok(());
                     }
 
-                    let Ok(broadcast_source_pid) = parse_peer_id(&args[0]) else {
-                        eprintln!("invalid broadcast id: {}", args[0]);
+                    let Ok(source_peer_id) = parse_peer_id(&args[0]) else {
+                        eprintln!("invalid peer id: {}", args[0]);
                         return Ok(());
                     };
 
-                    let pa_sync: PaSync = match args[1].parse() {
+                    let Ok(sid_val) = parse_int::<u8>(&args[1]) else {
+                        eprintln!("invalid advertising sid: {}", args[1]);
+                        return Ok(());
+                    };
+                    let advertising_sid = AdvertisingSetId(sid_val);
+
+                    let pa_sync: PaSync = match args[2].parse() {
                         Ok(sync) => sync,
                         Err(e) => {
                             eprintln!("invalid pa_sync: {e:?}");
@@ -305,11 +312,12 @@
                     };
 
                     let bis_sync =
-                        if args.len() == 3 { parse_bis_sync(&args[2]) } else { HashMap::new() };
+                        if args.len() == 4 { parse_bis_sync(&args[3]) } else { HashMap::new() };
 
                     self.with_peer(|peer| async move {
                         peer.add_broadcast_source(
-                            broadcast_source_pid,
+                            source_peer_id,
+                            advertising_sid,
                             &self.peer_addr_getter,
                             pa_sync,
                             bis_sync,
@@ -381,7 +389,7 @@
                         return Ok(());
                     }
 
-                    let Ok(peer_id) = parse_peer_id(&args[0]) else {
+                    let Ok(source_peer_id) = parse_peer_id(&args[0]) else {
                         eprintln!("invalid peer id: {}", args[0]);
                         return Ok(());
                     };
@@ -406,7 +414,7 @@
                     let advertising_sid = AdvertisingSetId(raw_ad_sid);
 
                     match self.assistant.force_discover_broadcast_source(
-                        peer_id,
+                        source_peer_id,
                         address,
                         address_type,
                         advertising_sid,
@@ -421,7 +429,7 @@
                 }
                 #[cfg(feature = "debug")]
                 AssistantCmd::ForceDiscoverSourceMetadata => {
-                    if args.len() < 2 {
+                    if args.len() < 3 {
                         eprintln!(
                             "usage: {}",
                             AssistantCmd::ForceDiscoverSourceMetadata.help_simple()
@@ -429,13 +437,19 @@
                         return Ok(());
                     }
 
-                    let Ok(peer_id) = parse_peer_id(&args[0]) else {
+                    let Ok(source_peer_id) = parse_peer_id(&args[0]) else {
                         eprintln!("invalid peer id: {}", args[0]);
                         return Ok(());
                     };
 
+                    let Ok(raw_ad_sid) = parse_int::<u8>(&args[1]) else {
+                        eprintln!("invalid advertising sid: {}", args[1]);
+                        return Ok(());
+                    };
+                    let advertising_sid = AdvertisingSetId(raw_ad_sid);
+
                     let mut all_big_metadata = Vec::new();
-                    for i in 1..args.len() {
+                    for i in 2..args.len() {
                         let raw_metadata: Vec<u8> = args[i]
                             .split(',')
                             .map(|t| parse_int(t))
@@ -457,17 +471,18 @@
                         }
                     }
 
-                    match self
-                        .assistant
-                        .force_discover_broadcast_source_metadata(peer_id, all_big_metadata)
-                    {
+                    match self.assistant.force_discover_broadcast_source_metadata(
+                        source_peer_id,
+                        advertising_sid,
+                        all_big_metadata,
+                    ) {
                         Ok(source) => println!("broadcast source with metadata: {source:?}"),
                         Err(e) => eprintln!("failed to enter in broadcast source metadata: {e:?}"),
                     }
                 }
                 #[cfg(feature = "debug")]
                 AssistantCmd::ForceDiscoverEmptySourceMetadata => {
-                    if args.len() != 2 {
+                    if args.len() != 3 {
                         eprintln!(
                             "usage: {}",
                             AssistantCmd::ForceDiscoverEmptySourceMetadata.help_simple()
@@ -475,13 +490,19 @@
                         return Ok(());
                     }
 
-                    let Ok(peer_id) = parse_peer_id(&args[0]) else {
+                    let Ok(source_peer_id) = parse_peer_id(&args[0]) else {
                         eprintln!("invalid peer id: {}", args[0]);
                         return Ok(());
                     };
 
-                    let Ok(num_big) = parse_int::<usize>(&args[1]) else {
-                        eprintln!("invalid # of bigs: {}", args[1]);
+                    let Ok(raw_ad_sid) = parse_int::<u8>(&args[1]) else {
+                        eprintln!("invalid advertising sid: {}", args[1]);
+                        return Ok(());
+                    };
+                    let advertising_sid = AdvertisingSetId(raw_ad_sid);
+
+                    let Ok(num_big) = parse_int::<usize>(&args[2]) else {
+                        eprintln!("invalid # of bigs: {}", args[2]);
                         return Ok(());
                     };
 
@@ -490,10 +511,11 @@
                         all_big_metadata.push(vec![]);
                     }
 
-                    match self
-                        .assistant
-                        .force_discover_broadcast_source_metadata(peer_id, all_big_metadata)
-                    {
+                    match self.assistant.force_discover_broadcast_source_metadata(
+                        source_peer_id,
+                        advertising_sid,
+                        all_big_metadata,
+                    ) {
                         Ok(source) => println!("broadcast source with metadata: {source:?}"),
                         Err(e) => {
                             eprintln!("failed to enter in empty broadcast source metadata: {e:?}")
diff --git a/rust/bt-broadcast-assistant/src/types.rs b/rust/bt-broadcast-assistant/src/types.rs
index e1b6312..93f9715 100644
--- a/rust/bt-broadcast-assistant/src/types.rs
+++ b/rust/bt-broadcast-assistant/src/types.rs
@@ -4,8 +4,8 @@
 
 use bt_bap::types::*;
 use bt_bass::types::{BigSubgroup, BisSync};
+use bt_common::core::PeriodicAdvertisingInterval;
 use bt_common::core::{Address, AddressType};
-use bt_common::core::{AdvertisingSetId, PeriodicAdvertisingInterval};
 use bt_common::packet_encoding::Error as PacketError;
 use std::collections::HashMap;
 
@@ -17,7 +17,6 @@
 pub struct BroadcastSource {
     pub(crate) address: Option<Address>,
     pub(crate) address_type: Option<AddressType>,
-    pub(crate) advertising_sid: Option<AdvertisingSetId>,
     pub(crate) broadcast_id: Option<BroadcastId>,
     pub(crate) periodic_advertising_interval: Option<PeriodicAdvertisingInterval>,
     pub(crate) endpoint: Option<BroadcastAudioSourceEndpoint>,
@@ -26,11 +25,11 @@
 impl BroadcastSource {
     /// Returns whether or not this BroadcastSource has enough information
     /// to be added by the Broadcast Assistant.
-    pub(crate) fn into_add_source(&self) -> bool {
+    pub(crate) fn is_ready_to_add(&self) -> bool {
         // Address and PA interval information are not necessary since
         // default value can be used for PA interval and Address is looked up
         // when the add source operation is triggered.
-        self.advertising_sid.is_some() && self.broadcast_id.is_some() && self.endpoint.is_some()
+        self.broadcast_id.is_some() && self.endpoint.is_some()
     }
 
     pub fn with_address(&mut self, address: [u8; 6]) -> &mut Self {
@@ -48,11 +47,6 @@
         self
     }
 
-    pub fn with_advertising_sid(&mut self, sid: AdvertisingSetId) -> &mut Self {
-        self.advertising_sid = Some(sid);
-        self
-    }
-
     pub fn with_periodic_advertising_interval(
         &mut self,
         interval: PeriodicAdvertisingInterval,
@@ -77,9 +71,6 @@
         if let Some(address_type) = other.address_type {
             self.address_type = Some(address_type);
         }
-        if let Some(advertising_sid) = other.advertising_sid {
-            self.advertising_sid = Some(advertising_sid);
-        }
         if let Some(broadcast_id) = other.broadcast_id {
             self.broadcast_id = Some(broadcast_id);
         }
@@ -131,13 +122,10 @@
     #[test]
     fn broadcast_source() {
         let mut b = BroadcastSource::default();
-        assert!(!b.into_add_source());
-
-        b.with_advertising_sid(AdvertisingSetId(0x1));
-        assert!(!b.into_add_source());
+        assert!(!b.is_ready_to_add());
 
         b.with_broadcast_id(BroadcastId::try_from(0x010203).unwrap());
-        assert!(!b.into_add_source());
+        assert!(!b.is_ready_to_add());
 
         b.endpoint_to_big_subgroups(HashMap::from([(0, BisSync::sync(vec![1]).unwrap())]))
             .expect_err("should fail no endpoint data");
@@ -155,7 +143,7 @@
             }],
         });
 
-        assert!(b.into_add_source());
+        assert!(b.is_ready_to_add());
         let subgroups = b
             .endpoint_to_big_subgroups(HashMap::from([
                 (0, BisSync::sync(vec![1]).unwrap()),
diff --git a/rust/bt-common/src/core.rs b/rust/bt-common/src/core.rs
index 9549747..d9050d5 100644
--- a/rust/bt-common/src/core.rs
+++ b/rust/bt-common/src/core.rs
@@ -50,7 +50,7 @@
 }
 
 /// Advertising Set ID which is 1 byte long.
-#[derive(Debug, Clone, Copy, PartialEq)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
 pub struct AdvertisingSetId(pub u8);
 
 impl AdvertisingSetId {
diff --git a/rust/bt-gatt/src/periodic_advertising.rs b/rust/bt-gatt/src/periodic_advertising.rs
index c0c227f..7cfd726 100644
--- a/rust/bt-gatt/src/periodic_advertising.rs
+++ b/rust/bt-gatt/src/periodic_advertising.rs
@@ -11,6 +11,8 @@
 
 use bt_common::core::Phy;
 use futures::{Future, Stream};
+
+use crate::central::AdvertisingDatum;
 use thiserror::Error;
 
 use bt_common::PeerId;
@@ -35,6 +37,7 @@
     /// On success, returns the SyncStream which can be used to receive
     /// SyncReports.
     fn sync_to_advertising_reports(
+        &self,
         peer_id: PeerId,
         advertising_sid: u8,
         config: SyncConfiguration,
@@ -54,7 +57,7 @@
 #[derive(Debug, Clone)]
 pub struct PeriodicAdvertisingReport {
     pub rssi: i8,
-    pub data: Vec<u8>,
+    pub data: Vec<AdvertisingDatum>,
     /// The event counter of the event that the advertising packet was received
     /// in.
     pub event_counter: Option<u16>,
diff --git a/rust/bt-gatt/src/test_utils.rs b/rust/bt-gatt/src/test_utils.rs
index 148e666..e8e7de9 100644
--- a/rust/bt-gatt/src/test_utils.rs
+++ b/rust/bt-gatt/src/test_utils.rs
@@ -335,24 +335,46 @@
     type IndicateConfirmationStream = UnboundedReceiver<Result<server::ConfirmationEvent>>;
 }
 
-pub struct FakePeriodicAdvertising;
+#[derive(Default)]
+struct FakePeriodicAdvertisingInner {
+    sync_registry: HashMap<PeerId, UnboundedSender<Result<SyncReport>>>,
+}
+
+#[derive(Clone, Default)]
+pub struct FakePeriodicAdvertising {
+    inner: Arc<Mutex<FakePeriodicAdvertisingInner>>,
+}
+
+impl FakePeriodicAdvertising {
+    pub fn new() -> Self {
+        Self::default()
+    }
+
+    pub fn get_sender(&self, peer_id: PeerId) -> Option<UnboundedSender<Result<SyncReport>>> {
+        self.inner.lock().sync_registry.get(&peer_id).cloned()
+    }
+}
 
 impl PeriodicAdvertising for FakePeriodicAdvertising {
     type SyncFut = Ready<Result<Self::SyncStream>>;
-    type SyncStream = futures::stream::Empty<Result<SyncReport>>;
+    type SyncStream = UnboundedReceiver<Result<SyncReport>>;
 
     fn sync_to_advertising_reports(
-        _peer_id: PeerId,
+        &self,
+        peer_id: PeerId,
         _advertising_sid: u8,
         _config: crate::periodic_advertising::SyncConfiguration,
     ) -> Self::SyncFut {
-        unimplemented!()
+        let (tx, rx) = unbounded();
+        self.inner.lock().sync_registry.insert(peer_id, tx);
+        ready(Ok(rx))
     }
 }
 
 #[derive(Default)]
 pub struct FakeCentralInner {
     clients: HashMap<PeerId, FakeClient>,
+    pub(crate) periodic_advertising: FakePeriodicAdvertising,
 }
 
 #[derive(Clone)]
@@ -385,7 +407,7 @@
     }
 
     fn periodic_advertising(&self) -> Result<<FakeTypes as GattTypes>::PeriodicAdvertising> {
-        unimplemented!()
+        Ok(self.inner.lock().periodic_advertising.clone())
     }
 }