rust/bt-broadcast-assistant: Initial implementation

Functionalities are:
 - continuously scan and collate information about broadcast sources
 - connect to peers and add/update/remove sources

Change-Id: I0628dd8035bfb6d3c569e16e438b9d221959762b
Reviewed-on: https://bluetooth-review.git.corp.google.com/c/bluetooth/+/1580
Reviewed-by: Marie Janssen <jamuraa@google.com>
Reviewed-by: Ani Ramakrishnan <aniramakri@google.com>
diff --git a/rust/Cargo.toml b/rust/Cargo.toml
index 60e0918..f4da3a7 100644
--- a/rust/Cargo.toml
+++ b/rust/Cargo.toml
@@ -12,6 +12,7 @@
 
 ## Local path dependencies (keep sorted)
 bt-bass = { path = "bt-bass" }
+bt-broadcast-assistant = { path = "bt-broadcast-assistant" }
 bt-common = { path = "bt-common" }
 bt-gatt = { path = "bt-gatt" }
 bt-pacs = { path = "bt-pacs" }
diff --git a/rust/bt-bass/src/client.rs b/rust/bt-bass/src/client.rs
index 3aa0b18..3479c66 100644
--- a/rust/bt-bass/src/client.rs
+++ b/rust/bt-bass/src/client.rs
@@ -13,7 +13,7 @@
 use parking_lot::Mutex;
 use tracing::warn;
 
-use bt_common::core::{AdvertisingSetId, PaInterval};
+use bt_common::core::{AddressType, AdvertisingSetId, PaInterval};
 use bt_common::generic_audio::metadata_ltv::Metadata;
 use bt_common::packet_encoding::Decodable;
 use bt_gatt::client::{CharacteristicNotification, PeerService, ServiceCharacteristic};
@@ -123,7 +123,7 @@
             audio_scan_control_point,
             broadcast_sources: Default::default(),
             broadcast_codes: HashMap::new(),
-            notification_streams: None,
+            notification_streams: Some(SelectAll::new()),
         }
     }
 
@@ -407,6 +407,11 @@
         }
         brs
     }
+
+    #[cfg(any(test, feature = "test-utils"))]
+    pub fn insert_broadcast_receive_state(&mut self, handle: Handle, brs: BroadcastReceiveState) {
+        self.broadcast_sources.lock().update_state(handle, brs);
+    }
 }
 
 #[cfg(test)]
diff --git a/rust/bt-bass/src/client/event.rs b/rust/bt-bass/src/client/event.rs
index 2c7ac2f..4077373 100644
--- a/rust/bt-bass/src/client/event.rs
+++ b/rust/bt-bass/src/client/event.rs
@@ -218,6 +218,7 @@
     use assert_matches::assert_matches;
     use futures::channel::mpsc::unbounded;
 
+    use bt_common::core::AddressType;
     use bt_gatt::types::Handle;
 
     #[test]
diff --git a/rust/bt-bass/src/types.rs b/rust/bt-bass/src/types.rs
index f810a3d..31c8b2d 100644
--- a/rust/bt-bass/src/types.rs
+++ b/rust/bt-bass/src/types.rs
@@ -3,7 +3,7 @@
 // found in the LICENSE file.
 
 use bt_common::core::ltv::LtValue;
-use bt_common::core::{AdvertisingSetId, PaInterval};
+use bt_common::core::{AddressType, AdvertisingSetId, PaInterval};
 use bt_common::generic_audio::metadata_ltv::*;
 use bt_common::packet_encoding::{Decodable, Encodable, Error as PacketError};
 use bt_common::{decodable_enum, Uuid};
@@ -676,32 +676,6 @@
     }
 }
 
-/// See Broadcast Audio Scan Service spec v1.0 Table 3.5 for details.
-#[repr(u8)]
-#[derive(Clone, Copy, Debug, PartialEq)]
-pub enum AddressType {
-    // Public Device Address or Public Identity Address.
-    Public = 0x00,
-    // Random Device Address or Random (static) Identity Address.
-    Random = 0x01,
-}
-
-impl AddressType {
-    const BYTE_SIZE: usize = 1;
-}
-
-impl TryFrom<u8> for AddressType {
-    type Error = PacketError;
-
-    fn try_from(value: u8) -> Result<Self, Self::Error> {
-        match value {
-            0x00 => Ok(Self::Public),
-            0x01 => Ok(Self::Random),
-            _ => Err(PacketError::OutOfRange),
-        }
-    }
-}
-
 /// Broadcast Receive State characteristic as defined in
 /// Broadcast Audio Scan Service spec v1.0 Section 3.2.
 /// The Broadcast Receive State characteristic is used by the server to expose
@@ -791,7 +765,7 @@
         + EncryptionStatus::MIN_PACKET_SIZE
         + NUM_SUBGROUPS_BYTE_SIZE;
 
-    #[cfg(test)]
+    #[cfg(any(test, feature = "test-utils"))]
     pub fn new(
         source_id: u8,
         source_address_type: AddressType,
diff --git a/rust/bt-broadcast-assistant/.gitignore b/rust/bt-broadcast-assistant/.gitignore
new file mode 100644
index 0000000..75e03aa
--- /dev/null
+++ b/rust/bt-broadcast-assistant/.gitignore
@@ -0,0 +1,17 @@
+# Generated by Cargo
+# will have compiled files and executables
+debug/
+target/
+
+# Remove Cargo.lock from gitignore if creating an executable, leave it for libraries
+# More information here https://doc.rust-lang.org/cargo/guide/cargo-toml-vs-cargo-lock.html
+Cargo.lock
+
+# These are backup files generated by rustfmt
+**/*.rs.bk
+
+# MSVC Windows builds of rustc generate these, which store debugging information
+*.pdb
+
+# Vim swap files
+*.swp
diff --git a/rust/bt-broadcast-assistant/Cargo.toml b/rust/bt-broadcast-assistant/Cargo.toml
new file mode 100644
index 0000000..a3ccfa6
--- /dev/null
+++ b/rust/bt-broadcast-assistant/Cargo.toml
@@ -0,0 +1,17 @@
+[package]
+name = "bt-broadcast-assistant"
+version = "0.0.1"
+edition.workspace = true
+license.workspace = true
+
+[dependencies]
+bt-bass.workspace = true
+bt-common.workspace = true
+bt-gatt.workspace = true
+futures.workspace = true
+parking_lot.workspace = true
+thiserror.workspace = true
+
+[dev-dependencies]
+assert_matches.workspace = true
+bt-bass = { workspace = true, features = ["test-utils"] }
diff --git a/rust/bt-broadcast-assistant/src/assistant.rs b/rust/bt-broadcast-assistant/src/assistant.rs
new file mode 100644
index 0000000..3d3e61e
--- /dev/null
+++ b/rust/bt-broadcast-assistant/src/assistant.rs
@@ -0,0 +1,272 @@
+// Copyright 2023 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+use parking_lot::Mutex;
+use std::collections::HashMap;
+use std::sync::Arc;
+use thiserror::Error;
+
+use bt_bass::client::error::Error as BassClientError;
+use bt_bass::client::BroadcastAudioScanServiceClient;
+use bt_bass::types::BroadcastId;
+use bt_common::{PeerId, Uuid};
+use bt_gatt::central::*;
+use bt_gatt::client::PeerServiceHandle;
+use bt_gatt::types::Error as GattError;
+use bt_gatt::Client;
+
+pub mod event;
+use event::*;
+pub mod peer;
+pub use peer::Peer;
+
+use crate::types::*;
+
+pub const BROADCAST_AUDIO_SCAN_SERVICE: Uuid = Uuid::from_u16(0x184F);
+pub const BASIC_AUDIO_ANNOUNCEMENT_SERVICE: Uuid = Uuid::from_u16(0x1851);
+pub const BROADCAST_AUDIO_ANNOUNCEMENT_SERVICE: Uuid = Uuid::from_u16(0x1852);
+
+#[derive(Debug, Error)]
+pub enum Error {
+    #[error("GATT operation error: {0:?}")]
+    Gatt(#[from] GattError),
+
+    #[error("Broadcast Audio Scan Service client error at peer ({0}): {1:?}")]
+    BassClient(PeerId, BassClientError),
+
+    #[error("Not connected to Broadcast Audio Scan Service at peer ({0})")]
+    NotConnectedToBass(PeerId),
+
+    #[error("Central scanning terminated unexpectedly")]
+    CentralScanTerminated,
+
+    #[error("Failed to connect to service ({1}) at peer ({0})")]
+    ConnectionFailure(PeerId, Uuid),
+
+    #[error("Broadcast Assistant was already started previously. It cannot be started twice")]
+    AlreadyStarted,
+
+    #[error("Failed due to error: {0}")]
+    Generic(String),
+}
+
+/// Contains information about the currently-known broadcast
+/// sources and the peers they were found on
+#[derive(Debug)]
+pub(crate) struct DiscoveredBroadcastSources(Mutex<HashMap<PeerId, BroadcastSource>>);
+
+impl DiscoveredBroadcastSources {
+    /// Creates a shareable instance of `DiscoveredBroadcastSources`.
+    pub fn new() -> Arc<Self> {
+        Arc::new(Self(Mutex::new(HashMap::new())))
+    }
+
+    /// Merges the broadcast source data with existing broadcast source data.
+    /// Returns the copy of the broadcast source data after the merge and
+    /// indicates whether it has changed from before or not.
+    pub(crate) fn merge_broadcast_source_data(
+        &self,
+        peer_id: &PeerId,
+        data: &BroadcastSource,
+    ) -> (BroadcastSource, bool) {
+        let mut lock = self.0.lock();
+        let source = lock.entry(*peer_id).or_default();
+        let before = source.clone();
+
+        source.merge(data);
+        let after = source.clone();
+        let changed = before != after;
+
+        (after, changed)
+    }
+
+    /// Get a BroadcastSource from a peer id.
+    fn get_by_peer_id(&self, peer_id: &PeerId) -> Option<BroadcastSource> {
+        let lock = self.0.lock();
+        lock.get(&peer_id).clone().map(|source| source.clone())
+    }
+
+    /// Get a BroadcastSource from associated broadcast id.
+    fn get_by_broadcast_id(&self, broadcast_id: &BroadcastId) -> Option<BroadcastSource> {
+        let lock = self.0.lock();
+        let info = lock.iter().find(|(&_k, &ref v)| v.broadcast_id == Some(*broadcast_id));
+        match info {
+            Some((&_peer_id, &ref broadcast_source)) => Some(broadcast_source.clone()),
+            None => None,
+        }
+    }
+}
+
+pub struct BroadcastAssistant<T: bt_gatt::GattTypes> {
+    central: T::Central,
+    broadcast_sources: Arc<DiscoveredBroadcastSources>,
+    scan_stream: Option<T::ScanResultStream>,
+}
+
+impl<T: bt_gatt::GattTypes + 'static> BroadcastAssistant<T> {
+    // Creates a broadcast assistant and sets it up to be ready
+    // for broadcast source scanning. Clients must use the `start`
+    // method to poll the event stream for scan results.
+    pub fn new(central: T::Central) -> Self
+    where
+        <T as bt_gatt::GattTypes>::ScanResultStream: std::marker::Send,
+    {
+        let scan_result_stream = central.scan(&Self::scan_filters());
+        Self {
+            central,
+            broadcast_sources: DiscoveredBroadcastSources::new(),
+            scan_stream: Some(scan_result_stream),
+        }
+    }
+
+    /// List of scan filters for advertisement data Broadcast Assistant should
+    /// look for, which are:
+    /// - Service data with Broadcast Audio Announcement Service UUID from
+    ///   Broadcast Sources (see BAP spec v1.0.1 Section 3.7.2.1 for details)
+    // TODO(b/308481381): define filter for finding broadcast sink.
+    fn scan_filters() -> Vec<ScanFilter> {
+        vec![Filter::HasServiceData(BROADCAST_AUDIO_ANNOUNCEMENT_SERVICE).into()]
+    }
+
+    /// Start broadcast assistant. Returns EventStream that the upper layer can
+    /// poll. Upper layer can call methods on BroadcastAssistant based on the
+    /// events it sees.
+    pub fn start(&mut self) -> Result<EventStream, Error>
+    where
+        <T as bt_gatt::GattTypes>::ScanResultStream: std::marker::Send,
+    {
+        if self.scan_stream.is_none() {
+            return Err(Error::AlreadyStarted);
+        }
+        Ok(EventStream::new(self.scan_stream.take().unwrap(), self.broadcast_sources.clone()))
+    }
+
+    pub fn scan_for_scan_delegators(&mut self) -> T::ScanResultStream {
+        // Scan for service data with Broadcast Audio Scan Service UUID to look
+        // for Broadcast Sink collocated with the Scan Delegator (see BAP spec v1.0.1
+        // Section 3.9.2 for details).
+        self.central.scan(&vec![Filter::HasServiceData(BROADCAST_AUDIO_SCAN_SERVICE).into()])
+    }
+
+    pub async fn connect_to_scan_delegator(&mut self, peer_id: PeerId) -> Result<Peer<T>, Error>
+    where
+        <T as bt_gatt::GattTypes>::NotificationStream: std::marker::Send,
+    {
+        let client = self.central.connect(peer_id).await?;
+        let service_handles = client.find_service(BROADCAST_AUDIO_SCAN_SERVICE).await?;
+
+        for handle in service_handles {
+            if handle.uuid() != BROADCAST_AUDIO_SCAN_SERVICE || !handle.is_primary() {
+                continue;
+            }
+            let service = handle.connect().await?;
+            let bass = BroadcastAudioScanServiceClient::<T>::create(service)
+                .await
+                .map_err(|e| Error::BassClient(peer_id, e))?;
+
+            let connected_peer =
+                Peer::<T>::new(peer_id, client, bass, self.broadcast_sources.clone());
+            return Ok(connected_peer);
+        }
+        Err(Error::ConnectionFailure(peer_id, BROADCAST_AUDIO_SCAN_SERVICE))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    use futures::{pin_mut, FutureExt};
+    use std::task::Poll;
+
+    use bt_common::core::{AddressType, AdvertisingSetId};
+    use bt_gatt::test_utils::{FakeCentral, FakeClient, FakeTypes};
+
+    use crate::assistant::peer::tests::fake_bass_service;
+
+    #[test]
+    fn merge_broadcast_source() {
+        let discovered = DiscoveredBroadcastSources::new();
+
+        let (bs, changed) = discovered.merge_broadcast_source_data(
+            &PeerId(1001),
+            &BroadcastSource::default()
+                .with_address([1, 2, 3, 4, 5, 6])
+                .with_address_type(AddressType::Public)
+                .with_advertising_sid(AdvertisingSetId(1))
+                .with_broadcast_id(BroadcastId(1001)),
+        );
+        assert!(changed);
+        assert_eq!(
+            bs,
+            BroadcastSource {
+                address: Some([1, 2, 3, 4, 5, 6]),
+                address_type: Some(AddressType::Public),
+                advertising_sid: Some(AdvertisingSetId(1)),
+                broadcast_id: Some(BroadcastId(1001)),
+                pa_interval: None,
+                endpoint: None,
+            }
+        );
+
+        let (bs, changed) = discovered.merge_broadcast_source_data(
+            &PeerId(1001),
+            &BroadcastSource::default().with_address_type(AddressType::Random).with_endpoint(
+                BroadcastAudioSourceEndpoint { presentation_delay_ms: 32, big: vec![] },
+            ),
+        );
+        assert!(changed);
+        assert_eq!(
+            bs,
+            BroadcastSource {
+                address: Some([1, 2, 3, 4, 5, 6]),
+                address_type: Some(AddressType::Random),
+                advertising_sid: Some(AdvertisingSetId(1)),
+                broadcast_id: Some(BroadcastId(1001)),
+                pa_interval: None,
+                endpoint: Some(BroadcastAudioSourceEndpoint {
+                    presentation_delay_ms: 32,
+                    big: vec![]
+                }),
+            }
+        );
+
+        let (_, changed) = discovered.merge_broadcast_source_data(
+            &PeerId(1001),
+            &BroadcastSource::default().with_address_type(AddressType::Random).with_endpoint(
+                BroadcastAudioSourceEndpoint { presentation_delay_ms: 32, big: vec![] },
+            ),
+        );
+        assert!(!changed);
+    }
+
+    #[test]
+    fn start_stream() {
+        let mut assistant = BroadcastAssistant::<FakeTypes>::new(FakeCentral::new());
+        let _ = assistant.start().expect("can start stream");
+
+        // Stream can only be started once.
+        assert!(assistant.start().is_err());
+    }
+
+    #[test]
+    fn connect_to_scan_delegator() {
+        // Set up fake GATT related objects.
+        let mut central = FakeCentral::new();
+        let mut client = FakeClient::new();
+        central.add_client(PeerId(1004), client.clone());
+        let service = fake_bass_service();
+        client.add_service(BROADCAST_AUDIO_SCAN_SERVICE, true, service.clone());
+
+        let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
+        let mut assistant = BroadcastAssistant::<FakeTypes>::new(central);
+        let conn_fut = assistant.connect_to_scan_delegator(PeerId(1004));
+        pin_mut!(conn_fut);
+        let polled = conn_fut.poll_unpin(&mut noop_cx);
+        let Poll::Ready(res) = polled else {
+            panic!("should be ready");
+        };
+        let _ = res.expect("should be ok");
+    }
+}
diff --git a/rust/bt-broadcast-assistant/src/assistant/event.rs b/rust/bt-broadcast-assistant/src/assistant/event.rs
new file mode 100644
index 0000000..993ba6f
--- /dev/null
+++ b/rust/bt-broadcast-assistant/src/assistant/event.rs
@@ -0,0 +1,259 @@
+// Copyright 2023 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+use futures::stream::{FusedStream, Stream, StreamExt};
+use std::sync::Arc;
+use std::task::Poll;
+
+use bt_bass::types::BroadcastId;
+use bt_common::packet_encoding::Decodable;
+use bt_common::packet_encoding::Error as PacketError;
+use bt_common::PeerId;
+use bt_gatt::central::{AdvertisingDatum, ScanResult};
+use core::pin::Pin;
+
+use crate::assistant::{
+    DiscoveredBroadcastSources, Error, BASIC_AUDIO_ANNOUNCEMENT_SERVICE,
+    BROADCAST_AUDIO_ANNOUNCEMENT_SERVICE,
+};
+use crate::types::{BroadcastAudioSourceEndpoint, BroadcastSource};
+
+#[derive(Debug)]
+pub enum Event {
+    FoundBroadcastSource { peer: PeerId, source: BroadcastSource },
+    CouldNotParseAdvertisingData { peer: PeerId, error: PacketError },
+}
+
+/// A stream of discovered broadcast sources.
+/// This stream polls the scan results from GATT client to discover
+/// available broadcast sources.
+pub struct EventStream {
+    // Central scan result stream for finding broadcast sources.
+    scan_result_stream: Pin<Box<dyn Stream<Item = Result<ScanResult, bt_gatt::types::Error>>>>,
+    terminated: bool,
+
+    broadcast_sources: Arc<DiscoveredBroadcastSources>,
+}
+
+impl EventStream {
+    pub(crate) fn new(
+        scan_result_stream: impl Stream<Item = Result<ScanResult, bt_gatt::types::Error>> + 'static,
+        broadcast_sources: Arc<DiscoveredBroadcastSources>,
+    ) -> Self {
+        Self {
+            scan_result_stream: Box::pin(scan_result_stream),
+            terminated: false,
+            broadcast_sources,
+        }
+    }
+
+    /// Returns the broadcast source if the scanned peer is a broadcast source.
+    /// Returns an error if parsing of the scan result data fails and None if
+    /// the scanned peer is not a broadcast source.
+    fn try_into_broadcast_source(
+        scan_result: &ScanResult,
+    ) -> Result<Option<BroadcastSource>, PacketError> {
+        let mut source = None;
+        for datum in &scan_result.advertised {
+            let AdvertisingDatum::ServiceData(uuid, data) = datum else {
+                continue;
+            };
+            if *uuid == BROADCAST_AUDIO_ANNOUNCEMENT_SERVICE {
+                let (bid, _) = BroadcastId::decode(data.as_slice())?;
+                source.get_or_insert(BroadcastSource::default()).with_broadcast_id(bid);
+            } else if *uuid == BASIC_AUDIO_ANNOUNCEMENT_SERVICE {
+                // TODO(dayeonglee): revisit when we implement periodic advertisement.
+                let (base, _) = BroadcastAudioSourceEndpoint::decode(data.as_slice())?;
+                source.get_or_insert(BroadcastSource::default()).with_endpoint(base);
+            }
+        }
+        Ok(source)
+    }
+}
+
+impl FusedStream for EventStream {
+    fn is_terminated(&self) -> bool {
+        self.terminated
+    }
+}
+
+impl Stream for EventStream {
+    type Item = Result<Event, Error>;
+
+    fn poll_next(
+        mut self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        if self.terminated {
+            return Poll::Ready(None);
+        }
+
+        // Poll scan result stream to check if there were any newly discovered peers
+        // that we're interested in.
+        match futures::ready!(self.scan_result_stream.poll_next_unpin(cx)) {
+            Some(Ok(scanned)) => {
+                match Self::try_into_broadcast_source(&scanned) {
+                    Err(e) => {
+                        return Poll::Ready(Some(Ok(Event::CouldNotParseAdvertisingData {
+                            peer: scanned.id,
+                            error: e,
+                        })));
+                    }
+                    Ok(Some(found_source)) => {
+                        // If we found a broadcast source, we add its information in the
+                        // internal records.
+                        let (broadcast_source, changed) = self
+                            .broadcast_sources
+                            .merge_broadcast_source_data(&scanned.id, &found_source);
+
+                        // Broadcast found event is relayed to the client iff complete
+                        // information has been gathered.
+                        if broadcast_source.into_add_source() && changed {
+                            return Poll::Ready(Some(Ok(Event::FoundBroadcastSource {
+                                peer: scanned.id,
+                                source: broadcast_source,
+                            })));
+                        }
+
+                        Poll::Pending
+                    }
+                    Ok(None) => Poll::Pending,
+                }
+            }
+            None | Some(Err(_)) => {
+                self.terminated = true;
+                Poll::Ready(Some(Err(Error::CentralScanTerminated)))
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    use assert_matches::assert_matches;
+
+    use bt_common::core::{AddressType, AdvertisingSetId};
+    use bt_gatt::central::{AdvertisingDatum, PeerName};
+    use bt_gatt::test_utils::ScannedResultStream;
+    use bt_gatt::types::Error as BtGattError;
+    use bt_gatt::types::GattError;
+
+    fn setup_stream() -> (EventStream, ScannedResultStream) {
+        let fake_scan_result_stream = ScannedResultStream::new();
+        let broadcast_sources = DiscoveredBroadcastSources::new();
+
+        (
+            EventStream::new(fake_scan_result_stream.clone(), broadcast_sources),
+            fake_scan_result_stream,
+        )
+    }
+
+    #[test]
+    fn poll_found_broadcast_source_events() {
+        let (mut stream, mut scan_result_stream) = setup_stream();
+
+        // Scanned a broadcast source and its broadcast id.
+        let broadcast_source_pid = PeerId(1005);
+
+        scan_result_stream.set_scanned_result(Ok(ScanResult {
+            id: broadcast_source_pid,
+            connectable: true,
+            name: PeerName::Unknown,
+            advertised: vec![AdvertisingDatum::ServiceData(
+                BROADCAST_AUDIO_ANNOUNCEMENT_SERVICE,
+                vec![0x01, 0x02, 0x03],
+            )],
+        }));
+
+        // Found broadcast source event shouldn't have been sent since braodcast source
+        // information isn't complete.
+        let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
+        assert!(stream.poll_next_unpin(&mut noop_cx).is_pending());
+
+        // Pretend somehow address, address type, and advertising sid were
+        // filled out. This completes the broadcast source information.
+        // TODO(b/308481381): replace this block with sending a central scan result that
+        // contains the data.
+        let _ = stream.broadcast_sources.merge_broadcast_source_data(
+            &broadcast_source_pid,
+            &BroadcastSource::default()
+                .with_address([1, 2, 3, 4, 5, 6])
+                .with_address_type(AddressType::Public)
+                .with_advertising_sid(AdvertisingSetId(1)),
+        );
+
+        // Scanned broadcast source's BASE data.
+        // TODO(b/308481381): replace this block sending data through PA train instead.
+        #[rustfmt::skip]
+        let base_data = vec![
+            0x10, 0x20, 0x30, 0x02, // presentation delay, num of subgroups
+            0x01, 0x03, 0x00, 0x00, 0x00, 0x00, // num of bis, codec id (big #1)
+            0x00, // codec specific config len
+            0x00, // metadata len,
+            0x01, 0x00, // bis index, codec specific config len (big #1 / bis #1)
+            0x01, 0x02, 0x00, 0x00, 0x00, 0x00, // num of bis, codec id (big #2)
+            0x00, // codec specific config len
+            0x00, // metadata len,
+            0x01, 0x03, 0x02, 0x05,
+            0x08, /* bis index, codec specific config len, codec frame blocks LTV
+                    * (big #2 / bis #2) */
+        ];
+
+        scan_result_stream.set_scanned_result(Ok(ScanResult {
+            id: broadcast_source_pid,
+            connectable: true,
+            name: PeerName::Unknown,
+            advertised: vec![AdvertisingDatum::ServiceData(
+                BASIC_AUDIO_ANNOUNCEMENT_SERVICE,
+                base_data.clone(),
+            )],
+        }));
+
+        // Expect the stream to send out broadcast source found event since information
+        // is complete.
+        let Poll::Ready(Some(Ok(event))) = stream.poll_next_unpin(&mut noop_cx) else {
+            panic!("should have received event");
+        };
+        assert_matches!(event, Event::FoundBroadcastSource{peer, ..} => {
+            assert_eq!(peer, broadcast_source_pid)
+        });
+
+        assert!(stream.poll_next_unpin(&mut noop_cx).is_pending());
+
+        // Scanned the same broadcast source's BASE data.
+        scan_result_stream.set_scanned_result(Ok(ScanResult {
+            id: broadcast_source_pid,
+            connectable: true,
+            name: PeerName::Unknown,
+            advertised: vec![AdvertisingDatum::ServiceData(
+                BASIC_AUDIO_ANNOUNCEMENT_SERVICE,
+                base_data.clone(),
+            )],
+        }));
+
+        // Shouldn't have gotten the event again since the information remained the
+        // same.
+        assert!(stream.poll_next_unpin(&mut noop_cx).is_pending());
+    }
+
+    #[test]
+    fn central_scan_stream_terminates() {
+        let (mut stream, mut scan_result_stream) = setup_stream();
+
+        // Mimick scan error.
+        scan_result_stream.set_scanned_result(Err(BtGattError::Gatt(GattError::InvalidPdu)));
+
+        let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
+        match stream.poll_next_unpin(&mut noop_cx) {
+            Poll::Ready(Some(Err(e))) => assert_matches!(e, Error::CentralScanTerminated),
+            _ => panic!("should have received central scan terminated error"),
+        }
+
+        // Entire stream should have terminated.
+        assert_matches!(stream.poll_next_unpin(&mut noop_cx), Poll::Ready(None));
+        assert_matches!(stream.is_terminated(), true);
+    }
+}
diff --git a/rust/bt-broadcast-assistant/src/assistant/peer.rs b/rust/bt-broadcast-assistant/src/assistant/peer.rs
new file mode 100644
index 0000000..3369a4d
--- /dev/null
+++ b/rust/bt-broadcast-assistant/src/assistant/peer.rs
@@ -0,0 +1,294 @@
+// Copyright 2023 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+use futures::stream::FusedStream;
+use futures::Stream;
+use std::sync::Arc;
+use thiserror::Error;
+
+use bt_bass::client::error::Error as BassClientError;
+use bt_bass::client::event::Event as BassEvent;
+use bt_bass::client::{BigToBisSync, BroadcastAudioScanServiceClient};
+use bt_bass::types::{BroadcastId, PaSync};
+use bt_common::core::PaInterval;
+use bt_common::packet_encoding::Error as PacketError;
+use bt_common::PeerId;
+
+use crate::assistant::DiscoveredBroadcastSources;
+
+#[derive(Debug, Error)]
+pub enum Error {
+    #[error("Failed to take event stream at Broadcast Audio Scan Service client")]
+    UnavailableBassEventStream,
+
+    #[error("Broadcast Audio Scan Service client error: {0:?}")]
+    BassClient(#[from] BassClientError),
+
+    #[error("Incomplete information for broadcast source with peer id ({0})")]
+    NotEnoughInfo(PeerId),
+
+    #[error("Broadcast source with peer id ({0}) does not exist")]
+    DoesNotExist(PeerId),
+
+    #[error("Packet error: {0}")]
+    PacketError(#[from] PacketError),
+}
+
+/// Connected scan delegator peer. Clients can use this
+/// object to perform Broadcast Audio Scan Service operations on the
+/// scan delegator peer.
+/// Not thread-safe and only one operation must be done at a time.
+pub struct Peer<T: bt_gatt::GattTypes> {
+    peer_id: PeerId,
+    // Keep for peer connection.
+    _client: T::Client,
+    bass: BroadcastAudioScanServiceClient<T>,
+    // TODO(b/309015071): add a field for pacs.
+    broadcast_sources: Arc<DiscoveredBroadcastSources>,
+}
+
+impl<T: bt_gatt::GattTypes> Peer<T> {
+    pub(crate) fn new(
+        peer_id: PeerId,
+        client: T::Client,
+        bass: BroadcastAudioScanServiceClient<T>,
+        broadcast_sources: Arc<DiscoveredBroadcastSources>,
+    ) -> Self {
+        Peer { peer_id, _client: client, bass, broadcast_sources }
+    }
+
+    pub fn peer_id(&self) -> PeerId {
+        self.peer_id
+    }
+
+    /// Takes event stream for BASS events from this scan delegator peer.
+    /// Clients can call this method to start subscribing to BASS events.
+    pub fn take_event_stream(
+        &mut self,
+    ) -> Result<impl Stream<Item = Result<BassEvent, BassClientError>> + FusedStream, Error> {
+        self.bass.take_event_stream().ok_or(Error::UnavailableBassEventStream)
+    }
+
+    /// Send broadcast code for a particular broadcast.
+    pub async fn send_broadcast_code(
+        &mut self,
+        broadcast_id: BroadcastId,
+        broadcast_code: [u8; 16],
+    ) -> Result<(), Error> {
+        self.bass.set_broadcast_code(broadcast_id, broadcast_code).await.map_err(Into::into)
+    }
+
+    /// Sends a command to add a particular broadcast source.
+    ///
+    /// # Arguments
+    ///
+    /// * `broadcast_source_pid` - peer id of the braodcast source that's to be
+    ///   added to this scan delegator peer
+    /// * `pa_sync` - pa sync mode the peer should attempt to be in
+    /// * `bis_sync` - desired BIG to BIS synchronization information. If the
+    ///   set is empty, no preference value is used for all the BIGs
+    pub async fn add_broadcast_source(
+        &mut self,
+        source_peer_id: PeerId,
+        pa_sync: PaSync,
+        bis_sync: BigToBisSync,
+    ) -> Result<(), Error> {
+        let broadcast_source = self
+            .broadcast_sources
+            .get_by_peer_id(&source_peer_id)
+            .ok_or(Error::DoesNotExist(source_peer_id))?;
+        if !broadcast_source.into_add_source() {
+            return Err(Error::NotEnoughInfo(source_peer_id));
+        }
+
+        self.bass
+            .add_broadcast_source(
+                broadcast_source.broadcast_id.unwrap(),
+                broadcast_source.address_type.unwrap(),
+                broadcast_source.address.unwrap(),
+                broadcast_source.advertising_sid.unwrap(),
+                pa_sync,
+                broadcast_source.pa_interval.unwrap_or(PaInterval::unknown()),
+                broadcast_source
+                    .endpoint
+                    .unwrap()
+                    .get_bass_subgroups(bis_sync)
+                    .map_err(Error::PacketError)?,
+            )
+            .await
+            .map_err(Into::into)
+    }
+
+    /// Sends a command to to update a particular broadcast source's PA sync.
+    ///
+    /// # Arguments
+    ///
+    /// * `broadcast_id` - broadcast id of the broadcast source that's to be
+    ///   updated
+    /// * `pa_sync` - pa sync mode the scan delegator peer should attempt to be
+    ///   in.
+    /// * `bis_sync` - desired BIG to BIS synchronization information
+    pub async fn update_broadcast_source_sync(
+        &mut self,
+        broadcast_id: BroadcastId,
+        pa_sync: PaSync,
+        bis_sync: BigToBisSync,
+    ) -> Result<(), Error> {
+        let pa_interval = self
+            .broadcast_sources
+            .get_by_broadcast_id(&broadcast_id)
+            .map(|bs| bs.pa_interval)
+            .unwrap_or(None);
+
+        self.bass
+            .modify_broadcast_source(broadcast_id, pa_sync, pa_interval, Some(bis_sync), None)
+            .await
+            .map_err(Into::into)
+    }
+
+    /// Sends a command to remove a particular broadcast source.
+    ///
+    /// # Arguments
+    ///
+    /// * `broadcast_id` - broadcast id of the braodcast source that's to be
+    ///   removed from the scan delegator
+    pub async fn remove_broadcast_source(
+        &mut self,
+        broadcast_id: BroadcastId,
+    ) -> Result<(), Error> {
+        self.bass.remove_broadcast_source(broadcast_id).await.map_err(Into::into)
+    }
+
+    /// Sends a command to inform the scan delegator peer that we have
+    /// started scanning for broadcast sources on behalf of it.
+    pub async fn inform_remote_scan_started(&mut self) -> Result<(), Error> {
+        self.bass.remote_scan_started().await.map_err(Into::into)
+    }
+
+    /// Sends a command to inform the scan delegator peer that we have
+    /// stopped scanning for broadcast sources on behalf of it.
+    pub async fn inform_remote_scan_stopped(&mut self) -> Result<(), Error> {
+        self.bass.remote_scan_stopped().await.map_err(Into::into)
+    }
+}
+
+#[cfg(test)]
+pub(crate) mod tests {
+    use super::*;
+
+    use assert_matches::assert_matches;
+    use futures::{pin_mut, FutureExt};
+    use std::collections::HashSet;
+    use std::task::Poll;
+
+    use bt_common::core::{AddressType, AdvertisingSetId};
+    use bt_gatt::test_utils::{FakeClient, FakePeerService, FakeTypes};
+    use bt_gatt::types::{
+        AttributePermissions, CharacteristicProperties, CharacteristicProperty, Handle,
+    };
+    use bt_gatt::Characteristic;
+
+    use crate::types::BroadcastSource;
+
+    const RECEIVE_STATE_HANDLE: Handle = Handle(0x11);
+    const AUDIO_SCAN_CONTROL_POINT_HANDLE: Handle = Handle(0x12);
+
+    pub(crate) fn fake_bass_service() -> FakePeerService {
+        let mut peer_service = FakePeerService::new();
+        // One broadcast receive state and one broadcast audio scan control
+        // point characteristic handles.
+        peer_service.add_characteristic(
+            Characteristic {
+                handle: RECEIVE_STATE_HANDLE,
+                uuid: bt_bass::types::BROADCAST_RECEIVE_STATE_UUID,
+                properties: CharacteristicProperties(vec![
+                    CharacteristicProperty::Broadcast,
+                    CharacteristicProperty::Notify,
+                ]),
+                permissions: AttributePermissions::default(),
+                descriptors: vec![],
+            },
+            vec![],
+        );
+        peer_service.add_characteristic(
+            Characteristic {
+                handle: AUDIO_SCAN_CONTROL_POINT_HANDLE,
+                uuid: bt_bass::types::BROADCAST_AUDIO_SCAN_CONTROL_POINT_UUID,
+                properties: CharacteristicProperties(vec![CharacteristicProperty::Broadcast]),
+                permissions: AttributePermissions::default(),
+                descriptors: vec![],
+            },
+            vec![],
+        );
+        peer_service
+    }
+
+    fn setup() -> (Peer<FakeTypes>, FakePeerService, Arc<DiscoveredBroadcastSources>) {
+        let peer_service = fake_bass_service();
+
+        let broadcast_sources = DiscoveredBroadcastSources::new();
+        (
+            Peer {
+                peer_id: PeerId(0x1),
+                _client: FakeClient::new(),
+                bass: BroadcastAudioScanServiceClient::<FakeTypes>::create_for_test(
+                    peer_service.clone(),
+                    Handle(0x1),
+                ),
+                broadcast_sources: broadcast_sources.clone(),
+            },
+            peer_service,
+            broadcast_sources,
+        )
+    }
+
+    #[test]
+    fn take_event_stream() {
+        let (mut peer, _peer_service, _broadcast_source) = setup();
+        let _event_stream = peer.take_event_stream().expect("should succeed");
+
+        // If we try to take the event stream the second time, it should fail.
+        assert!(peer.take_event_stream().is_err());
+    }
+
+    #[test]
+    fn add_broadcast_source_fail() {
+        let (mut peer, _peer_service, broadcast_source) = setup();
+
+        let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
+
+        // Should fail because broadcast source doesn't exist.
+        {
+            let fut = peer.add_broadcast_source(
+                PeerId(1001),
+                PaSync::SyncPastUnavailable,
+                HashSet::new(),
+            );
+            pin_mut!(fut);
+            let polled = fut.poll_unpin(&mut noop_cx);
+            assert_matches!(polled, Poll::Ready(Err(_)));
+        }
+
+        let _ = broadcast_source.merge_broadcast_source_data(
+            &PeerId(1001),
+            &BroadcastSource::default()
+                .with_address([1, 2, 3, 4, 5, 6])
+                .with_address_type(AddressType::Public)
+                .with_advertising_sid(AdvertisingSetId(1))
+                .with_broadcast_id(BroadcastId(1001)),
+        );
+
+        // Should fail because not enough information.
+        {
+            let fut = peer.add_broadcast_source(
+                PeerId(1001),
+                PaSync::SyncPastUnavailable,
+                HashSet::new(),
+            );
+            pin_mut!(fut);
+            let polled = fut.poll_unpin(&mut noop_cx);
+            assert_matches!(polled, Poll::Ready(Err(_)));
+        }
+    }
+}
diff --git a/rust/bt-broadcast-assistant/src/lib.rs b/rust/bt-broadcast-assistant/src/lib.rs
new file mode 100644
index 0000000..7d7af50
--- /dev/null
+++ b/rust/bt-broadcast-assistant/src/lib.rs
@@ -0,0 +1,7 @@
+// Copyright 2023 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+pub mod assistant;
+pub use assistant::BroadcastAssistant;
+pub mod types;
diff --git a/rust/bt-broadcast-assistant/src/types.rs b/rust/bt-broadcast-assistant/src/types.rs
new file mode 100644
index 0000000..8c6074e
--- /dev/null
+++ b/rust/bt-broadcast-assistant/src/types.rs
@@ -0,0 +1,435 @@
+// Copyright 2023 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+use bt_bass::client::BigToBisSync;
+use bt_bass::types::{BigSubgroup, BisSync, BroadcastId};
+use bt_common::core::ltv::LtValue;
+use bt_common::core::{Address, AddressType, CodecId};
+use bt_common::core::{AdvertisingSetId, PaInterval};
+use bt_common::generic_audio::codec_configuration::CodecConfiguration;
+use bt_common::generic_audio::metadata_ltv::Metadata;
+use bt_common::packet_encoding::{Decodable, Error as PacketError};
+
+/// Broadcast source data as advertised through Basic Audio Announcement
+/// PA and Broadcast Audio Announcement.
+/// See BAP spec v1.0.1 Section 3.7.2.1 and Section 3.7.2.2 for details.
+// TODO(b/308481381): fill out endpoint from basic audio announcement from PA trains.
+#[derive(Clone, Default, Debug, PartialEq)]
+pub struct BroadcastSource {
+    pub(crate) address: Option<Address>,
+    pub(crate) address_type: Option<AddressType>,
+    pub(crate) advertising_sid: Option<AdvertisingSetId>,
+    pub(crate) broadcast_id: Option<BroadcastId>,
+    pub(crate) pa_interval: Option<PaInterval>,
+    pub(crate) endpoint: Option<BroadcastAudioSourceEndpoint>,
+}
+
+impl BroadcastSource {
+    /// Returns whether or not this BroadcastSource has enough information
+    /// to be added by the Broadcast Assistant.
+    pub(crate) fn into_add_source(&self) -> bool {
+        // PA interval is not necessary since default value can be used.
+        self.address.is_some()
+            && self.address_type.is_some()
+            && self.advertising_sid.is_some()
+            && self.broadcast_id.is_some()
+            && self.endpoint.is_some()
+    }
+
+    pub fn with_address(&mut self, address: [u8; 6]) -> &mut Self {
+        self.address = Some(address);
+        self
+    }
+
+    pub fn with_address_type(&mut self, type_: AddressType) -> &mut Self {
+        self.address_type = Some(type_);
+        self
+    }
+
+    pub fn with_broadcast_id(&mut self, bid: BroadcastId) -> &mut Self {
+        self.broadcast_id = Some(bid);
+        self
+    }
+
+    pub fn with_advertising_sid(&mut self, sid: AdvertisingSetId) -> &mut Self {
+        self.advertising_sid = Some(sid);
+        self
+    }
+
+    pub fn with_endpoint(&mut self, endpoint: BroadcastAudioSourceEndpoint) -> &mut Self {
+        self.endpoint = Some(endpoint);
+        self
+    }
+
+    /// Merge fields from other broadcast source into this broadcast source.
+    /// Set fields in other source take priority over this source.
+    /// If a field in the other broadcast source is none, it's ignored and
+    /// the existing values are kept.
+    pub(crate) fn merge(&mut self, other: &BroadcastSource) {
+        if let Some(address) = other.address {
+            self.address = Some(address);
+        }
+        if let Some(address_type) = other.address_type {
+            self.address_type = Some(address_type);
+        }
+        if let Some(advertising_sid) = other.advertising_sid {
+            self.advertising_sid = Some(advertising_sid);
+        }
+        if let Some(broadcast_id) = other.broadcast_id {
+            self.broadcast_id = Some(broadcast_id);
+        }
+        if let Some(pa_interval) = other.pa_interval {
+            self.pa_interval = Some(pa_interval);
+        }
+        if let Some(endpoint) = &other.endpoint {
+            self.endpoint = Some(endpoint.clone());
+        }
+    }
+}
+
+/// Parameters exposed as part of Basic Audio Announcement from Broadcast
+/// Sources. See BAP spec v1.0.1 Section 3.7.2.2 for more details.
+// TODO(b/308481381): Fill out the struct.
+#[derive(Clone, Debug, PartialEq)]
+pub struct BroadcastAudioSourceEndpoint {
+    // Delay is 3 bytes.
+    pub(crate) presentation_delay_ms: u32,
+    pub(crate) big: Vec<BroadcastIsochronousGroup>,
+}
+
+impl BroadcastAudioSourceEndpoint {
+    // Should contain presentation delay, num BIG, and at least one BIG praram.
+    const MIN_PACKET_SIZE: usize = 3 + 1 + BroadcastIsochronousGroup::MIN_PACKET_SIZE;
+
+    /// Returns the representation of this object's broadcast isochronous groups
+    /// that's usable with Broadcast Audio Scan Service operations.
+    ///
+    /// # Arguments
+    ///
+    /// * `bis_sync` - BIG to BIS sync information. If the set is empty, no
+    ///   preference value is used for all the BIGs
+    pub(crate) fn get_bass_subgroups(
+        &self,
+        bis_sync: BigToBisSync,
+    ) -> Result<Vec<BigSubgroup>, PacketError> {
+        let mut subgroups = Vec::new();
+        let sync_map = bt_bass::client::big_to_bis_sync_indices(&bis_sync);
+
+        for (big_index, group) in self.big.iter().enumerate() {
+            let bis_sync = match sync_map.get(&(big_index as u8)) {
+                Some(bis_indices) => {
+                    let mut bis_sync: BisSync = BisSync(0);
+                    bis_sync.set_sync(bis_indices)?;
+                    bis_sync
+                }
+                _ => BisSync::default(),
+            };
+            subgroups.push(BigSubgroup::new(Some(bis_sync)).with_metadata(group.metadata.clone()));
+        }
+        Ok(subgroups)
+    }
+}
+
+impl Decodable for BroadcastAudioSourceEndpoint {
+    type Error = PacketError;
+
+    fn decode(buf: &[u8]) -> core::result::Result<(Self, usize), Self::Error> {
+        if buf.len() < Self::MIN_PACKET_SIZE {
+            return Err(PacketError::UnexpectedDataLength);
+        }
+
+        let mut idx = 0 as usize;
+        let presentation_delay = u32::from_le_bytes([buf[idx], buf[idx + 1], buf[idx + 2], 0x00]);
+        idx += 3;
+
+        let num_big: usize = buf[idx] as usize;
+        idx += 1;
+        if num_big < 1 {
+            return Err(PacketError::InvalidParameter(format!(
+                "num of subgroups shall be at least 1 got {num_big}"
+            )));
+        }
+
+        let mut big = Vec::new();
+        while big.len() < num_big {
+            let (group, len) = BroadcastIsochronousGroup::decode(&buf[idx..])
+                .map_err(|e| PacketError::InvalidParameter(format!("{e}")))?;
+            big.push(group);
+            idx += len;
+        }
+
+        Ok((Self { presentation_delay_ms: presentation_delay, big }, idx))
+    }
+}
+
+#[derive(Clone, Debug, PartialEq)]
+pub struct BroadcastIsochronousGroup {
+    pub(crate) codec_id: CodecId,
+    pub(crate) codec_specific_config: Vec<CodecConfiguration>,
+    pub(crate) metadata: Vec<Metadata>,
+    pub(crate) bis: Vec<BroadcastIsochronousStream>,
+}
+
+impl BroadcastIsochronousGroup {
+    // Should contain num BIS, codec id, codec specific config len, metadata len,
+    // and at least one BIS praram.
+    const MIN_PACKET_SIZE: usize =
+        1 + CodecId::BYTE_SIZE + 1 + 1 + BroadcastIsochronousStream::MIN_PACKET_SIZE;
+}
+
+impl Decodable for BroadcastIsochronousGroup {
+    type Error = PacketError;
+
+    fn decode(buf: &[u8]) -> core::result::Result<(Self, usize), Self::Error> {
+        if buf.len() < BroadcastIsochronousGroup::MIN_PACKET_SIZE {
+            return Err(PacketError::UnexpectedDataLength);
+        }
+
+        let mut idx = 0;
+        let num_bis = buf[idx] as usize;
+        idx += 1;
+        if num_bis < 1 {
+            return Err(PacketError::InvalidParameter(format!(
+                "num of BIS shall be at least 1 got {num_bis}"
+            )));
+        }
+
+        let (codec_id, read_bytes) = CodecId::decode(&buf[idx..])?;
+        idx += read_bytes;
+
+        let codec_config_len = buf[idx] as usize;
+        idx += 1;
+
+        let (results, consumed) = CodecConfiguration::decode_all(&buf[idx..idx + codec_config_len]);
+        if consumed != codec_config_len {
+            return Err(bt_common::packet_encoding::Error::UnexpectedDataLength);
+        }
+
+        let codec_specific_configs = results.into_iter().filter_map(Result::ok).collect();
+        idx += codec_config_len;
+
+        let metadata_len = buf[idx] as usize;
+        idx += 1;
+
+        let (results_metadata, consumed_len) = Metadata::decode_all(&buf[idx..idx + metadata_len]);
+        if consumed_len != metadata_len {
+            return Err(PacketError::UnexpectedDataLength);
+        }
+        // Ignore any undecodable metadata types
+        let metadata = results_metadata.into_iter().filter_map(Result::ok).collect();
+        idx += consumed_len;
+
+        let mut bis = Vec::new();
+        while bis.len() < num_bis {
+            let (stream, len) = BroadcastIsochronousStream::decode(&buf[idx..])
+                .map_err(|e| PacketError::InvalidParameter(e.to_string()))?;
+            bis.push(stream);
+            idx += len;
+        }
+
+        Ok((
+            BroadcastIsochronousGroup {
+                codec_id,
+                codec_specific_config: codec_specific_configs,
+                metadata,
+                bis,
+            },
+            idx,
+        ))
+    }
+}
+
+#[derive(Clone, Debug, PartialEq)]
+pub struct BroadcastIsochronousStream {
+    pub(crate) bis_index: u8,
+    pub(crate) codec_specific_config: Vec<CodecConfiguration>,
+}
+
+impl BroadcastIsochronousStream {
+    const MIN_PACKET_SIZE: usize = 1 + 1;
+}
+
+impl Decodable for BroadcastIsochronousStream {
+    type Error = PacketError;
+
+    fn decode(buf: &[u8]) -> core::result::Result<(Self, usize), Self::Error> {
+        if buf.len() < BroadcastIsochronousStream::MIN_PACKET_SIZE {
+            return Err(PacketError::UnexpectedDataLength);
+        }
+
+        let mut idx = 0;
+
+        let bis_index = buf[idx];
+        idx += 1;
+
+        let codec_config_len = buf[idx] as usize;
+        idx += 1;
+
+        let (results, consumed) = CodecConfiguration::decode_all(&buf[idx..idx + codec_config_len]);
+        if consumed != codec_config_len {
+            return Err(bt_common::packet_encoding::Error::UnexpectedDataLength);
+        }
+        let codec_specific_configs = results.into_iter().filter_map(Result::ok).collect();
+        idx += codec_config_len;
+
+        Ok((
+            BroadcastIsochronousStream { bis_index, codec_specific_config: codec_specific_configs },
+            idx,
+        ))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::collections::HashSet;
+
+    use super::*;
+
+    use bt_common::generic_audio::codec_configuration::{FrameDuration, SamplingFrequency};
+    use bt_common::generic_audio::AudioLocation;
+
+    #[test]
+    fn decode_bis() {
+        #[rustfmt::skip]
+        let buf = [
+            0x01, 0x09,                          // bis index and codec specific config len
+            0x02, 0x01, 0x06,                    // sampling frequency LTV
+            0x05, 0x03, 0x03, 0x00, 0x00, 0x0C,  // audio location LTV
+        ];
+
+        let (bis, _read_bytes) =
+            BroadcastIsochronousStream::decode(&buf[..]).expect("should not fail");
+        assert_eq!(
+            bis,
+            BroadcastIsochronousStream {
+                bis_index: 0x01,
+                codec_specific_config: vec![
+                    CodecConfiguration::SamplingFrequency(SamplingFrequency::F32000Hz),
+                    CodecConfiguration::AudioChannelAllocation(HashSet::from([
+                        AudioLocation::FrontLeft,
+                        AudioLocation::FrontRight,
+                        AudioLocation::LeftSurround,
+                        AudioLocation::RightSurround
+                    ])),
+                ],
+            }
+        );
+    }
+
+    #[test]
+    fn decode_big() {
+        #[rustfmt::skip]
+        let buf = [
+            0x02,  0x03, 0x00, 0x00, 0x00, 0x00,  // num of bis, codec id
+            0x04, 0x03, 0x04, 0x04, 0x10,         // codec specific config len, octets per codec frame LTV
+            0x03, 0x02, 0x08, 0x01,               // metadata len, audio active state LTV
+            0x01, 0x00,                           // bis index, codec specific config len (bis #1)
+            0x02, 0x03, 0x02, 0x02, 0x01,         // bis index, codec specific config len, frame duration LTV (bis #2)
+        ];
+
+        let (big, _read_bytes) =
+            BroadcastIsochronousGroup::decode(&buf[..]).expect("should not fail");
+        assert_eq!(
+            big,
+            BroadcastIsochronousGroup {
+                codec_id: CodecId::Assigned(bt_common::core::CodingFormat::Transparent),
+                codec_specific_config: vec![CodecConfiguration::OctetsPerCodecFrame(0x1004),],
+                metadata: vec![Metadata::AudioActiveState(true)],
+                bis: vec![
+                    BroadcastIsochronousStream { bis_index: 0x01, codec_specific_config: vec![] },
+                    BroadcastIsochronousStream {
+                        bis_index: 0x02,
+                        codec_specific_config: vec![CodecConfiguration::FrameDuration(
+                            FrameDuration::TenMs
+                        )],
+                    },
+                ],
+            }
+        );
+    }
+
+    #[test]
+    fn decode_base() {
+        #[rustfmt::skip]
+        let buf = [
+            0x10, 0x20, 0x30, 0x02,               // presentation delay, num of subgroups
+            0x01, 0x03, 0x00, 0x00, 0x00, 0x00,   // num of bis, codec id (big #1)
+            0x00,                                 // codec specific config len
+            0x00,                                 // metadata len,
+            0x01, 0x00,                           // bis index, codec specific config len (big #1 / bis #1)
+            0x01, 0x02, 0x00, 0x00, 0x00, 0x00,   // num of bis, codec id (big #2)
+            0x00,                                 // codec specific config len
+            0x00,                                 // metadata len,
+            0x01, 0x03, 0x02, 0x05, 0x08,         // bis index, codec specific config len, codec frame blocks LTV (big #2 / bis #2)
+        ];
+
+        let (base, _read_bytes) =
+            BroadcastAudioSourceEndpoint::decode(&buf[..]).expect("should not fail");
+        assert_eq!(base.presentation_delay_ms, 0x00302010);
+        assert_eq!(base.big.len(), 2);
+        assert_eq!(
+            base.big[0],
+            BroadcastIsochronousGroup {
+                codec_id: CodecId::Assigned(bt_common::core::CodingFormat::Transparent),
+                codec_specific_config: vec![],
+                metadata: vec![],
+                bis: vec![BroadcastIsochronousStream {
+                    bis_index: 0x01,
+                    codec_specific_config: vec![],
+                },],
+            }
+        );
+        assert_eq!(
+            base.big[1],
+            BroadcastIsochronousGroup {
+                codec_id: CodecId::Assigned(bt_common::core::CodingFormat::Cvsd),
+                codec_specific_config: vec![],
+                metadata: vec![],
+                bis: vec![BroadcastIsochronousStream {
+                    bis_index: 0x01,
+                    codec_specific_config: vec![CodecConfiguration::CodecFramesPerSdu(0x08)],
+                },],
+            }
+        );
+    }
+
+    #[test]
+    fn decode_base_complex() {
+        // BroadcastAudioSourceEndpoint { presentation_delay_ms: 20000, big:
+        // [BroadcastIsochronousGroup { codec_id: Assigned(Lc3), codec_specific_config:
+        // [SamplingFrequency(F24000Hz), FrameDuration(TenMs),
+        // AudioChannelAllocation({FrontLeft, FrontRight}), OctetsPerCodecFrame(60)],
+        // metadata: [StreamingAudioContexts([Media])], bis:
+        // [BroadcastIsochronousStream { bis_index: 1, codec_specific_config:
+        // [AudioChannelAllocation({FrontLeft})] }, BroadcastIsochronousStream {
+        // bis_index: 2, codec_specific_con$
+        // ig: [AudioChannelAllocation({FrontRight})] }] }] }
+        #[rustfmt::skip]
+        let buf = [
+            0x20, 0x4e, 0x00, 0x01, 0x02, 0x06, 0x00, 0x00, 0x00, 0x00, 0x10, 0x02, 0x01, 0x05, 0x02, 0x02, 0x01, 0x05,
+            0x03, 0x03, 0x00, 0x00, 0x00, 0x03, 0x04, 0x3c, 0x00, 0x04, 0x03, 0x02, 0x04, 0x00, 0x01, 0x06, 0x05, 0x03, 0x01,
+            0x00, 0x00, 0x00, 0x02, 0x06, 0x05, 0x03, 0x02, 0x00, 0x00, 0x00
+        ];
+        let (base, _read_bytes) =
+            BroadcastAudioSourceEndpoint::decode(&buf[..]).expect("should not fail");
+        println!("{base:?}");
+
+        // BroadcastAudioSourceEndpoint { presentation_delay_ms: 20000, big:
+        // [BroadcastIsochronousGroup { codec_id: Assigned(Lc3), codec_specific_config:
+        // [SamplingFrequency(F24000Hz), FrameDuration(TenMs),
+        // AudioChannelAllocation({FrontLeft}), OctetsPerCodecFrame(60)],
+        // metadata: [BroadcastAudioImmediateRenderingFlag], bis:
+        // [BroadcastIsochronousStream { bis_index: 1, codec_specific_config:
+        // [SamplingFrequency(F24000Hz)] }] }] }
+        #[rustfmt::skip]
+        let buf = [
+            0x20, 0x4e, 0x00, 0x01, 0x01, 0x06, 0x00, 0x00, 0x00, 0x00, 0x10, 0x02, 0x01, 0x05,
+            0x02, 0x02, 0x01, 0x05, 0x03, 0x01, 0x00, 0x00, 0x00, 0x03, 0x04, 0x3c, 0x00, 0x02,
+            0x01, 0x09, 0x01, 0x03, 0x02, 0x01, 0x05,
+        ];
+        let (base, _read_bytes) =
+            BroadcastAudioSourceEndpoint::decode(&buf[..]).expect("should not fail");
+        println!("{base:?}");
+    }
+}
diff --git a/rust/bt-common/src/core.rs b/rust/bt-common/src/core.rs
index fd02299..9ee3fa0 100644
--- a/rust/bt-common/src/core.rs
+++ b/rust/bt-common/src/core.rs
@@ -7,8 +7,36 @@
 
 use crate::packet_encoding::{Encodable, Error as PacketError};
 
-/// Represents the Advertising Set ID which is 1 byte long.
-/// Follows little endian encoding.
+/// Bluetooth Device Address that uniquely identifies the device
+/// to another Bluetooth device.
+/// See Core spec v5.3 Vol 2, Part B section 1.2.
+pub type Address = [u8; 6];
+
+/// See Core spec v5.3 Vol 3, Part C section 15.1.1.
+#[repr(u8)]
+#[derive(Clone, Copy, Debug, PartialEq)]
+pub enum AddressType {
+    Public = 0x00,
+    Random = 0x01,
+}
+
+impl AddressType {
+    pub const BYTE_SIZE: usize = 1;
+}
+
+impl TryFrom<u8> for AddressType {
+    type Error = PacketError;
+
+    fn try_from(value: u8) -> Result<Self, Self::Error> {
+        match value {
+            0x00 => Ok(Self::Public),
+            0x01 => Ok(Self::Random),
+            _ => Err(PacketError::OutOfRange),
+        }
+    }
+}
+
+/// Advertising Set ID which is 1 byte long.
 #[derive(Debug, Clone, Copy, PartialEq)]
 pub struct AdvertisingSetId(pub u8);
 
@@ -17,8 +45,7 @@
     pub const BYTE_SIZE: usize = 1;
 }
 
-/// Represents the SyncInfo Interval value which is 2 bytes long.
-/// Follows little endian encoding.
+/// SyncInfo Interval value which is 2 bytes long.
 #[derive(Debug, Clone, Copy, PartialEq)]
 pub struct PaInterval(pub u16);
 
diff --git a/rust/bt-gatt/src/test_utils.rs b/rust/bt-gatt/src/test_utils.rs
index 879a0fe..7a3578f 100644
--- a/rust/bt-gatt/src/test_utils.rs
+++ b/rust/bt-gatt/src/test_utils.rs
@@ -12,7 +12,7 @@
 
 use bt_common::{PeerId, Uuid};
 
-use crate::central::{AdvertisingDatum, PeerName, ScanResult};
+use crate::central::ScanResult;
 use crate::client::CharacteristicNotification;
 use crate::{types::*, GattTypes};
 
@@ -142,47 +142,97 @@
     }
 }
 
-pub struct FakeServiceHandle {}
+#[derive(Clone)]
+pub struct FakeServiceHandle {
+    pub uuid: Uuid,
+    pub is_primary: bool,
+    pub fake_service: FakePeerService,
+}
 
 impl crate::client::PeerServiceHandle<FakeTypes> for FakeServiceHandle {
     fn uuid(&self) -> Uuid {
-        todo!()
+        self.uuid
     }
 
     fn is_primary(&self) -> bool {
-        todo!()
+        self.is_primary
     }
 
     fn connect(&self) -> <FakeTypes as GattTypes>::ServiceConnectFut {
-        futures::future::ready(Ok(FakePeerService::new()))
+        futures::future::ready(Ok(self.fake_service.clone()))
     }
 }
 
-pub struct FakeClient {}
+#[derive(Default)]
+struct FakeClientInner {
+    fake_services: Vec<FakeServiceHandle>,
+}
+
+#[derive(Clone)]
+pub struct FakeClient {
+    inner: Arc<Mutex<FakeClientInner>>,
+}
+
+impl FakeClient {
+    pub fn new() -> Self {
+        FakeClient { inner: Arc::new(Mutex::new(FakeClientInner::default())) }
+    }
+
+    /// Add a fake peer service to this client.
+    pub fn add_service(&mut self, uuid: Uuid, is_primary: bool, fake_service: FakePeerService) {
+        self.inner.lock().fake_services.push(FakeServiceHandle { uuid, is_primary, fake_service });
+    }
+}
 
 impl crate::Client<FakeTypes> for FakeClient {
     fn peer_id(&self) -> PeerId {
         todo!()
     }
 
-    fn find_service(&self, _uuid: Uuid) -> <FakeTypes as GattTypes>::FindServicesFut {
-        futures::future::ready(Ok::<Vec<FakeServiceHandle>, Error>(vec![FakeServiceHandle {}]))
+    fn find_service(&self, uuid: Uuid) -> <FakeTypes as GattTypes>::FindServicesFut {
+        let fake_services = &self.inner.lock().fake_services;
+        let mut filtered_services = Vec::new();
+        for handle in fake_services {
+            if handle.uuid == uuid {
+                filtered_services.push(handle.clone());
+            }
+        }
+
+        futures::future::ready(Ok(filtered_services))
     }
 }
 
-pub struct SingleResultStream {
-    result: Option<Result<crate::central::ScanResult>>,
+#[derive(Clone)]
+pub struct ScannedResultStream {
+    inner: Arc<Mutex<ScannedResultStreamInner>>,
 }
 
-impl Stream for SingleResultStream {
+#[derive(Default)]
+pub struct ScannedResultStreamInner {
+    result: Option<Result<ScanResult>>,
+}
+
+impl ScannedResultStream {
+    pub fn new() -> Self {
+        Self { inner: Arc::new(Mutex::new(ScannedResultStreamInner::default())) }
+    }
+
+    /// Set scanned result item to output from the stream.
+    pub fn set_scanned_result(&mut self, item: Result<ScanResult>) {
+        self.inner.lock().result = Some(item);
+    }
+}
+
+impl Stream for ScannedResultStream {
     type Item = Result<ScanResult>;
 
     fn poll_next(
         self: std::pin::Pin<&mut Self>,
         _cx: &mut std::task::Context<'_>,
     ) -> Poll<Option<Self::Item>> {
-        if self.result.is_some() {
-            Poll::Ready(self.get_mut().result.take())
+        let mut lock = self.inner.lock();
+        if lock.result.is_some() {
+            Poll::Ready(lock.result.take())
         } else {
             // Never wake up, as if we never find another result
             Poll::Pending
@@ -194,7 +244,7 @@
 
 impl GattTypes for FakeTypes {
     type Central = FakeCentral;
-    type ScanResultStream = SingleResultStream;
+    type ScanResultStream = ScannedResultStream;
     type Client = FakeClient;
     type ConnectFuture = Ready<Result<FakeClient>>;
     type PeerServiceHandle = FakeServiceHandle;
@@ -208,21 +258,36 @@
 }
 
 #[derive(Default)]
-pub struct FakeCentral {}
+pub struct FakeCentralInner {
+    clients: HashMap<PeerId, FakeClient>,
+}
 
-impl crate::Central<FakeTypes> for FakeCentral {
-    fn scan(&self, _filters: &[crate::central::ScanFilter]) -> SingleResultStream {
-        SingleResultStream {
-            result: Some(Ok(ScanResult {
-                id: PeerId(1),
-                connectable: true,
-                name: PeerName::CompleteName("Marie's Pixel 7 Pro".to_owned()),
-                advertised: vec![AdvertisingDatum::Services(vec![Uuid::from_u16(0x1844)])],
-            })),
-        }
+#[derive(Clone)]
+pub struct FakeCentral {
+    inner: Arc<Mutex<FakeCentralInner>>,
+}
+
+impl FakeCentral {
+    pub fn new() -> Self {
+        Self { inner: Arc::new(Mutex::new(FakeCentralInner::default())) }
     }
 
-    fn connect(&self, _peer_id: PeerId) -> <FakeTypes as GattTypes>::ConnectFuture {
-        futures::future::ready(Ok(FakeClient {}))
+    pub fn add_client(&mut self, peer_id: PeerId, client: FakeClient) {
+        let _ = self.inner.lock().clients.insert(peer_id, client);
+    }
+}
+
+impl crate::Central<FakeTypes> for FakeCentral {
+    fn scan(&self, _filters: &[crate::central::ScanFilter]) -> ScannedResultStream {
+        ScannedResultStream::new()
+    }
+
+    fn connect(&self, peer_id: PeerId) -> <FakeTypes as GattTypes>::ConnectFuture {
+        let clients = &self.inner.lock().clients;
+        let res = match clients.get(&peer_id) {
+            Some(client) => Ok(client.clone()),
+            None => Err(Error::PeerDisconnected(peer_id)),
+        };
+        futures::future::ready(res)
     }
 }
diff --git a/rust/bt-gatt/src/tests.rs b/rust/bt-gatt/src/tests.rs
index d300f0f..e753706 100644
--- a/rust/bt-gatt/src/tests.rs
+++ b/rust/bt-gatt/src/tests.rs
@@ -9,6 +9,7 @@
 
 use bt_common::{PeerId, Uuid};
 
+use crate::central::{AdvertisingDatum, PeerName, ScanResult};
 use crate::test_utils::*;
 use crate::types::*;
 use crate::{central::Filter, client::PeerService, Central};
@@ -18,7 +19,7 @@
 const TEST_UUID_3: Uuid = Uuid::from_u16(0x3456);
 
 // Sets up a fake peer service with some characteristics.
-fn set_up() -> FakePeerService {
+fn setup_peer_service() -> FakePeerService {
     let mut fake_peer_service = FakePeerService::new();
     fake_peer_service.add_characteristic(
         Characteristic {
@@ -86,7 +87,7 @@
 fn peer_service_discover_characteristics_works() {
     let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
 
-    let fake_peer_service = set_up();
+    let fake_peer_service = setup_peer_service();
 
     let mut discover_results = fake_peer_service.discover_characteristics(Some(TEST_UUID_1));
     let polled = discover_results.poll_unpin(&mut noop_cx);
@@ -110,7 +111,7 @@
 fn peer_service_read_characteristic() {
     let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
 
-    let mut fake_peer_service = set_up();
+    let mut fake_peer_service = setup_peer_service();
     let mut buf = vec![0; 255];
 
     // For characteristic that was added, value is returned
@@ -152,7 +153,7 @@
 fn peer_service_write_characteristic() {
     let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
 
-    let mut fake_peer_service: FakePeerService = set_up();
+    let mut fake_peer_service: FakePeerService = setup_peer_service();
 
     // Set expected characteristic value before calling `write_characteristic`.
     fake_peer_service.expect_characteristic_value(&Handle(1), vec![0, 1, 2, 3]);
@@ -174,7 +175,7 @@
 fn peer_service_write_characteristic_fail() {
     let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
 
-    let fake_peer_service = set_up();
+    let fake_peer_service = setup_peer_service();
 
     // Write some random value without calling `expect_characteristic_value` first.
     let mut write_result = fake_peer_service.write_characteristic(
@@ -194,7 +195,7 @@
 fn peer_service_subsribe() {
     let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
 
-    let mut fake_peer_service = set_up();
+    let mut fake_peer_service = setup_peer_service();
     let mut notification_stream = fake_peer_service.subscribe(&Handle(0x1));
 
     // Stream is empty unless we add an item through the FakeNotificationStream
@@ -230,9 +231,19 @@
 #[test]
 fn central_search_works() {
     let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
-    let central = FakeCentral::default();
+    let central = FakeCentral::new();
 
     let mut scan_results = central.scan(&[Filter::ServiceUuid(Uuid::from_u16(0x1844)).into()]);
+    let polled = scan_results.poll_next_unpin(&mut noop_cx);
+    assert_matches!(polled, Poll::Pending);
+
+    let scanned_result = ScanResult {
+        id: PeerId(1),
+        connectable: true,
+        name: PeerName::CompleteName("Marie's Pixel 7 Pro".to_owned()),
+        advertised: vec![AdvertisingDatum::Services(vec![Uuid::from_u16(0x1844)])],
+    };
+    let _ = scan_results.set_scanned_result(Ok(scanned_result));
 
     let polled = scan_results.poll_next_unpin(&mut noop_cx);
     assert_matches!(polled, Poll::Ready(Some(Ok(_))));
@@ -243,8 +254,8 @@
 
     let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
     let _stream = central.scan(&[]);
-    let connect_fut = central.connect(PeerId(1));
 
+    let connect_fut = central.connect(PeerId(1));
     futures::pin_mut!(connect_fut);
     let Poll::Ready(Ok(client)) = connect_fut.poll(&mut noop_cx) else {
         panic!("Connect should be ready Ok");
@@ -273,7 +284,12 @@
 
 #[test]
 fn central_dynamic_usage() {
-    let central: Box<dyn crate::Central<FakeTypes>> = Box::new(FakeCentral::default());
+    let mut central = FakeCentral::new();
+    let mut fake_client = FakeClient::new();
+    fake_client.add_service(Uuid::from_u16(0), true, FakePeerService::new());
+    central.add_client(PeerId(1), fake_client);
 
-    boxed_generic_usage(central);
+    let boxed: Box<dyn crate::Central<FakeTypes>> = Box::new(central);
+
+    boxed_generic_usage(boxed);
 }