rust/bt-{bass, common}: Implement all operations

Define types for representing a broadcast source. This makes it easier
to collect information about a broadcast source when broadcast assistant
is processing LE central scan results.
Implement all operations available with the Broadcast Audio Scan Service
client:
 - adding/modifying/deleting broadcast sources
 - marking scanning as started/stopped

Bug: b/308483171
Change-Id: I9f3a045e84efaa540c38e6f4753ece2c9c3796ae
Reviewed-on: https://bluetooth-review.git.corp.google.com/c/bluetooth/+/1322
Reviewed-by: Marie Janssen <jamuraa@google.com>
diff --git a/rust/bt-bass/Cargo.toml b/rust/bt-bass/Cargo.toml
index 81e18a2..2240624 100644
--- a/rust/bt-bass/Cargo.toml
+++ b/rust/bt-bass/Cargo.toml
@@ -4,10 +4,18 @@
 edition.workspace = true
 license.workspace = true
 
+[features]
+default = []
+test-utils = []
+
 [dependencies]
 bt-common.workspace = true
-bt-gatt = { workspace = true , features = ["test-utils"] }
+bt-gatt = { workspace = true, features = ["test-utils"] }
 futures.workspace = true
 parking_lot.workspace = true
 thiserror.workspace = true
 tracing.workspace = true
+
+[dev-dependencies]
+assert_matches.workspace = true
+bt-bass = { workspace = true, features = ["test-utils"] }
diff --git a/rust/bt-bass/src/client.rs b/rust/bt-bass/src/client.rs
index 959e913..3aa0b18 100644
--- a/rust/bt-bass/src/client.rs
+++ b/rust/bt-bass/src/client.rs
@@ -5,14 +5,17 @@
 pub mod error;
 pub mod event;
 
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
 use std::sync::Arc;
 
-use futures::stream::{BoxStream, SelectAll, StreamExt};
+use futures::stream::{BoxStream, FusedStream, SelectAll, Stream, StreamExt};
+use futures::Future;
 use parking_lot::Mutex;
 use tracing::warn;
 
-use bt_common::packet_encoding::{Decodable, Encodable};
+use bt_common::core::{AdvertisingSetId, PaInterval};
+use bt_common::generic_audio::metadata_ltv::Metadata;
+use bt_common::packet_encoding::Decodable;
 use bt_gatt::client::{CharacteristicNotification, PeerService, ServiceCharacteristic};
 use bt_gatt::types::{Handle, WriteMode};
 
@@ -22,6 +25,22 @@
 
 const READ_CHARACTERISTIC_BUFFER_SIZE: usize = 255;
 
+/// Index into the vector of BIG subgroups. Valid value range is [0 to len of
+/// BIG vector).
+pub type SubgroupIndex = u8;
+
+/// Synchronization information for BIG groups to tell which
+/// BISes it should sync to.
+pub type BigToBisSync = HashSet<(SubgroupIndex, BisIndex)>;
+
+pub fn big_to_bis_sync_indices(info: &BigToBisSync) -> HashMap<SubgroupIndex, Vec<BisIndex>> {
+    let mut sync_map = HashMap::new();
+    for (ith_group, bis_index) in info.iter() {
+        sync_map.entry(*ith_group).or_insert(Vec::new()).push(*bis_index);
+    }
+    sync_map
+}
+
 /// Keeps track of Source_ID and Broadcast_ID that are associated together.
 /// Source_ID is assigned by the BASS server to a Broadcast Receive State
 /// characteristic. If the remote peer with the BASS server autonomously
@@ -31,40 +50,62 @@
 /// is unqiue to BASS, we track the Broadcast_ID that a Source_ID is associated
 /// so that it can be used by upper layers.
 #[derive(Default)]
-pub struct BroadcastSourceIdTracker {
-    source_to_broadcast: HashMap<SourceId, BroadcastId>,
-}
+pub(crate) struct KnownBroadcastSources(HashMap<Handle, BroadcastReceiveState>);
 
-impl BroadcastSourceIdTracker {
-    fn new() -> Self {
-        Self::default()
+impl KnownBroadcastSources {
+    fn new(receive_states: HashMap<Handle, BroadcastReceiveState>) -> Self {
+        KnownBroadcastSources(receive_states)
     }
 
-    /// Updates the broadcast ID associated with the given source ID and returns
-    /// the previously-associated broadcast ID if it exists.
-    fn update(&mut self, source_id: SourceId, broadcast_id: BroadcastId) -> Option<BroadcastId> {
-        self.source_to_broadcast.insert(source_id, broadcast_id)
+    /// Updates the value of the specified broadcast receive state
+    /// characteristic. Returns the old value if it existed.
+    fn update_state(
+        &mut self,
+        key: Handle,
+        value: BroadcastReceiveState,
+    ) -> Option<BroadcastReceiveState> {
+        self.0.insert(key, value)
     }
 
+    /// Given the broadcast ID, find the corresponding source ID.
+    /// Returns none if the server doesn't know the specified broadcast source
+    /// because a) the broadcast source was never added or discovered; or,
+    /// b) the broadcast source was removed from remove operation; or,
+    /// c) the broadcast source was removed by the server to add a different
+    ///    broadcast source.
     fn source_id(&self, broadcast_id: &BroadcastId) -> Option<SourceId> {
-        self.source_to_broadcast
-            .iter()
-            .find_map(|(sid, bid)| (*bid == *broadcast_id).then_some(*sid))
+        let Some(state) = self.state(broadcast_id) else {
+            return None;
+        };
+        Some(state.source_id)
+    }
+
+    /// Gets the last updated broadcast receive state value.
+    /// Returns none if the server doesn't know the specified broadcast source.
+    fn state(&self, broadcast_id: &BroadcastId) -> Option<&ReceiveState> {
+        self.0.iter().find_map(|(&_k, &ref v)| match v {
+            BroadcastReceiveState::Empty => None,
+            BroadcastReceiveState::NonEmpty(rs) => {
+                if rs.broadcast_id() == *broadcast_id {
+                    return Some(rs);
+                }
+                None
+            }
+        })
     }
 }
 
 /// Manages connection to the Broadcast Audio Scan Service at the
 /// remote Scan Delegator and writes/reads characteristics to/from it.
 pub struct BroadcastAudioScanServiceClient<T: bt_gatt::GattTypes> {
-    gatt_client: Box<T::PeerService>,
-    id_tracker: Arc<Mutex<BroadcastSourceIdTracker>>,
+    gatt_client: T::PeerService,
     /// Broadcast Audio Scan Service only has one Broadcast Audio Scan Control
     /// Point characteristic according to BASS Section 3. There shall
     /// be one or more Broadcast Receive State characteristics.
     audio_scan_control_point: Handle,
     /// Broadcast Receive State characteristics can be used to determine the
     /// BASS status.
-    receive_states: Arc<Mutex<HashMap<Handle, Option<BroadcastReceiveState>>>>,
+    broadcast_sources: Arc<Mutex<KnownBroadcastSources>>,
     /// Keeps track of the broadcast codes that were sent to the remote BASS
     /// server.
     broadcast_codes: HashMap<SourceId, [u8; 16]>,
@@ -75,7 +116,18 @@
 }
 
 impl<T: bt_gatt::GattTypes> BroadcastAudioScanServiceClient<T> {
-    async fn create(gatt_client: T::PeerService) -> Result<Self, Error>
+    #[cfg(any(test, feature = "test-utils"))]
+    pub fn create_for_test(gatt_client: T::PeerService, audio_scan_control_point: Handle) -> Self {
+        Self {
+            gatt_client,
+            audio_scan_control_point,
+            broadcast_sources: Default::default(),
+            broadcast_codes: HashMap::new(),
+            notification_streams: None,
+        }
+    }
+
+    pub async fn create(gatt_client: T::PeerService) -> Result<Self, Error>
     where
         <T as bt_gatt::GattTypes>::NotificationStream: std::marker::Send,
     {
@@ -95,25 +147,15 @@
         }
         let bascp_handle = *bascp[0].handle();
         let brs_chars = Self::discover_brs_characteristics(&gatt_client).await?;
-        let mut id_tracker = BroadcastSourceIdTracker::new();
-        for c in brs_chars.values() {
-            if let Some(read_value) = c {
-                if let BroadcastReceiveState::NonEmpty(state) = read_value {
-                    let _ = id_tracker.update(state.source_id(), state.broadcast_id());
-                }
-            }
-        }
-
-        let mut client = Self {
-            gatt_client: Box::new(gatt_client),
-            id_tracker: Arc::new(Mutex::new(id_tracker)),
+        let mut c = Self {
+            gatt_client,
             audio_scan_control_point: bascp_handle,
-            receive_states: Arc::new(Mutex::new(brs_chars)),
+            broadcast_sources: Arc::new(Mutex::new(KnownBroadcastSources::new(brs_chars))),
             broadcast_codes: HashMap::new(),
             notification_streams: None,
         };
-        client.register_notifications();
-        Ok(client)
+        c.register_notifications();
+        Ok(c)
     }
 
     // Discover all the Broadcast Receive State characteristics.
@@ -121,7 +163,7 @@
     // Characteristics.
     async fn discover_brs_characteristics(
         gatt_client: &T::PeerService,
-    ) -> Result<HashMap<Handle, Option<BroadcastReceiveState>>, Error> {
+    ) -> Result<HashMap<Handle, BroadcastReceiveState>, Error> {
         let brs = ServiceCharacteristic::<T>::find(gatt_client, BROADCAST_RECEIVE_STATE_UUID)
             .await
             .map_err(|e| Error::Gatt(e))?;
@@ -136,7 +178,7 @@
             match c.read(&mut buf[..]).await {
                 Ok(read_bytes) => match BroadcastReceiveState::decode(&buf[0..read_bytes]) {
                     Ok((decoded, _decoded_bytes)) => {
-                        brs_map.insert(*c.handle(), Some(decoded));
+                        brs_map.insert(*c.handle(), decoded);
                         continue;
                     }
                     Err(e) => warn!(
@@ -147,7 +189,7 @@
                 },
                 Err(e) => warn!("Failed to read characteristic ({:?}) value: {:?}", *c.handle(), e),
             }
-            brs_map.insert(*c.handle(), None);
+            brs_map.insert(*c.handle(), BroadcastReceiveState::Empty);
         }
         Ok(brs_map)
     }
@@ -158,8 +200,8 @@
     {
         let mut notification_streams = SelectAll::new();
         {
-            let lock = self.receive_states.lock();
-            for handle in lock.keys() {
+            let lock = self.broadcast_sources.lock();
+            for handle in lock.0.keys() {
                 let stream = self.gatt_client.subscribe(&handle);
                 notification_streams.push(stream.boxed());
             }
@@ -173,44 +215,198 @@
     /// notification that are processed by BroadcastAudioScanServiceClient.
     /// This method should only be called once.
     /// Returns an error if the method is called for a second time.
-    pub fn take_event_stream(&mut self) -> Option<BroadcastAudioScanServiceEventStream> {
+    pub fn take_event_stream(
+        &mut self,
+    ) -> Option<impl Stream<Item = Result<Event, Error>> + FusedStream> {
         let notification_streams = self.notification_streams.take();
         let Some(streams) = notification_streams else {
             return None;
         };
-        let event_stream = BroadcastAudioScanServiceEventStream::new(
-            streams,
-            self.id_tracker.clone(),
-            self.receive_states.clone(),
-        );
+        let event_stream = EventStream::new(streams, self.broadcast_sources.clone());
         Some(event_stream)
     }
 
+    /// Write to the Broadcast Audio Scan Control Point characteristic in
+    /// without response mode.
+    fn write_to_bascp(
+        &self,
+        op: impl ControlPointOperation,
+    ) -> impl Future<Output = Result<(), Error>> + '_ {
+        let handle = self.audio_scan_control_point;
+        let mut buf = vec![0; op.encoded_len()];
+        let encode_res = op.encode(&mut buf[..]);
+        async move {
+            match encode_res {
+                Err(e) => Err(Error::Packet(e)),
+                Ok(_) => self
+                    .gatt_client
+                    .write_characteristic(&handle, WriteMode::WithoutResponse, 0, buf.as_slice())
+                    .await
+                    .map_err(|e| Error::Gatt(e)),
+            }
+        }
+    }
+
+    fn get_source_id(&self, broadcast_id: &BroadcastId) -> Result<SourceId, Error> {
+        self.broadcast_sources
+            .lock()
+            .source_id(broadcast_id)
+            .ok_or(Error::UnknownBroadcastSource(*broadcast_id))
+    }
+
+    /// Returns a clone of the latest known broadcast audio receive state of the
+    /// specified broadcast source given its broadcast id.
+    fn get_broadcast_source_state(&self, broadcast_id: &BroadcastId) -> Option<ReceiveState> {
+        let lock = self.broadcast_sources.lock();
+        lock.state(broadcast_id).clone().map(|rs| rs.clone())
+    }
+
+    /// Indicates to the remote BASS server that we have started scanning for
+    /// broadcast sources on behalf of it. If the scan delegator that serves
+    /// the BASS server is collocated with a broadcast sink, this may or may
+    /// not change the scanning behaviour of the the broadcast sink.
+    pub async fn remote_scan_started(&mut self) -> Result<(), Error> {
+        let op = RemoteScanStartedOperation;
+        self.write_to_bascp(op).await
+    }
+
+    /// Indicates to the remote BASS server that we have stopped scanning for
+    /// broadcast sources on behalf of it.
+    pub async fn remote_scan_stopped(&mut self) -> Result<(), Error> {
+        let op = RemoteScanStoppedOperation;
+        self.write_to_bascp(op).await
+    }
+
+    /// Provides the BASS server with information regarding a Broadcast Source.
+    pub async fn add_broadcast_source(
+        &mut self,
+        broadcast_id: BroadcastId,
+        address_type: AddressType,
+        advertiser_address: [u8; ADDRESS_BYTE_SIZE],
+        sid: AdvertisingSetId,
+        pa_sync: PaSync,
+        pa_interval: PaInterval,
+        subgroups: Vec<BigSubgroup>,
+    ) -> Result<(), Error> {
+        let op = AddSourceOperation::new(
+            address_type,
+            advertiser_address,
+            sid,
+            broadcast_id,
+            pa_sync,
+            pa_interval,
+            subgroups,
+        );
+        self.write_to_bascp(op).await
+    }
+
+    /// Requests the BASS server to add or update Metadata for the Broadcast
+    /// Source, and/or to request the server to synchronize to, or to stop
+    /// synchronization to, a PA and/or a BIS.
+    ///
+    /// # Arguments
+    ///
+    /// * `broadcast_id` - id of the broadcast source to modify
+    /// * `pa_sync` - pa sync mode the scan delegator peer should attempt to be
+    ///   in
+    /// * `pa_interval` - updated PA interval value. If none, unknown value is
+    ///   used
+    /// * `bis_sync` - desired BIG to BIS synchronization information. If empty,
+    ///   it's not updated
+    /// * `metadata_map` - map of updated metadata for BIGs. If a mapping does
+    ///   not exist for a BIG, that BIG's metadata is not updated
+    pub async fn modify_broadcast_source(
+        &mut self,
+        broadcast_id: BroadcastId,
+        pa_sync: PaSync,
+        pa_interval: Option<PaInterval>,
+        bis_sync: Option<BigToBisSync>,
+        metadata_map: Option<HashMap<SubgroupIndex, Vec<Metadata>>>,
+    ) -> Result<(), Error> {
+        let op = {
+            let mut state = self
+                .get_broadcast_source_state(&broadcast_id)
+                .ok_or(Error::UnknownBroadcastSource(broadcast_id))?;
+
+            // Update BIS_Sync param for BIGs if applicable.
+            if let Some(m) = bis_sync {
+                let sync_map = big_to_bis_sync_indices(&m);
+
+                for (big_index, group) in state.subgroups.iter_mut().enumerate() {
+                    if let Some(bis_indices) = sync_map.get(&(big_index as u8)) {
+                        group.bis_sync.set_sync(bis_indices).map_err(|e| Error::Packet(e))?;
+                    }
+                }
+            }
+            // Update metadata for BIGs if applicable.
+            if let Some(mut m) = metadata_map {
+                for (big_index, group) in state.subgroups.iter_mut().enumerate() {
+                    if let Some(metadata) = m.remove(&(big_index as u8)) {
+                        group.metadata = metadata;
+                    }
+                }
+
+                // Left over metadata values are new subgroups that are to be added. New
+                // subgroups can only be added if the subgroup index is
+                // contiguous to existing subgroups.
+                let mut new_big_indices: Vec<&u8> = m.keys().collect();
+                new_big_indices.sort();
+                for big_index in new_big_indices {
+                    if (*big_index as usize) != state.subgroups.len() {
+                        warn!("cannot add new [{big_index}th] subgroup");
+                        break;
+                    }
+                    let new_subgroup = BigSubgroup::new(None).with_metadata(m[big_index].clone());
+                    state.subgroups.push(new_subgroup);
+                }
+            }
+
+            ModifySourceOperation::new(
+                state.source_id,
+                pa_sync,
+                pa_interval.unwrap_or(PaInterval::unknown()),
+                state.subgroups,
+            )
+        };
+        self.write_to_bascp(op).await
+    }
+
+    pub async fn remove_broadcast_source(
+        &mut self,
+        broadcast_id: BroadcastId,
+    ) -> Result<(), Error> {
+        let source_id = self.get_source_id(&broadcast_id)?;
+
+        let op = RemoveSourceOperation::new(source_id);
+        self.write_to_bascp(op).await
+    }
+
     /// Sets the broadcast code for a particular broadcast stream.
     pub async fn set_broadcast_code(
         &mut self,
         broadcast_id: BroadcastId,
         broadcast_code: [u8; 16],
     ) -> Result<(), Error> {
-        let source_id = self.id_tracker.lock().source_id(&broadcast_id).ok_or(Error::Generic(
-            format!("Cannot find Source ID for specified Broadcast ID {broadcast_id:?}"),
-        ))?;
+        let source_id = self.get_source_id(&broadcast_id)?;
 
         let op = SetBroadcastCodeOperation::new(source_id, broadcast_code.clone());
-        let mut buf = vec![0; op.encoded_len()];
-        let _ = op.encode(&mut buf[..]).map_err(|e| Error::Packet(e))?;
-
-        let c = &self.audio_scan_control_point;
-        let _ = self
-            .gatt_client
-            .write_characteristic(c, WriteMode::WithoutResponse, 0, buf.as_slice())
-            .await
-            .map_err(|e| Error::Gatt(e))?;
+        self.write_to_bascp(op).await?;
 
         // Save the broadcast code we sent.
         self.broadcast_codes.insert(source_id, broadcast_code);
         Ok(())
     }
+
+    /// Returns a list of currently known broadcast sources at the time
+    /// this method was called.
+    pub fn known_broadcast_sources(&self) -> Vec<(Handle, BroadcastReceiveState)> {
+        let lock = self.broadcast_sources.lock();
+        let mut brs = Vec::new();
+        for (k, v) in lock.0.iter() {
+            brs.push((*k, v.clone()));
+        }
+        brs
+    }
 }
 
 #[cfg(test)]
@@ -219,9 +415,11 @@
 
     use std::task::Poll;
 
+    use assert_matches::assert_matches;
     use futures::executor::block_on;
     use futures::{pin_mut, FutureExt};
 
+    use bt_common::core::AdvertisingSetId;
     use bt_common::Uuid;
     use bt_gatt::test_utils::*;
     use bt_gatt::types::{
@@ -317,10 +515,11 @@
 
         // Check that all the characteristics have been discovered.
         assert_eq!(client.audio_scan_control_point, AUDIO_SCAN_CONTROL_POINT_HANDLE);
-        let broadcast_receive_states = client.receive_states.lock();
-        assert!(broadcast_receive_states.contains_key(&RECEIVE_STATE_1_HANDLE));
-        assert!(broadcast_receive_states.contains_key(&RECEIVE_STATE_2_HANDLE));
-        assert!(broadcast_receive_states.contains_key(&RECEIVE_STATE_3_HANDLE));
+        let broadcast_sources = client.known_broadcast_sources();
+        assert_eq!(broadcast_sources.len(), 3);
+        assert!(broadcast_sources.iter().find(|v| v.0 == RECEIVE_STATE_1_HANDLE).is_some());
+        assert!(broadcast_sources.iter().find(|v| v.0 == RECEIVE_STATE_2_HANDLE).is_some());
+        assert!(broadcast_sources.iter().find(|v| v.0 == RECEIVE_STATE_3_HANDLE).is_some());
     }
 
     #[test]
@@ -344,6 +543,7 @@
         let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
         let create_result =
             BroadcastAudioScanServiceClient::<FakeTypes>::create(fake_peer_service.clone());
+
         pin_mut!(create_result);
         let polled = create_result.poll_unpin(&mut noop_cx);
         let Poll::Ready(Err(_)) = polled else {
@@ -422,7 +622,7 @@
     }
 
     #[test]
-    fn pump_notifications() {
+    fn start_event_stream() {
         let (mut client, mut fake_peer_service) = setup_client();
         let mut event_stream = client.take_event_stream().expect("stream was created");
 
@@ -452,17 +652,21 @@
 
         // Check that synced and broadcast code required events were sent out.
         let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
-        let recv_fut = event_stream.select_next_some();
-        let event = block_on(recv_fut).expect("should receive event");
-        assert_eq!(event, BroadcastAudioScanServiceEvent::SyncedToPa(BroadcastId::new(0x040302)));
 
         let recv_fut = event_stream.select_next_some();
         let event = block_on(recv_fut).expect("should receive event");
         assert_eq!(
             event,
-            BroadcastAudioScanServiceEvent::BroadcastCodeRequired(BroadcastId::new(0x040302))
+            Event::AddedBroadcastSource(
+                BroadcastId(0x040302),
+                PaSyncState::Synced,
+                EncryptionStatus::BroadcastCodeRequired
+            )
         );
 
+        // Stream should be pending since no more notifications.
+        assert!(event_stream.poll_next_unpin(&mut noop_cx).is_pending());
+
         // Send notification for updating BRS characteristic to indicate it requires
         // sync info. Notification for updating the BRS characteristic value for
         // characteristic with handle 3.
@@ -488,12 +692,15 @@
             ],
         );
 
-        // Check that sync info required event was sent out.
         let recv_fut = event_stream.select_next_some();
         let event = block_on(recv_fut).expect("should receive event");
         assert_eq!(
             event,
-            BroadcastAudioScanServiceEvent::SyncInfoRequested(BroadcastId::new(0x050403))
+            Event::AddedBroadcastSource(
+                BroadcastId(0x050403),
+                PaSyncState::SyncInfoRequest,
+                EncryptionStatus::NotEncrypted
+            )
         );
 
         // Stream should be pending since no more notifications.
@@ -501,44 +708,251 @@
     }
 
     #[test]
+    fn remote_scan_started() {
+        let (mut client, mut fake_peer_service) = setup_client();
+
+        fake_peer_service.expect_characteristic_value(&AUDIO_SCAN_CONTROL_POINT_HANDLE, vec![0x01]);
+
+        let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
+        let op_fut = client.remote_scan_started();
+        pin_mut!(op_fut);
+        let polled = op_fut.poll_unpin(&mut noop_cx);
+        assert_matches!(polled, Poll::Ready(Ok(_)));
+    }
+
+    #[test]
+    fn remote_scan_stopped() {
+        let (mut client, mut fake_peer_service) = setup_client();
+
+        fake_peer_service.expect_characteristic_value(&AUDIO_SCAN_CONTROL_POINT_HANDLE, vec![0x00]);
+
+        let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
+        let op_fut = client.remote_scan_stopped();
+        pin_mut!(op_fut);
+        let polled = op_fut.poll_unpin(&mut noop_cx);
+        assert_matches!(polled, Poll::Ready(Ok(_)));
+    }
+
+    #[test]
+    fn add_broadcast_source() {
+        let (mut client, mut fake_peer_service) = setup_client();
+
+        fake_peer_service.expect_characteristic_value(
+            &AUDIO_SCAN_CONTROL_POINT_HANDLE,
+            vec![
+                0x02, 0x00, 0x04, 0x10, 0x00, 0x00, 0x00, 0x00, 0x01, 0x11, 0x00, 0x00, 0x00, 0xFF,
+                0xFF, 0x00,
+            ],
+        );
+
+        let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
+        let op_fut = client.add_broadcast_source(
+            BroadcastId(0x11),
+            AddressType::Public,
+            [0x04, 0x10, 0x00, 0x00, 0x00, 0x00],
+            AdvertisingSetId(1),
+            PaSync::DoNotSync,
+            PaInterval::unknown(),
+            vec![],
+        );
+        pin_mut!(op_fut);
+        let polled = op_fut.poll_unpin(&mut noop_cx);
+        assert_matches!(polled, Poll::Ready(Ok(_)));
+    }
+
+    #[test]
+    fn modify_broadcast_source() {
+        let (mut client, mut fake_peer_service) = setup_client();
+
+        // Manually update the broadcast source tracker for testing purposes.
+        // In practice, this would have been updated from BRS value change notification.
+        client.broadcast_sources.lock().update_state(
+            RECEIVE_STATE_1_HANDLE,
+            BroadcastReceiveState::NonEmpty(ReceiveState {
+                source_id: 0x11,
+                source_address_type: AddressType::Public,
+                source_address: [1, 2, 3, 4, 5, 6],
+                source_adv_sid: AdvertisingSetId(1),
+                broadcast_id: BroadcastId(0x11),
+                pa_sync_state: PaSyncState::Synced,
+                big_encryption: EncryptionStatus::BroadcastCodeRequired,
+                subgroups: vec![],
+            }),
+        );
+
+        #[rustfmt::skip]
+        fake_peer_service.expect_characteristic_value(
+            &AUDIO_SCAN_CONTROL_POINT_HANDLE,
+            vec![
+                0x03, 0x11, 0x00,  // opcode, source id, pa sync
+                0xAA, 0xAA, 0x00,  // pa sync, pa interval, num of subgroups
+            ],
+        );
+
+        let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
+        let op_fut = client.modify_broadcast_source(
+            BroadcastId(0x11),
+            PaSync::DoNotSync,
+            Some(PaInterval(0xAAAA)),
+            None,
+            None,
+        );
+        pin_mut!(op_fut);
+        let polled = op_fut.poll_unpin(&mut noop_cx);
+        assert_matches!(polled, Poll::Ready(Ok(_)));
+    }
+
+    #[test]
+    fn modify_broadcast_source_updates_groups() {
+        let (mut client, mut fake_peer_service) = setup_client();
+
+        // Manually update the broadcast source tracker for testing purposes.
+        // In practice, this would have been updated from BRS value change notification.
+        client.broadcast_sources.lock().update_state(
+            RECEIVE_STATE_1_HANDLE,
+            BroadcastReceiveState::NonEmpty(ReceiveState {
+                source_id: 0x11,
+                source_address_type: AddressType::Public,
+                source_address: [1, 2, 3, 4, 5, 6],
+                source_adv_sid: AdvertisingSetId(1),
+                broadcast_id: BroadcastId(0x11),
+                pa_sync_state: PaSyncState::Synced,
+                big_encryption: EncryptionStatus::BroadcastCodeRequired,
+                subgroups: vec![BigSubgroup::new(None)],
+            }),
+        );
+
+        // Default PA interval value and subgroups value read from the BRS
+        // characteristic are used.
+        #[rustfmt::skip]
+        fake_peer_service.expect_characteristic_value(
+            &AUDIO_SCAN_CONTROL_POINT_HANDLE,
+            vec![
+                0x03, 0x11, 0x00,                    // opcode, source id, pa sync
+                0xFF, 0xFF, 0x02,                    // pa sync, pa interval, num of subgroups
+                0x17, 0x00, 0x00, 0x00,              // bis sync (0th subgroup)
+                0x02, 0x01, 0x09,                    // metadata len, metadata
+                0xFF, 0xFF, 0xFF, 0xFF,              // bis sync (1th subgroup)
+                0x05, 0x04, 0x04, 0x65, 0x6E, 0x67,  // metadata len, metadata
+            ],
+        );
+
+        let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
+        let op_fut = client.modify_broadcast_source(
+            BroadcastId(0x11),
+            PaSync::DoNotSync,
+            None,
+            Some(HashSet::from([(0, 1), (0, 2), (0, 3), (0, 5)])),
+            Some(HashMap::from([
+                (0, vec![Metadata::BroadcastAudioImmediateRenderingFlag]),
+                (1, vec![Metadata::Language("eng".to_string())]),
+                (5, vec![Metadata::ProgramInfoURI("this subgroup shouldn't be added".to_string())]),
+            ])),
+        );
+        pin_mut!(op_fut);
+        let polled = op_fut.poll_unpin(&mut noop_cx);
+        assert_matches!(polled, Poll::Ready(Ok(_)));
+    }
+
+    #[test]
+    fn modify_broadcast_source_fail() {
+        let (mut client, _fake_peer_service) = setup_client();
+
+        let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
+        // Broadcast source wasn't previously added.
+        let op_fut =
+            client.modify_broadcast_source(BroadcastId(0x11), PaSync::DoNotSync, None, None, None);
+        pin_mut!(op_fut);
+        let polled = op_fut.poll_unpin(&mut noop_cx);
+        assert_matches!(polled, Poll::Ready(Err(_)));
+    }
+
+    #[test]
+    fn remove_broadcast_source() {
+        let (mut client, mut fake_peer_service) = setup_client();
+
+        // Manually update the broadcast source tracker for testing purposes.
+        // In practice, this would have been updated from BRS value change notification.
+        client.broadcast_sources.lock().update_state(
+            RECEIVE_STATE_1_HANDLE,
+            BroadcastReceiveState::NonEmpty(ReceiveState {
+                source_id: 0x11,
+                source_address_type: AddressType::Public,
+                source_address: [1, 2, 3, 4, 5, 6],
+                source_adv_sid: AdvertisingSetId(1),
+                broadcast_id: BroadcastId(0x11),
+                pa_sync_state: PaSyncState::Synced,
+                big_encryption: EncryptionStatus::BroadcastCodeRequired,
+                subgroups: vec![],
+            }),
+        );
+
+        fake_peer_service
+            .expect_characteristic_value(&AUDIO_SCAN_CONTROL_POINT_HANDLE, vec![0x05, 0x11]);
+
+        let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
+        // Broadcast source wasn't previously added.
+        let op_fut = client.remove_broadcast_source(BroadcastId(0x11));
+        pin_mut!(op_fut);
+        let polled = op_fut.poll_unpin(&mut noop_cx);
+        assert_matches!(polled, Poll::Ready(Ok(_)));
+    }
+
+    #[test]
+    fn remove_broadcast_source_fail() {
+        let (mut client, _fake_peer_service) = setup_client();
+
+        let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
+        // Broadcast source wasn't previously added.
+        let op_fut = client.remove_broadcast_source(BroadcastId(0x11));
+        pin_mut!(op_fut);
+        let polled = op_fut.poll_unpin(&mut noop_cx);
+        assert_matches!(polled, Poll::Ready(Err(_)));
+    }
+
+    #[test]
     fn set_broadcast_code() {
         let (mut client, mut fake_peer_service) = setup_client();
 
-        // Manually update the internal id tracker for testing purposes.
+        // Manually update the broadcast source tracker for testing purposes.
         // In practice, this would have been updated from BRS value change notification.
-        client.id_tracker.lock().update(0x01, BroadcastId::new(0x030201));
+        client.broadcast_sources.lock().update_state(
+            RECEIVE_STATE_1_HANDLE,
+            BroadcastReceiveState::NonEmpty(ReceiveState {
+                source_id: 0x01,
+                source_address_type: AddressType::Public,
+                source_address: [1, 2, 3, 4, 5, 6],
+                source_adv_sid: AdvertisingSetId(1),
+                broadcast_id: BroadcastId(0x030201),
+                pa_sync_state: PaSyncState::Synced,
+                big_encryption: EncryptionStatus::BroadcastCodeRequired,
+                subgroups: vec![],
+            }),
+        );
 
-        {
-            fake_peer_service.expect_characteristic_value(
-                &AUDIO_SCAN_CONTROL_POINT_HANDLE,
-                vec![0x04, 0x01, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1],
-            );
+        fake_peer_service.expect_characteristic_value(
+            &AUDIO_SCAN_CONTROL_POINT_HANDLE,
+            vec![0x04, 0x01, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1],
+        );
 
-            let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
-            let set_code_fut = client.set_broadcast_code(BroadcastId::new(0x030201), [1; 16]);
-            pin_mut!(set_code_fut);
-            let polled = set_code_fut.poll_unpin(&mut noop_cx);
-            let Poll::Ready(Ok(_)) = polled else {
-                panic!("Expected to succeed");
-            };
-        }
+        let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
+        let set_code_fut = client.set_broadcast_code(BroadcastId(0x030201), [1; 16]);
+        pin_mut!(set_code_fut);
+        let polled = set_code_fut.poll_unpin(&mut noop_cx);
+        assert_matches!(polled, Poll::Ready(Ok(_)));
     }
 
     #[test]
     fn set_broadcast_code_fails() {
         let (mut client, _) = setup_client();
 
-        {
-            let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
-            let set_code_fut = client.set_broadcast_code(BroadcastId::new(0x030201), [1; 16]);
-            pin_mut!(set_code_fut);
-            let polled = set_code_fut.poll_unpin(&mut noop_cx);
+        let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
+        let set_code_fut = client.set_broadcast_code(BroadcastId(0x030201), [1; 16]);
+        pin_mut!(set_code_fut);
+        let polled = set_code_fut.poll_unpin(&mut noop_cx);
 
-            // Should fail because we cannot get source id for the broadcast id since BRS
-            // Characteristic value wasn't updated.
-            let Poll::Ready(Err(_)) = polled else {
-                panic!("Expected to fail");
-            };
-        }
+        // Should fail because we cannot get source id for the broadcast id since BRS
+        // Characteristic value wasn't updated.
+        assert_matches!(polled, Poll::Ready(Err(_)));
     }
 }
diff --git a/rust/bt-bass/src/client/error.rs b/rust/bt-bass/src/client/error.rs
index 635a87c..1f6e4bd 100644
--- a/rust/bt-bass/src/client/error.rs
+++ b/rust/bt-bass/src/client/error.rs
@@ -7,6 +7,8 @@
 use bt_common::packet_encoding::Error as PacketError;
 use bt_gatt::types::{Error as BTGattError, Handle};
 
+use crate::types::BroadcastId;
+
 #[derive(Debug, Error)]
 pub enum Error {
     #[error(
@@ -26,6 +28,12 @@
     #[error("GATT operation error: {0}")]
     Gatt(BTGattError),
 
+    #[error("Error while polling BASS client event stream: {0}")]
+    EventStream(Box<Error>),
+
+    #[error("Broadcast source with id ({0:?}) is unknown")]
+    UnknownBroadcastSource(BroadcastId),
+
     #[error("Failure occurred: {0}")]
     Generic(String),
 }
diff --git a/rust/bt-bass/src/client/event.rs b/rust/bt-bass/src/client/event.rs
index b7a5275..2c7ac2f 100644
--- a/rust/bt-bass/src/client/event.rs
+++ b/rust/bt-bass/src/client/event.rs
@@ -2,24 +2,25 @@
 // Use of this source code is governed by a BSD-style license that can be
 // found in the LICENSE file.
 
-use std::collections::{HashMap, VecDeque};
+use std::collections::VecDeque;
 use std::sync::Arc;
 use std::task::Poll;
 
-use bt_common::packet_encoding::Decodable;
-use bt_gatt::client::CharacteristicNotification;
-use bt_gatt::types::{Error as BtGattError, Handle};
 use futures::stream::{BoxStream, FusedStream, SelectAll};
 use futures::{Stream, StreamExt};
 use parking_lot::Mutex;
 
+use bt_common::packet_encoding::Decodable;
+use bt_gatt::client::CharacteristicNotification;
+use bt_gatt::types::Error as BtGattError;
+
 use crate::client::error::Error;
 use crate::client::error::ServiceError;
-use crate::client::BroadcastSourceIdTracker;
+use crate::client::KnownBroadcastSources;
 use crate::types::*;
 
 #[derive(Clone, Debug, PartialEq)]
-pub enum BroadcastAudioScanServiceEvent {
+pub enum Event {
     // Broadcast Audio Scan Service (BASS) server requested for SyncInfo through PAST procedure.
     SyncInfoRequested(BroadcastId),
     // BASS server failed to synchornize to PA or did not synchronize to PA.
@@ -32,83 +33,83 @@
     BroadcastCodeRequired(BroadcastId),
     // BASS server failed to decrypt BIS using the previously provided code.
     InvalidBroadcastCode(BroadcastId, [u8; 16]),
+    // BASS server has autonomously synchronized to a BIS that is encrypted, and the server
+    // has the correct encryption key to decrypt the BIS.
+    Decrypting(BroadcastId),
     // Received a packet from the BASS server not recognized by this library.
     UnknownPacket,
+    // Broadcast source was removed by the BASS server.
+    RemovedBroadcastSource(BroadcastId),
+    // Broadcast source was added by the BASS server.
+    AddedBroadcastSource(BroadcastId, PaSyncState, EncryptionStatus),
 }
 
-impl BroadcastAudioScanServiceEvent {
-    pub(crate) fn from_broadcast_receive_state(
-        state: &ReceiveState,
-    ) -> Vec<BroadcastAudioScanServiceEvent> {
+impl Event {
+    pub(crate) fn from_broadcast_receive_state(state: &ReceiveState) -> Vec<Event> {
         let mut events = Vec::new();
         let pa_sync_state = state.pa_sync_state();
         let broadcast_id = state.broadcast_id();
         match pa_sync_state {
-            PaSyncState::SyncInfoRequest => {
-                events.push(BroadcastAudioScanServiceEvent::SyncInfoRequested(broadcast_id))
-            }
-            PaSyncState::Synced => {
-                events.push(BroadcastAudioScanServiceEvent::SyncedToPa(broadcast_id))
-            }
+            PaSyncState::SyncInfoRequest => events.push(Event::SyncInfoRequested(broadcast_id)),
+            PaSyncState::Synced => events.push(Event::SyncedToPa(broadcast_id)),
             PaSyncState::FailedToSync | PaSyncState::NotSynced => {
-                events.push(BroadcastAudioScanServiceEvent::NotSyncedToPa(broadcast_id))
+                events.push(Event::NotSyncedToPa(broadcast_id))
             }
-            PaSyncState::NoPast => {
-                events.push(BroadcastAudioScanServiceEvent::SyncedFailedNoPast(broadcast_id))
-            }
+            PaSyncState::NoPast => events.push(Event::SyncedFailedNoPast(broadcast_id)),
         }
         match state.big_encryption() {
             EncryptionStatus::BroadcastCodeRequired => {
-                events.push(BroadcastAudioScanServiceEvent::BroadcastCodeRequired(broadcast_id))
+                events.push(Event::BroadcastCodeRequired(broadcast_id))
             }
-            EncryptionStatus::BadCode(code) => events.push(
-                BroadcastAudioScanServiceEvent::InvalidBroadcastCode(broadcast_id, code.clone()),
-            ),
+            EncryptionStatus::Decrypting => events.push(Event::Decrypting(broadcast_id)),
+            EncryptionStatus::BadCode(code) => {
+                events.push(Event::InvalidBroadcastCode(broadcast_id, code.clone()))
+            }
             _ => {}
         };
         events
     }
 }
 
-pub struct BroadcastAudioScanServiceEventStream {
-    // Polled to receive BASS notifications.
+/// Trait for representing a stream that outputs Events from BASS. If there was
+/// an error the stream should output error instead and terminate.
+
+pub struct EventStream {
+    // Actual GATT notification streams that we poll from.
     notification_streams:
         SelectAll<BoxStream<'static, Result<CharacteristicNotification, BtGattError>>>,
 
-    event_queue: VecDeque<Result<BroadcastAudioScanServiceEvent, Error>>,
+    event_queue: VecDeque<Result<Event, Error>>,
     terminated: bool,
 
     // States to be updated.
-    id_tracker: Arc<Mutex<BroadcastSourceIdTracker>>,
-    receive_states: Arc<Mutex<HashMap<Handle, Option<BroadcastReceiveState>>>>,
+    broadcast_sources: Arc<Mutex<KnownBroadcastSources>>,
 }
 
-impl BroadcastAudioScanServiceEventStream {
+impl EventStream {
     pub(crate) fn new(
         notification_streams: SelectAll<
             BoxStream<'static, Result<CharacteristicNotification, BtGattError>>,
         >,
-        id_tracker: Arc<Mutex<BroadcastSourceIdTracker>>,
-        receive_states: Arc<Mutex<HashMap<Handle, Option<BroadcastReceiveState>>>>,
+        broadcast_sources: Arc<Mutex<KnownBroadcastSources>>,
     ) -> Self {
         Self {
             notification_streams,
             event_queue: VecDeque::new(),
             terminated: false,
-            id_tracker,
-            receive_states,
+            broadcast_sources,
         }
     }
 }
 
-impl FusedStream for BroadcastAudioScanServiceEventStream {
+impl FusedStream for EventStream {
     fn is_terminated(&self) -> bool {
         self.terminated
     }
 }
 
-impl Stream for BroadcastAudioScanServiceEventStream {
-    type Item = Result<BroadcastAudioScanServiceEvent, Error>;
+impl Stream for EventStream {
+    type Item = Result<Event, Error>;
 
     fn poll_next(
         mut self: std::pin::Pin<&mut Self>,
@@ -119,85 +120,92 @@
         }
 
         loop {
+            if let Some(item) = self.event_queue.pop_front() {
+                match item {
+                    Ok(event) => return Poll::Ready(Some(Ok(event))),
+                    Err(e) => {
+                        // If an error was received, we terminate the event stream, but send an
+                        // error to indicate why it was terminated.
+                        self.terminated = true;
+                        return Poll::Ready(Some(Err(e)));
+                    }
+                }
+            }
+
             match self.notification_streams.poll_next_unpin(cx) {
-                Poll::Pending => {}
+                Poll::Pending => return Poll::Pending,
                 Poll::Ready(None) => {
-                    self.event_queue.push_back(Err(Error::Service(
-                        super::error::ServiceError::NotificationChannelClosed(format!(
+                    let err = Error::EventStream(Box::new(Error::Service(
+                        ServiceError::NotificationChannelClosed(format!(
                             "GATT notification stream for BRS characteristics closed"
                         )),
                     )));
+                    self.event_queue.push_back(Err(err));
                 }
-                Poll::Ready(Some(received)) => {
-                    match received {
-                        Err(error) => match error {
-                            BtGattError::PeerNotRecognized(_)
-                            | BtGattError::ScanFailed(_)
-                            | BtGattError::Other(_) => {
-                                self.event_queue.push_back(Err(Error::Service(
-                                    ServiceError::NotificationChannelClosed(format!(
-                                        "unexpected error encountered from GATT notification"
-                                    )),
-                                )));
-                            }
-                            BtGattError::PeerDisconnected(id) => {
-                                self.event_queue.push_back(Err(Error::Service(
-                                    ServiceError::NotificationChannelClosed(format!(
-                                        "peer ({id}) disconnected"
-                                    )),
-                                )));
-                            }
-                            _ => {} // TODO(b/308483171): decide what to do for non-critical errors.
-                        },
-                        Ok(notification) => {
-                            let Ok((brs, _)) =
-                                BroadcastReceiveState::decode(notification.value.as_slice())
-                            else {
-                                self.event_queue
-                                    .push_back(Ok(BroadcastAudioScanServiceEvent::UnknownPacket));
-                                break;
-                            };
-                            match &brs {
-                                BroadcastReceiveState::Empty => {}
-                                BroadcastReceiveState::NonEmpty(state) => {
-                                    let events = BroadcastAudioScanServiceEvent::from_broadcast_receive_state(state);
-                                    events
-                                        .into_iter()
-                                        .for_each(|e| self.event_queue.push_back(Ok(e)));
+                Poll::Ready(Some(Err(error))) => {
+                    // Deem all errors as critical.
+                    let err = Error::EventStream(Box::new(Error::Gatt(error)));
+                    self.event_queue.push_back(Err(err));
+                }
+                Poll::Ready(Some(Ok(notification))) => {
+                    let char_handle = notification.handle;
+                    let Ok((new_state, _)) =
+                        BroadcastReceiveState::decode(notification.value.as_slice())
+                    else {
+                        self.event_queue.push_back(Ok(Event::UnknownPacket));
+                        continue;
+                    };
 
-                                    // Update broadcast ID to source ID mapping.
-                                    let _ = self
-                                        .id_tracker
-                                        .lock()
-                                        .update(state.source_id(), state.broadcast_id());
-                                }
-                            };
+                    let maybe_prev_state = {
+                        let mut lock = self.broadcast_sources.lock();
+                        lock.update_state(char_handle, new_state.clone())
+                    };
+
+                    let mut multi_events = VecDeque::new();
+
+                    // If the previous value was not empty, check if it was overwritten.
+                    if let Some(ref prev_state) = maybe_prev_state {
+                        if let BroadcastReceiveState::NonEmpty(prev_receive_state) = prev_state {
+                            if new_state.is_empty() || !new_state.has_same_broadcast_id(&prev_state)
                             {
-                                // Update the Broadcast Receive States.
-                                let mut lock = self.receive_states.lock();
-                                let char = lock.get_mut(&notification.handle).unwrap();
-                                char.replace(brs);
+                                multi_events.push_back(Ok(Event::RemovedBroadcastSource(
+                                    prev_receive_state.broadcast_id,
+                                )));
                             }
                         }
                     }
+
+                    // BRS characteristic value was updated with a new broadcast source
+                    // information.
+                    if let BroadcastReceiveState::NonEmpty(receive_state) = &new_state {
+                        let is_new_source = match maybe_prev_state {
+                            Some(prev_state) => !new_state.has_same_broadcast_id(&prev_state),
+                            None => true,
+                        };
+                        if is_new_source {
+                            multi_events.push_back(Ok(Event::AddedBroadcastSource(
+                                receive_state.broadcast_id,
+                                receive_state.pa_sync_state,
+                                receive_state.big_encryption,
+                            )));
+                        } else {
+                            let other_events = Event::from_broadcast_receive_state(receive_state);
+                            for e in other_events.into_iter() {
+                                multi_events.push_back(Ok(e));
+                            }
+                        }
+                    }
+                    if multi_events.len() != 0 {
+                        self.event_queue.append(&mut multi_events);
+                        continue;
+                    }
+                    continue;
                 }
             };
+
             break;
         }
-
-        let popped = self.event_queue.pop_front();
-        match popped {
-            None => Poll::Pending,
-            Some(item) => match item {
-                Ok(event) => Poll::Ready(Some(Ok(event))),
-                Err(e) => {
-                    // If an error was received, we terminate the event stream, but send an error to
-                    // indicate why it was terminated.
-                    self.terminated = true;
-                    Poll::Ready(Some(Err(e)))
-                }
-            },
-        }
+        Poll::Pending
     }
 }
 
@@ -205,21 +213,26 @@
 mod tests {
     use super::*;
 
+    use std::collections::HashMap;
+
+    use assert_matches::assert_matches;
     use futures::channel::mpsc::unbounded;
 
+    use bt_gatt::types::Handle;
+
     #[test]
-    fn poll_broadcast_audio_scan_service_event_stream() {
+    fn poll_event_stream() {
         let mut streams = SelectAll::new();
         let (sender1, receiver1) = unbounded();
         let (sender2, receiver2) = unbounded();
         streams.push(receiver1.boxed());
         streams.push(receiver2.boxed());
 
-        let id_tracker = Arc::new(Mutex::new(BroadcastSourceIdTracker::new()));
-        let receive_states =
-            Arc::new(Mutex::new(HashMap::from([(Handle(0x1), None), (Handle(0x2), None)])));
-        let mut event_streams =
-            BroadcastAudioScanServiceEventStream::new(streams, id_tracker, receive_states);
+        let source_tracker = Arc::new(Mutex::new(KnownBroadcastSources::new(HashMap::from([
+            (Handle(0x1), BroadcastReceiveState::Empty),
+            (Handle(0x2), BroadcastReceiveState::Empty),
+        ]))));
+        let mut event_streams = EventStream::new(streams, source_tracker);
 
         // Send notifications to underlying streams.
         let bad_code_status =
@@ -259,30 +272,16 @@
 
         // Events should have been generated from notifications.
         let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
-        match event_streams.poll_next_unpin(&mut noop_cx) {
-            Poll::Ready(Some(Ok(event))) => assert_eq!(
-                event,
-                BroadcastAudioScanServiceEvent::NotSyncedToPa(BroadcastId::new(0x030201))
-            ),
-            _ => panic!("should have received event"),
-        }
-        match event_streams.poll_next_unpin(&mut noop_cx) {
-            Poll::Ready(Some(Ok(event))) => assert_eq!(
-                event,
-                BroadcastAudioScanServiceEvent::InvalidBroadcastCode(
-                    BroadcastId::new(0x030201),
-                    [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]
-                )
-            ),
-            _ => panic!("should have received event"),
-        }
-        match event_streams.poll_next_unpin(&mut noop_cx) {
-            Poll::Ready(Some(Ok(event))) => assert_eq!(
-                event,
-                BroadcastAudioScanServiceEvent::SyncedFailedNoPast(BroadcastId::new(0x040302))
-            ),
-            _ => panic!("should have received event"),
-        }
+
+        let polled = event_streams.poll_next_unpin(&mut noop_cx);
+        assert_matches!(polled, Poll::Ready(Some(Ok(event))) => {
+            assert_eq!(event, Event::AddedBroadcastSource(BroadcastId(0x030201), PaSyncState::FailedToSync, EncryptionStatus::BadCode([1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16])));
+        });
+
+        let polled = event_streams.poll_next_unpin(&mut noop_cx);
+        assert_matches!(polled, Poll::Ready(Some(Ok(event))) => {
+            assert_eq!(event, Event::AddedBroadcastSource(BroadcastId(0x040302), PaSyncState::NoPast, EncryptionStatus::NotEncrypted));
+        });
 
         // Should be pending because no more events generated from notifications.
         assert!(event_streams.poll_next_unpin(&mut noop_cx).is_pending());
@@ -306,13 +305,67 @@
 
         // Event should have been generated from notification.
         let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
-        match event_streams.poll_next_unpin(&mut noop_cx) {
-            Poll::Ready(Some(Ok(event))) => assert_eq!(
-                event,
-                BroadcastAudioScanServiceEvent::SyncedToPa(BroadcastId::new(0x040302))
-            ),
-            _ => panic!("should have received event"),
-        }
+        assert_matches!(event_streams.poll_next_unpin(&mut noop_cx), Poll::Ready(Some(Ok(event))) => { assert_eq!(event, Event::SyncedToPa(BroadcastId(0x040302))) });
+
+        // Should be pending because no more events generated from notifications.
+        assert!(event_streams.poll_next_unpin(&mut noop_cx).is_pending());
+    }
+
+    #[test]
+    fn broadcast_source_is_removed() {
+        let mut streams = SelectAll::new();
+        let (_sender1, receiver1) = unbounded();
+        let (sender2, receiver2) = unbounded();
+        streams.push(receiver1.boxed());
+        streams.push(receiver2.boxed());
+
+        let source_tracker = Arc::new(Mutex::new(KnownBroadcastSources::new(HashMap::from([
+            (Handle(0x1), BroadcastReceiveState::Empty),
+            (Handle(0x2), BroadcastReceiveState::Empty),
+        ]))));
+        let mut event_streams = EventStream::new(streams, source_tracker);
+
+        // Send notifications to underlying streams.
+        #[rustfmt::skip]
+        sender2
+            .unbounded_send(Ok(CharacteristicNotification {
+                handle: Handle(0x2),
+                value: vec![
+                    0x02, AddressType::Public as u8,             // source id and address type
+                    0x03, 0x04, 0x05, 0x06, 0x07, 0x08,          // address
+                    0x01, 0x02, 0x03, 0x04,                      // ad set id and broadcast id
+                    PaSyncState::Synced as u8,
+                    EncryptionStatus::NotEncrypted.raw_value(),
+                    0x00,                                        // no subgroups
+                ],
+                maybe_truncated: false,
+            }))
+            .expect("should send");
+
+        // Events should have been generated from notifications.
+        let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
+
+        let polled = event_streams.poll_next_unpin(&mut noop_cx);
+        assert_matches!(polled, Poll::Ready(Some(Ok(event))) => {
+            assert_eq!(event, Event::AddedBroadcastSource(BroadcastId(0x040302), PaSyncState::Synced, EncryptionStatus::NotEncrypted));
+        });
+
+        // Should be pending because no more events generated from notifications.
+        assert!(event_streams.poll_next_unpin(&mut noop_cx).is_pending());
+
+        // Send notifications to underlying streams. Ths time, send empty BRS
+        // characteristic value.
+        sender2
+            .unbounded_send(Ok(CharacteristicNotification {
+                handle: Handle(0x2),
+                value: vec![],
+                maybe_truncated: false,
+            }))
+            .expect("should send");
+
+        // Event should have been generated from notification.
+        let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
+        assert_matches!(event_streams.poll_next_unpin(&mut noop_cx), Poll::Ready(Some(Ok(event))) => { assert_eq!(event, Event::RemovedBroadcastSource(BroadcastId(0x040302))) });
 
         // Should be pending because no more events generated from notifications.
         assert!(event_streams.poll_next_unpin(&mut noop_cx).is_pending());
diff --git a/rust/bt-bass/src/lib.rs b/rust/bt-bass/src/lib.rs
index 0d619cd..1f8980b 100644
--- a/rust/bt-bass/src/lib.rs
+++ b/rust/bt-bass/src/lib.rs
@@ -4,3 +4,10 @@
 
 pub mod client;
 pub mod types;
+pub use crate::client::error::Error;
+
+#[cfg(any(test, feature = "test-utils"))]
+pub mod test_utils;
+
+#[cfg(test)]
+pub mod tests;
diff --git a/rust/bt-bass/src/test_utils.rs b/rust/bt-bass/src/test_utils.rs
new file mode 100644
index 0000000..459a488
--- /dev/null
+++ b/rust/bt-bass/src/test_utils.rs
@@ -0,0 +1,47 @@
+// Copyright 2023 Google LLC
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+use std::task::Poll;
+
+use futures::channel::mpsc::UnboundedReceiver;
+use futures::stream::FusedStream;
+use futures::{Stream, StreamExt};
+
+use crate::client::error::Error;
+use crate::client::event::*;
+
+pub struct FakeBassEventStream {
+    receiver: UnboundedReceiver<Result<Event, Error>>,
+    terminated: bool,
+}
+
+impl FakeBassEventStream {
+    pub fn new(receiver: UnboundedReceiver<Result<Event, Error>>) -> Self {
+        Self { receiver, terminated: false }
+    }
+}
+
+impl FusedStream for FakeBassEventStream {
+    fn is_terminated(&self) -> bool {
+        self.terminated
+    }
+}
+
+impl Stream for FakeBassEventStream {
+    type Item = Result<Event, Error>;
+
+    fn poll_next(
+        mut self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        if self.terminated {
+            return Poll::Ready(None);
+        }
+        let polled = self.receiver.poll_next_unpin(cx);
+        if let Poll::Ready(Some(Err(_))) = &polled {
+            self.terminated = true;
+        }
+        polled
+    }
+}
diff --git a/rust/bt-bass/src/tests.rs b/rust/bt-bass/src/tests.rs
new file mode 100644
index 0000000..64d9107
--- /dev/null
+++ b/rust/bt-bass/src/tests.rs
@@ -0,0 +1,57 @@
+// Copyright 2023 Google LLC
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+use std::task::Poll;
+
+use assert_matches::assert_matches;
+use futures::channel::mpsc::unbounded;
+use futures::stream::StreamExt;
+
+use crate::client::error::Error;
+use crate::client::event::*;
+use crate::test_utils::*;
+use crate::types::BroadcastId;
+
+#[test]
+fn fake_bass_event_stream() {
+    let (sender, receiver) = unbounded();
+    let mut stream = FakeBassEventStream::new(receiver);
+
+    let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
+    assert!(stream.poll_next_unpin(&mut noop_cx).is_pending());
+
+    // Add events.
+    sender.unbounded_send(Ok(Event::SyncedToPa(BroadcastId(1)))).expect("should succeed");
+    sender
+        .unbounded_send(Ok(Event::BroadcastCodeRequired(BroadcastId(1))))
+        .expect("should succeed");
+
+    let polled = stream.poll_next_unpin(&mut noop_cx);
+    let Poll::Ready(Some(Ok(event))) = polled else {
+        panic!("Expected to receive event");
+    };
+    assert_matches!(event, Event::SyncedToPa(_));
+
+    let polled = stream.poll_next_unpin(&mut noop_cx);
+    let Poll::Ready(Some(Ok(event))) = polled else {
+        panic!("Expected to receive event");
+    };
+    assert_matches!(event, Event::BroadcastCodeRequired(_));
+
+    assert!(stream.poll_next_unpin(&mut noop_cx).is_pending());
+
+    sender
+        .unbounded_send(Err(Error::EventStream(Box::new(Error::Generic(format!("some error"))))))
+        .expect("should succeed");
+
+    let polled = stream.poll_next_unpin(&mut noop_cx);
+    let Poll::Ready(Some(Err(_))) = polled else {
+        panic!("Expected to receive error");
+    };
+
+    let polled = stream.poll_next_unpin(&mut noop_cx);
+    let Poll::Ready(None) = polled else {
+        panic!("Expected to have terminated");
+    };
+}
diff --git a/rust/bt-bass/src/types.rs b/rust/bt-bass/src/types.rs
index efb5e60..f810a3d 100644
--- a/rust/bt-bass/src/types.rs
+++ b/rust/bt-bass/src/types.rs
@@ -6,9 +6,9 @@
 use bt_common::core::{AdvertisingSetId, PaInterval};
 use bt_common::generic_audio::metadata_ltv::*;
 use bt_common::packet_encoding::{Decodable, Encodable, Error as PacketError};
-use bt_common::Uuid;
+use bt_common::{decodable_enum, Uuid};
 
-const ADDRESS_BYTE_SIZE: usize = 6;
+pub const ADDRESS_BYTE_SIZE: usize = 6;
 const NUM_SUBGROUPS_BYTE_SIZE: usize = 1;
 const PA_SYNC_BYTE_SIZE: usize = 1;
 const SOURCE_ID_BYTE_SIZE: usize = 1;
@@ -20,16 +20,18 @@
 
 pub type SourceId = u8;
 
+/// BIS index value of a particular BIS. Valid value range is [1 to len of BIS]
+pub type BisIndex = u8;
+
 /// Broadcast_ID is a 3-byte data on the wire.
 /// See Broadcast Audio Scan Service (BASS) spec v1.0 Table 3.5 for details.
 #[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
-pub struct BroadcastId(u32);
+pub struct BroadcastId(pub u32);
 
 impl BroadcastId {
     // On the wire, Broadcast_ID is transported in 3 bytes.
-    const BYTE_SIZE: usize = 3;
+    pub const BYTE_SIZE: usize = 3;
 
-    #[cfg(test)]
     pub fn new(raw_value: u32) -> Self {
         Self(raw_value)
     }
@@ -39,6 +41,12 @@
     }
 }
 
+impl std::fmt::Display for BroadcastId {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        write!(f, "{:#08x}", self.0)
+    }
+}
+
 impl TryFrom<u32> for BroadcastId {
     type Error = PacketError;
 
@@ -84,46 +92,30 @@
     }
 }
 
+decodable_enum! {
+    pub enum ControlPointOpcode<u8, bt_common::packet_encoding::Error, OutOfRange> {
+        RemoteScanStopped = 0x00,
+        RemoteScanStarted = 0x01,
+        AddSource = 0x02,
+        ModifySource = 0x03,
+        SetBroadcastCode = 0x04,
+        RemoveSource = 0x05,
+    }
+}
+
 /// Broadcast Audio Scan Control Point characteristic opcode as defined in
 /// Broadcast Audio Scan Service spec v1.0 Section 3.1.
-#[derive(Debug, PartialEq)]
-pub enum ControlPointOpcode {
-    RemoteScanStopped = 0x00,
-    RemoteScanStarted = 0x01,
-    AddSource = 0x02,
-    ModifySource = 0x03,
-    SetBroadcastCode = 0x04,
-    RemoveSource = 0x05,
-}
-
 impl ControlPointOpcode {
     const BYTE_SIZE: usize = 1;
 }
 
-impl TryFrom<u8> for ControlPointOpcode {
-    type Error = PacketError;
-
-    fn try_from(value: u8) -> Result<Self, Self::Error> {
-        let val = match value {
-            0x00 => Self::RemoteScanStopped,
-            0x01 => Self::RemoteScanStarted,
-            0x02 => Self::AddSource,
-            0x03 => Self::ModifySource,
-            0x04 => Self::SetBroadcastCode,
-            0x05 => Self::RemoveSource,
-            _ => return Err(PacketError::OutOfRange),
-        };
-        Ok(val)
-    }
-}
-
 /// Trait for objects that represent a Broadcast Audio Scan Control Point
 /// characteristic. When written by a client, the Broadcast Audio Scan Control
 /// Point characteristic is defined as an 8-bit enumerated value, known as the
 /// opcode, followed by zero or more parameter octets. The opcode represents the
 /// operation that would be performed in the Broadcast Audio Scan Service
 /// server. See BASS spec v1.0 Section 3.1 for details.
-pub trait ControlPointOperation {
+pub trait ControlPointOperation: Encodable<Error = PacketError> {
     // Returns the expected opcode for this operation.
     fn opcode() -> ControlPointOpcode;
 
@@ -220,14 +212,14 @@
 /// See BASS spec v1.0 Section 3.1.1.4 for details.
 #[derive(Debug, PartialEq)]
 pub struct AddSourceOperation {
-    advertiser_address_type: AddressType,
+    pub(crate) advertiser_address_type: AddressType,
     // Address in little endian.
-    advertiser_address: [u8; ADDRESS_BYTE_SIZE],
-    advertising_sid: AdvertisingSetId,
-    broadcast_id: BroadcastId,
-    pa_sync: PaSync,
-    pa_interval: PaInterval,
-    subgroups: Vec<BigSubgroup>,
+    pub(crate) advertiser_address: [u8; ADDRESS_BYTE_SIZE],
+    pub(crate) advertising_sid: AdvertisingSetId,
+    pub(crate) broadcast_id: BroadcastId,
+    pub(crate) pa_sync: PaSync,
+    pub(crate) pa_interval: PaInterval,
+    pub(crate) subgroups: Vec<BigSubgroup>,
 }
 
 impl AddSourceOperation {
@@ -243,19 +235,19 @@
     pub fn new(
         address_type: AddressType,
         advertiser_address: [u8; ADDRESS_BYTE_SIZE],
-        sid: u8,
-        broadcast_id: u32,
+        advertising_sid: AdvertisingSetId,
+        broadcast_id: BroadcastId,
         pa_sync: PaSync,
-        pa_interval: u16,
+        pa_interval: PaInterval,
         subgroups: Vec<BigSubgroup>,
     ) -> Self {
         AddSourceOperation {
             advertiser_address_type: address_type,
             advertiser_address,
-            advertising_sid: AdvertisingSetId(sid),
-            broadcast_id: BroadcastId(broadcast_id),
+            advertising_sid,
+            broadcast_id: broadcast_id,
             pa_sync,
-            pa_interval: PaInterval(pa_interval),
+            pa_interval,
             subgroups,
         }
     }
@@ -323,7 +315,7 @@
         buf[2..8].copy_from_slice(&self.advertiser_address);
         buf[8] = self.advertising_sid.0;
         self.broadcast_id.encode(&mut buf[9..12])?;
-        buf[12] = self.pa_sync as u8;
+        buf[12] = u8::from(self.pa_sync);
         buf[13..15].copy_from_slice(&self.pa_interval.0.to_le_bytes());
         buf[15] = self
             .subgroups
@@ -362,15 +354,10 @@
     pub fn new(
         source_id: SourceId,
         pa_sync: PaSync,
-        pa_interval: u16,
+        pa_interval: PaInterval,
         subgroups: Vec<BigSubgroup>,
     ) -> Self {
-        ModifySourceOperation {
-            source_id,
-            pa_sync,
-            pa_interval: PaInterval(pa_interval),
-            subgroups,
-        }
+        ModifySourceOperation { source_id, pa_sync, pa_interval, subgroups }
     }
 }
 
@@ -418,7 +405,7 @@
 
         buf[0] = Self::opcode() as u8;
         buf[1] = self.source_id;
-        buf[2] = self.pa_sync as u8;
+        buf[2] = u8::from(self.pa_sync);
         buf[3..5].copy_from_slice(&self.pa_interval.0.to_le_bytes());
         buf[5] = self
             .subgroups
@@ -546,54 +533,89 @@
     }
 }
 
-#[repr(u8)]
-#[derive(Clone, Copy, Debug, PartialEq)]
-pub enum PaSync {
-    DoNotSync = 0x00,
-    SyncPastAvailable = 0x01,
-    SyncPastUnavailable = 0x02,
-}
-
-impl TryFrom<u8> for PaSync {
-    type Error = PacketError;
-
-    fn try_from(value: u8) -> Result<Self, Self::Error> {
-        match value {
-            0x00 => Ok(Self::DoNotSync),
-            0x01 => Ok(Self::SyncPastAvailable),
-            0x02 => Ok(Self::SyncPastUnavailable),
-            _ => Err(PacketError::OutOfRange),
-        }
+decodable_enum! {
+    pub enum PaSync<u8, bt_common::packet_encoding::Error, OutOfRange> {
+        DoNotSync = 0x00,
+        SyncPastAvailable = 0x01,
+        SyncPastUnavailable = 0x02,
     }
 }
 
-#[derive(Debug, PartialEq)]
+/// 4-octet bitfield. Bit 0-30 = BIS_index[1-31]
+/// 0x00000000: 0b0 = Do not synchronize to BIS_index[x]
+/// 0xxxxxxxxx: 0b1 = Synchronize to BIS_index[x]
+/// 0xFFFFFFFF: means No preference if used in BroadcastAudioScanControlPoint,
+///             Failed to sync if used in ReceiveState.
+#[derive(Clone, Debug, PartialEq)]
+pub struct BisSync(pub u32);
+
+impl BisSync {
+    const BYTE_SIZE: usize = 4;
+    const NO_PREFERENCE: u32 = 0xFFFFFFFF;
+
+    /// Updates whether or not a particular BIS should be set to synchronize or
+    /// not synchronize.
+    ///
+    /// # Arguments
+    ///
+    /// * `bis_index` - BIS index as defined in the spec. Range should be [1,
+    ///   31]
+    /// * `should_sync` - Whether or not to synchronize
+    fn set_sync_for_index(&mut self, bis_index: BisIndex) -> Result<(), PacketError> {
+        if bis_index < 1 || bis_index > 31 {
+            return Err(PacketError::InvalidParameter(format!("Invalid BIS index ({bis_index})")));
+        }
+        let bit_mask = 0b1 << (bis_index - 1);
+
+        // Clear the bit that we're interested in setting.
+        self.0 &= !(0b1 << bis_index - 1);
+        self.0 |= bit_mask;
+        Ok(())
+    }
+
+    /// Clears previous BIS_Sync params and synchronizes to specified BIS
+    /// indices. If the BIS index list is empty, no preference value is
+    /// used.
+    ///
+    /// # Arguments
+    ///
+    /// * `sync_map` - Map of BIS index to whether or not it should be
+    ///   synchronized
+    pub fn set_sync(&mut self, bis_indices: &Vec<BisIndex>) -> Result<(), PacketError> {
+        if bis_indices.is_empty() {
+            self.0 = Self::NO_PREFERENCE;
+            return Ok(());
+        }
+        self.0 = 0;
+        for bis_index in bis_indices {
+            self.set_sync_for_index(*bis_index)?;
+        }
+        Ok(())
+    }
+}
+
+impl Default for BisSync {
+    fn default() -> Self {
+        Self(Self::NO_PREFERENCE)
+    }
+}
+
+#[derive(Clone, Debug, PartialEq)]
 pub struct BigSubgroup {
-    // 4-octet bitfield. Bit 0-30 = BIS_index[1-31]
-    // 0x00000000: 0b0 = Do not synchronize to BIS_index[x]
-    // 0xxxxxxxxx: 0b1 = Synchronize to BIS_index[x]
-    // 0xFFFFFFFF: means No preference if used in BroadcastAudioScanControlPoint,
-    //             Failed to sync if used in ReceiveState.
-    bis_sync_bitfield: u32,
-    metadata: Vec<Metadata>,
+    pub(crate) bis_sync: BisSync,
+    pub(crate) metadata: Vec<Metadata>,
 }
 
 impl BigSubgroup {
-    const BIS_SYNC_BYTE_SIZE: usize = 4;
     const METADATA_LENGTH_BYTE_SIZE: usize = 1;
+    const MIN_PACKET_SIZE: usize = BisSync::BYTE_SIZE + Self::METADATA_LENGTH_BYTE_SIZE;
 
-    const MIN_PACKET_SIZE: usize = Self::BIS_SYNC_BYTE_SIZE + Self::METADATA_LENGTH_BYTE_SIZE;
-    const BIS_SYNC_NO_PREFERENCE: u32 = 0xFFFFFFFF;
-
-    pub fn new(bis_sync_bitfield: Option<u32>) -> Self {
-        Self {
-            bis_sync_bitfield: bis_sync_bitfield.unwrap_or(Self::BIS_SYNC_NO_PREFERENCE),
-            metadata: vec![],
-        }
+    pub fn new(bis_sync: Option<BisSync>) -> Self {
+        Self { bis_sync: bis_sync.unwrap_or_default(), metadata: vec![] }
     }
 
     pub fn with_metadata(self, metadata: Vec<Metadata>) -> Self {
-        Self { bis_sync_bitfield: self.bis_sync_bitfield, metadata }
+        Self { bis_sync: self.bis_sync, metadata }
     }
 }
 
@@ -620,7 +642,7 @@
         }
         // Ignore any undecodable metadata types
         let metadata = results_metadata.into_iter().filter_map(Result::ok).collect();
-        Ok((BigSubgroup { bis_sync_bitfield: bis_sync, metadata }, start_idx))
+        Ok((BigSubgroup { bis_sync: BisSync(bis_sync), metadata }, start_idx))
     }
 }
 
@@ -632,7 +654,7 @@
             return Err(PacketError::BufferTooSmall);
         }
 
-        buf[0..4].copy_from_slice(&self.bis_sync_bitfield.to_le_bytes());
+        buf[0..4].copy_from_slice(&self.bis_sync.0.to_le_bytes());
         let metadata_len = self
             .metadata
             .iter()
@@ -686,12 +708,35 @@
 /// information about a Broadcast Source. If the server has not written a
 /// Source_ID value to the Broadcast Receive State characteristic, the Broadcast
 /// Recieve State characteristic value shall be empty.
-#[derive(Debug, PartialEq)]
+#[derive(Clone, Debug, PartialEq)]
 pub enum BroadcastReceiveState {
     Empty,
     NonEmpty(ReceiveState),
 }
 
+impl BroadcastReceiveState {
+    pub fn is_empty(&self) -> bool {
+        *self == BroadcastReceiveState::Empty
+    }
+
+    pub fn broadcast_id(&self) -> Option<BroadcastId> {
+        match self {
+            BroadcastReceiveState::Empty => None,
+            BroadcastReceiveState::NonEmpty(state) => Some(state.broadcast_id),
+        }
+    }
+
+    pub fn has_same_broadcast_id(&self, other: &BroadcastReceiveState) -> bool {
+        match self {
+            BroadcastReceiveState::Empty => false,
+            BroadcastReceiveState::NonEmpty(this) => match other {
+                BroadcastReceiveState::Empty => false,
+                BroadcastReceiveState::NonEmpty(that) => this.broadcast_id == that.broadcast_id,
+            },
+        }
+    }
+}
+
 impl Decodable for BroadcastReceiveState {
     type Error = PacketError;
 
@@ -722,18 +767,18 @@
     }
 }
 
-#[derive(Debug, PartialEq)]
+#[derive(Clone, Debug, PartialEq)]
 pub struct ReceiveState {
-    source_id: SourceId,
-    source_address_type: AddressType,
+    pub(crate) source_id: SourceId,
+    pub(crate) source_address_type: AddressType,
     // Address in little endian.
-    source_address: [u8; ADDRESS_BYTE_SIZE],
-    source_adv_sid: AdvertisingSetId,
-    broadcast_id: BroadcastId,
-    pa_sync_state: PaSyncState,
+    pub(crate) source_address: [u8; ADDRESS_BYTE_SIZE],
+    pub(crate) source_adv_sid: AdvertisingSetId,
+    pub(crate) broadcast_id: BroadcastId,
+    pub(crate) pa_sync_state: PaSyncState,
     // Represents BIG_Encryption param with optional Bad_Code param.
-    big_encryption: EncryptionStatus,
-    subgroups: Vec<BigSubgroup>,
+    pub(crate) big_encryption: EncryptionStatus,
+    pub(crate) subgroups: Vec<BigSubgroup>,
 }
 
 impl ReceiveState {
@@ -848,7 +893,7 @@
         buf[2..8].copy_from_slice(&self.source_address);
         buf[8] = self.source_adv_sid.0;
         self.broadcast_id.encode(&mut buf[9..12])?;
-        buf[12] = self.pa_sync_state as u8;
+        buf[12] = u8::from(self.pa_sync_state);
         let mut idx = 13 + self.big_encryption.encoded_len();
         self.big_encryption.encode(&mut buf[13..idx])?;
         buf[idx] = self
@@ -880,28 +925,13 @@
     }
 }
 
-#[derive(Clone, Copy, Debug, PartialEq)]
-#[repr(u8)]
-pub enum PaSyncState {
-    NotSynced = 0x00,
-    SyncInfoRequest = 0x01,
-    Synced = 0x02,
-    FailedToSync = 0x03,
-    NoPast = 0x04,
-}
-
-impl TryFrom<u8> for PaSyncState {
-    type Error = PacketError;
-
-    fn try_from(value: u8) -> Result<Self, Self::Error> {
-        match value {
-            0x00 => Ok(Self::NotSynced),
-            0x01 => Ok(Self::SyncInfoRequest),
-            0x02 => Ok(Self::Synced),
-            0x03 => Ok(Self::FailedToSync),
-            0x04 => Ok(Self::NoPast),
-            _ => Err(PacketError::OutOfRange),
-        }
+decodable_enum! {
+    pub enum PaSyncState<u8, bt_common::packet_encoding::Error, OutOfRange> {
+        NotSynced = 0x00,
+        SyncInfoRequest = 0x01,
+        Synced = 0x02,
+        FailedToSync = 0x03,
+        NoPast = 0x04,
     }
 }
 
@@ -1082,6 +1112,27 @@
     }
 
     #[test]
+    fn bis_sync() {
+        let mut bis_sync = BisSync::default();
+        assert_eq!(bis_sync, BisSync(BisSync::NO_PREFERENCE));
+
+        bis_sync.set_sync(&vec![1, 6, 31]).expect("should succeed");
+        assert_eq!(bis_sync, BisSync(0x40000021));
+
+        bis_sync.set_sync(&vec![]).expect("should succeed");
+        assert_eq!(bis_sync, BisSync::default());
+    }
+
+    #[test]
+    fn invalid_bis_sync() {
+        let mut bis_sync = BisSync::default();
+
+        bis_sync.set_sync(&vec![0]).expect_err("should fail");
+
+        bis_sync.set_sync(&vec![32]).expect_err("should fail");
+    }
+
+    #[test]
     fn remote_scan_stopped() {
         // Encoding remote scan stopped.
         let stopped = RemoteScanStoppedOperation;
@@ -1121,10 +1172,10 @@
         let op = AddSourceOperation::new(
             AddressType::Public,
             [0x04, 0x10, 0x00, 0x00, 0x00, 0x00],
-            1,
-            0x11,
+            AdvertisingSetId(1),
+            BroadcastId(0x11),
             PaSync::DoNotSync,
-            PaInterval::UNKNOWN_VALUE,
+            PaInterval::unknown(),
             vec![],
         );
         assert_eq!(op.encoded_len(), 16);
@@ -1153,10 +1204,10 @@
         let op = AddSourceOperation::new(
             AddressType::Random,
             [0x04, 0x10, 0x00, 0x00, 0x00, 0x00],
-            1,
-            0x11,
+            AdvertisingSetId(1),
+            BroadcastId(0x11),
             PaSync::SyncPastAvailable,
-            PaInterval::UNKNOWN_VALUE,
+            PaInterval::unknown(),
             subgroups,
         );
         assert_eq!(op.encoded_len(), 31); // 16 for minimum params and params 15 for the subgroup.
@@ -1166,8 +1217,8 @@
         let bytes = vec![
             0x02, 0x01, 0x04, 0x10, 0x00, 0x00, 0x00, 0x00, 0x01, 0x11, 0x00, 0x00, 0x01, 0xFF,
             0xFF, 0x01, 0xFF, 0xFF, 0xFF, 0xFF, 0x0A, // BIS_Sync, Metdata_Length
-            0x03, 0x01, 0x0C, 0x00, // Preferred_Audio_Contexts metdataum
-            0x05, 0x03, 0x74, 0x65, 0x73, 0x074, // Program_Info metadatum
+            0x03, 0x01, 0x0C, 0x00, // Preferred_Audio_Contexts metadata
+            0x05, 0x03, 0x74, 0x65, 0x73, 0x074, // Program_Info metadata
         ];
         assert_eq!(buf, bytes);
 
@@ -1180,7 +1231,8 @@
     #[test]
     fn modify_source_without_subgroups() {
         // Encoding operation with no subgroups.
-        let op = ModifySourceOperation::new(0x0A, PaSync::SyncPastAvailable, 0x1004, vec![]);
+        let op =
+            ModifySourceOperation::new(0x0A, PaSync::SyncPastAvailable, PaInterval(0x1004), vec![]);
         assert_eq!(op.encoded_len(), 6);
         let mut buf = vec![0u8; op.encoded_len()];
         op.encode(&mut buf[..]).expect("shoud succeed");
@@ -1199,15 +1251,11 @@
         // Encoding operation with subgroups.
         let subgroups = vec![
             BigSubgroup::new(None).with_metadata(vec![Metadata::ParentalRating(Rating::all_age())]), /* encoded_len = 8 */
-            BigSubgroup::new(Some(0x000000FE))
+            BigSubgroup::new(Some(BisSync(0x000000FE)))
                 .with_metadata(vec![Metadata::BroadcastAudioImmediateRenderingFlag]), /* encoded_len = 7 */
         ];
-        let op = ModifySourceOperation::new(
-            0x0B,
-            PaSync::DoNotSync,
-            PaInterval::UNKNOWN_VALUE,
-            subgroups,
-        );
+        let op =
+            ModifySourceOperation::new(0x0B, PaSync::DoNotSync, PaInterval::unknown(), subgroups);
         assert_eq!(op.encoded_len(), 21); // 6 for minimum params and params 15 for two subgroups.
         let mut buf = vec![0u8; op.encoded_len()];
         op.encode(&mut buf[..]).expect("shoud succeed");
diff --git a/rust/bt-common/src/core.rs b/rust/bt-common/src/core.rs
index c7efbca..fd02299 100644
--- a/rust/bt-common/src/core.rs
+++ b/rust/bt-common/src/core.rs
@@ -116,8 +116,44 @@
     }
 }
 
+/// Codec_ID communicated by a basic audio profile service/role.
+#[derive(Debug, Clone, PartialEq)]
+pub enum CodecId {
+    /// From the Assigned Numbers. Format will not be
+    /// `CodingFormat::VendorSpecific`
+    Assigned(CodingFormat),
+    VendorSpecific {
+        company_id: crate::CompanyId,
+        vendor_specific_codec_id: u16,
+    },
+}
+
+impl CodecId {
+    pub const BYTE_SIZE: usize = 5;
+}
+
+impl crate::packet_encoding::Decodable for CodecId {
+    type Error = crate::packet_encoding::Error;
+
+    fn decode(buf: &[u8]) -> core::result::Result<(Self, usize), Self::Error> {
+        if buf.len() < 5 {
+            return Err(crate::packet_encoding::Error::UnexpectedDataLength);
+        }
+        let format = buf[0].into();
+        if format != CodingFormat::VendorSpecific {
+            // Maybe don't ignore the company and vendor id, and check if they are wrong.
+            return Ok((Self::Assigned(format), 5));
+        }
+        let company_id = u16::from_le_bytes([buf[1], buf[2]]).into();
+        let vendor_specific_codec_id = u16::from_le_bytes([buf[3], buf[4]]);
+        Ok((Self::VendorSpecific { company_id, vendor_specific_codec_id }, 5))
+    }
+}
+
 #[cfg(test)]
 mod tests {
+    use crate::packet_encoding::Decodable;
+
     use super::*;
 
     #[test]
@@ -136,4 +172,21 @@
 
         interval.encode(&mut buf[..]).expect_err("should fail");
     }
+
+    #[test]
+    fn decode_codec_id() {
+        let assigned = [0x01, 0x00, 0x00, 0x00, 0x00];
+        let (codec_id, _) = CodecId::decode(&assigned[..]).expect("should succeed");
+        assert_eq!(codec_id, CodecId::Assigned(CodingFormat::ALawLog));
+
+        let vendor_specific = [0xFF, 0x36, 0xFD, 0x11, 0x22];
+        let (codec_id, _) = CodecId::decode(&vendor_specific[..]).expect("should succeed");
+        assert_eq!(
+            codec_id,
+            CodecId::VendorSpecific {
+                company_id: (0xFD36 as u16).into(),
+                vendor_specific_codec_id: 0x2211
+            }
+        );
+    }
 }
diff --git a/rust/bt-common/src/generic_audio.rs b/rust/bt-common/src/generic_audio.rs
index cfbcbd3..c8a8530 100644
--- a/rust/bt-common/src/generic_audio.rs
+++ b/rust/bt-common/src/generic_audio.rs
@@ -3,7 +3,7 @@
 // found in the LICENSE file.
 
 pub mod codec_capabilities;
-
+pub mod codec_configuration;
 pub mod metadata_ltv;
 
 use crate::{codable_as_bitmask, decodable_enum};
diff --git a/rust/bt-common/src/generic_audio/codec_configuration.rs b/rust/bt-common/src/generic_audio/codec_configuration.rs
new file mode 100644
index 0000000..41f5c07
--- /dev/null
+++ b/rust/bt-common/src/generic_audio/codec_configuration.rs
@@ -0,0 +1,165 @@
+// Copyright 2024 Google LLC
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+use std::collections::HashSet;
+use std::ops::RangeBounds;
+
+use crate::core::ltv::LtValue;
+use crate::decodable_enum;
+
+use super::AudioLocation;
+
+#[derive(Copy, Clone, Hash, PartialEq, Eq, Debug)]
+pub enum CodecConfigurationType {
+    SamplingFrequency,
+    FrameDuration,
+    AudioChannelAllocation,
+    OctetsPerCodecFrame,
+    CodecFramesPerSdu,
+}
+
+impl From<CodecConfigurationType> for u8 {
+    fn from(value: CodecConfigurationType) -> Self {
+        match value {
+            CodecConfigurationType::SamplingFrequency => 1,
+            CodecConfigurationType::FrameDuration => 2,
+            CodecConfigurationType::AudioChannelAllocation => 3,
+            CodecConfigurationType::OctetsPerCodecFrame => 4,
+            CodecConfigurationType::CodecFramesPerSdu => 5,
+        }
+    }
+}
+
+decodable_enum! {
+    pub enum FrameDuration<u8, crate::packet_encoding::Error, OutOfRange> {
+        SevenFiveMs = 0x00,
+        TenMs = 0x01,
+    }
+}
+
+decodable_enum! {
+    pub enum SamplingFrequency<u8, crate::packet_encoding::Error, OutOfRange> {
+        F8000Hz = 0x01,
+        F11025Hz = 0x02,
+        F16000Hz = 0x03,
+        F22050Hz = 0x04,
+        F24000Hz = 0x05,
+        F32000Hz = 0x06,
+        F44100Hz = 0x07,
+        F48000Hz = 0x08,
+        F88200Hz = 0x09,
+        F96000Hz = 0x0A,
+        F176400Hz = 0x0B,
+        F192000Hz = 0x0C,
+        F384000Hz = 0x0D,
+    }
+}
+
+/// Codec Configuration LTV Structures
+///
+/// Defined in Assigned Numbers Section 6.12.5.
+#[derive(Clone, PartialEq, Eq, Debug)]
+pub enum CodecConfiguration {
+    SamplingFrequency(SamplingFrequency),
+    FrameDuration(FrameDuration),
+    AudioChannelAllocation(std::collections::HashSet<AudioLocation>),
+    OctetsPerCodecFrame(u16),
+    CodecFramesPerSdu(u8),
+}
+
+impl LtValue for CodecConfiguration {
+    type Type = CodecConfigurationType;
+
+    const NAME: &'static str = "Codec Compatability";
+
+    fn type_from_octet(x: u8) -> Option<Self::Type> {
+        match x {
+            1 => Some(CodecConfigurationType::SamplingFrequency),
+            2 => Some(CodecConfigurationType::FrameDuration),
+            3 => Some(CodecConfigurationType::AudioChannelAllocation),
+            4 => Some(CodecConfigurationType::OctetsPerCodecFrame),
+            5 => Some(CodecConfigurationType::CodecFramesPerSdu),
+            _ => None,
+        }
+    }
+
+    fn length_range_from_type(ty: Self::Type) -> std::ops::RangeInclusive<u8> {
+        match ty {
+            CodecConfigurationType::SamplingFrequency => 2..=2,
+            CodecConfigurationType::FrameDuration => 2..=2,
+            CodecConfigurationType::AudioChannelAllocation => 5..=5,
+            CodecConfigurationType::OctetsPerCodecFrame => 3..=3,
+            CodecConfigurationType::CodecFramesPerSdu => 2..=2,
+        }
+    }
+
+    fn into_type(&self) -> Self::Type {
+        match self {
+            CodecConfiguration::SamplingFrequency(_) => CodecConfigurationType::SamplingFrequency,
+            CodecConfiguration::FrameDuration(_) => CodecConfigurationType::FrameDuration,
+            CodecConfiguration::AudioChannelAllocation(_) => {
+                CodecConfigurationType::AudioChannelAllocation
+            }
+            CodecConfiguration::OctetsPerCodecFrame(_) => {
+                CodecConfigurationType::OctetsPerCodecFrame
+            }
+            CodecConfiguration::CodecFramesPerSdu(_) => CodecConfigurationType::CodecFramesPerSdu,
+        }
+    }
+
+    fn value_encoded_len(&self) -> u8 {
+        // All the CodecCapabilities are constant length. Remove the type octet.
+        let range = Self::length_range_from_type(self.into_type());
+        let std::ops::Bound::Included(len) = range.start_bound() else {
+            unreachable!();
+        };
+        (len - 1).into()
+    }
+
+    fn encode_value(&self, buf: &mut [u8]) -> Result<(), crate::packet_encoding::Error> {
+        match self {
+            CodecConfiguration::SamplingFrequency(freq) => buf[0] = u8::from(*freq),
+            CodecConfiguration::FrameDuration(fd) => buf[0] = u8::from(*fd),
+            CodecConfiguration::AudioChannelAllocation(audio_locations) => {
+                let locations: Vec<&AudioLocation> = audio_locations.into_iter().collect();
+                buf[0..4]
+                    .copy_from_slice(&AudioLocation::to_bits(locations.into_iter()).to_le_bytes());
+            }
+            CodecConfiguration::OctetsPerCodecFrame(octets) => {
+                buf[0..2].copy_from_slice(&octets.to_le_bytes())
+            }
+            CodecConfiguration::CodecFramesPerSdu(frames) => buf[0] = *frames,
+        };
+        Ok(())
+    }
+
+    fn decode_value(
+        ty: &CodecConfigurationType,
+        buf: &[u8],
+    ) -> Result<Self, crate::packet_encoding::Error> {
+        match ty {
+            CodecConfigurationType::SamplingFrequency => {
+                let freq = SamplingFrequency::try_from(buf[0])?;
+                Ok(Self::SamplingFrequency(freq))
+            }
+            CodecConfigurationType::FrameDuration => {
+                let fd = FrameDuration::try_from(buf[0])?;
+                Ok(Self::FrameDuration(fd))
+            }
+            CodecConfigurationType::AudioChannelAllocation => {
+                let raw_value = u32::from_le_bytes(buf[0..4].try_into().unwrap());
+                let locations = AudioLocation::from_bits(raw_value).collect::<HashSet<_>>();
+                Ok(Self::AudioChannelAllocation(locations))
+            }
+            CodecConfigurationType::OctetsPerCodecFrame => {
+                let value = u16::from_le_bytes(buf[0..2].try_into().unwrap());
+                Ok(Self::OctetsPerCodecFrame(value))
+            }
+            CodecConfigurationType::CodecFramesPerSdu => Ok(Self::CodecFramesPerSdu(buf[0])),
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {}
diff --git a/rust/bt-gatt/Cargo.toml b/rust/bt-gatt/Cargo.toml
index 24d4084..6cab048 100644
--- a/rust/bt-gatt/Cargo.toml
+++ b/rust/bt-gatt/Cargo.toml
@@ -16,4 +16,4 @@
 
 [dev-dependencies]
 assert_matches.workspace = true
-bt-gatt = { path = ".", features = ["test-utils"] }
+bt-gatt = { workspace = true, features = ["test-utils"] }
diff --git a/rust/bt-gatt/src/test_utils.rs b/rust/bt-gatt/src/test_utils.rs
index 4908e98..879a0fe 100644
--- a/rust/bt-gatt/src/test_utils.rs
+++ b/rust/bt-gatt/src/test_utils.rs
@@ -2,21 +2,19 @@
 // Use of this source code is governed by a BSD-style license that can be
 // found in the LICENSE file.
 
-use crate::central::ScanResult;
-use crate::client::{self, CharacteristicNotification};
-use crate::{types::*, GattTypes};
-
-use bt_common::{PeerId, Uuid};
 use futures::channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender};
 use futures::future::{ready, Ready};
-use futures::{Future, Stream};
+use futures::Stream;
 use parking_lot::Mutex;
 use std::collections::HashMap;
 use std::sync::Arc;
 use std::task::Poll;
 
-#[derive(Default)]
-pub struct FakeCentral {}
+use bt_common::{PeerId, Uuid};
+
+use crate::central::{AdvertisingDatum, PeerName, ScanResult};
+use crate::client::CharacteristicNotification;
+use crate::{types::*, GattTypes};
 
 #[derive(Default)]
 struct FakePeerServiceInner {
@@ -98,8 +96,8 @@
         ready(Ok((value.len(), false)))
     }
 
-    /// For testing, should call `expect_characteristic_value` with the expected
-    /// value.
+    // For testing, should call `expect_characteristic_value` with the expected
+    // value.
     fn write_characteristic<'a>(
         &self,
         handle: &Handle,
@@ -114,7 +112,7 @@
         };
         // Value written was not expected.
         if buf.len() != expected.len() || &buf[..expected.len()] != expected.as_slice() {
-            panic!("Value written to characteristic {handle:?} was not expected");
+            panic!("Value written to characteristic {handle:?} was not expected: {buf:?}");
         }
         ready(Ok(()))
     }
@@ -177,7 +175,7 @@
 }
 
 impl Stream for SingleResultStream {
-    type Item = Result<crate::central::ScanResult>;
+    type Item = Result<ScanResult>;
 
     fn poll_next(
         self: std::pin::Pin<&mut Self>,
@@ -192,19 +190,6 @@
     }
 }
 
-pub(crate) struct ClientConnectFut {}
-
-impl Future for ClientConnectFut {
-    type Output = Result<FakeClient>;
-
-    fn poll(
-        self: std::pin::Pin<&mut Self>,
-        _cx: &mut std::task::Context<'_>,
-    ) -> Poll<Self::Output> {
-        todo!()
-    }
-}
-
 pub struct FakeTypes {}
 
 impl GattTypes for FakeTypes {
@@ -217,21 +202,22 @@
     type PeerService = FakePeerService;
     type ServiceConnectFut = Ready<Result<FakePeerService>>;
     type CharacteristicDiscoveryFut = Ready<Result<Vec<Characteristic>>>;
-    type NotificationStream = UnboundedReceiver<Result<client::CharacteristicNotification>>;
+    type NotificationStream = UnboundedReceiver<Result<CharacteristicNotification>>;
     type ReadFut<'a> = Ready<Result<(usize, bool)>>;
     type WriteFut<'a> = Ready<Result<()>>;
 }
 
+#[derive(Default)]
+pub struct FakeCentral {}
+
 impl crate::Central<FakeTypes> for FakeCentral {
     fn scan(&self, _filters: &[crate::central::ScanFilter]) -> SingleResultStream {
         SingleResultStream {
             result: Some(Ok(ScanResult {
                 id: PeerId(1),
                 connectable: true,
-                name: crate::central::PeerName::CompleteName("Marie's Pixel 7 Pro".to_owned()),
-                advertised: vec![crate::central::AdvertisingDatum::Services(vec![Uuid::from_u16(
-                    0x1844,
-                )])],
+                name: PeerName::CompleteName("Marie's Pixel 7 Pro".to_owned()),
+                advertised: vec![AdvertisingDatum::Services(vec![Uuid::from_u16(0x1844)])],
             })),
         }
     }
diff --git a/rust/bt-pacs/src/lib.rs b/rust/bt-pacs/src/lib.rs
index 7a62d42..38efa40 100644
--- a/rust/bt-pacs/src/lib.rs
+++ b/rust/bt-pacs/src/lib.rs
@@ -3,6 +3,7 @@
 // found in the LICENSE file.
 
 use bt_common::core::ltv::LtValue;
+use bt_common::core::CodecId;
 use bt_common::generic_audio::codec_capabilities::CodecCapability;
 use bt_common::generic_audio::metadata_ltv::Metadata;
 use bt_common::generic_audio::{AudioLocation, ContextType};
@@ -10,38 +11,6 @@
 use bt_common::Uuid;
 use bt_gatt::{client::FromCharacteristic, Characteristic};
 
-/// Codec_ID communicated by the Published Audio Capabilities Service
-/// From Section 3.1 of the Spec.
-/// Used in [`PacRecord`]
-#[derive(Debug, Clone, PartialEq)]
-pub enum CodecId {
-    /// From the Assigned Numbers. Format will not be
-    /// `CodingFormat::VendorSpecific`
-    Assigned(bt_common::core::CodingFormat),
-    VendorSpecific {
-        company_id: bt_common::CompanyId,
-        vendor_specific_codec_id: u16,
-    },
-}
-
-impl bt_common::packet_encoding::Decodable for CodecId {
-    type Error = bt_common::packet_encoding::Error;
-
-    fn decode(buf: &[u8]) -> core::result::Result<(Self, usize), Self::Error> {
-        if buf.len() < 5 {
-            return Err(bt_common::packet_encoding::Error::UnexpectedDataLength);
-        }
-        let format = buf[0].into();
-        if format != bt_common::core::CodingFormat::VendorSpecific {
-            // Maybe don't ignore the company and vendor id, and check if they are wrong.
-            return Ok((Self::Assigned(format), 5));
-        }
-        let company_id = u16::from_le_bytes([buf[1], buf[2]]).into();
-        let vendor_specific_codec_id = u16::from_le_bytes([buf[3], buf[4]]);
-        Ok((Self::VendorSpecific { company_id, vendor_specific_codec_id }, 5))
-    }
-}
-
 /// A Published Audio Capability (PAC) record.
 /// Published Audio Capabilities represent the capabilities of a given peer to
 /// transmit or receive Audio capabilities, exposed in PAC records, represent