rust/bt-{bass, common}: Implement all operations
Define types for representing a broadcast source. This makes it easier
to collect information about a broadcast source when broadcast assistant
is processing LE central scan results.
Implement all operations available with the Broadcast Audio Scan Service
client:
- adding/modifying/deleting broadcast sources
- marking scanning as started/stopped
Bug: b/308483171
Change-Id: I9f3a045e84efaa540c38e6f4753ece2c9c3796ae
Reviewed-on: https://bluetooth-review.git.corp.google.com/c/bluetooth/+/1322
Reviewed-by: Marie Janssen <jamuraa@google.com>
diff --git a/rust/bt-bass/Cargo.toml b/rust/bt-bass/Cargo.toml
index 81e18a2..2240624 100644
--- a/rust/bt-bass/Cargo.toml
+++ b/rust/bt-bass/Cargo.toml
@@ -4,10 +4,18 @@
edition.workspace = true
license.workspace = true
+[features]
+default = []
+test-utils = []
+
[dependencies]
bt-common.workspace = true
-bt-gatt = { workspace = true , features = ["test-utils"] }
+bt-gatt = { workspace = true, features = ["test-utils"] }
futures.workspace = true
parking_lot.workspace = true
thiserror.workspace = true
tracing.workspace = true
+
+[dev-dependencies]
+assert_matches.workspace = true
+bt-bass = { workspace = true, features = ["test-utils"] }
diff --git a/rust/bt-bass/src/client.rs b/rust/bt-bass/src/client.rs
index 959e913..3aa0b18 100644
--- a/rust/bt-bass/src/client.rs
+++ b/rust/bt-bass/src/client.rs
@@ -5,14 +5,17 @@
pub mod error;
pub mod event;
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
use std::sync::Arc;
-use futures::stream::{BoxStream, SelectAll, StreamExt};
+use futures::stream::{BoxStream, FusedStream, SelectAll, Stream, StreamExt};
+use futures::Future;
use parking_lot::Mutex;
use tracing::warn;
-use bt_common::packet_encoding::{Decodable, Encodable};
+use bt_common::core::{AdvertisingSetId, PaInterval};
+use bt_common::generic_audio::metadata_ltv::Metadata;
+use bt_common::packet_encoding::Decodable;
use bt_gatt::client::{CharacteristicNotification, PeerService, ServiceCharacteristic};
use bt_gatt::types::{Handle, WriteMode};
@@ -22,6 +25,22 @@
const READ_CHARACTERISTIC_BUFFER_SIZE: usize = 255;
+/// Index into the vector of BIG subgroups. Valid value range is [0 to len of
+/// BIG vector).
+pub type SubgroupIndex = u8;
+
+/// Synchronization information for BIG groups to tell which
+/// BISes it should sync to.
+pub type BigToBisSync = HashSet<(SubgroupIndex, BisIndex)>;
+
+pub fn big_to_bis_sync_indices(info: &BigToBisSync) -> HashMap<SubgroupIndex, Vec<BisIndex>> {
+ let mut sync_map = HashMap::new();
+ for (ith_group, bis_index) in info.iter() {
+ sync_map.entry(*ith_group).or_insert(Vec::new()).push(*bis_index);
+ }
+ sync_map
+}
+
/// Keeps track of Source_ID and Broadcast_ID that are associated together.
/// Source_ID is assigned by the BASS server to a Broadcast Receive State
/// characteristic. If the remote peer with the BASS server autonomously
@@ -31,40 +50,62 @@
/// is unqiue to BASS, we track the Broadcast_ID that a Source_ID is associated
/// so that it can be used by upper layers.
#[derive(Default)]
-pub struct BroadcastSourceIdTracker {
- source_to_broadcast: HashMap<SourceId, BroadcastId>,
-}
+pub(crate) struct KnownBroadcastSources(HashMap<Handle, BroadcastReceiveState>);
-impl BroadcastSourceIdTracker {
- fn new() -> Self {
- Self::default()
+impl KnownBroadcastSources {
+ fn new(receive_states: HashMap<Handle, BroadcastReceiveState>) -> Self {
+ KnownBroadcastSources(receive_states)
}
- /// Updates the broadcast ID associated with the given source ID and returns
- /// the previously-associated broadcast ID if it exists.
- fn update(&mut self, source_id: SourceId, broadcast_id: BroadcastId) -> Option<BroadcastId> {
- self.source_to_broadcast.insert(source_id, broadcast_id)
+ /// Updates the value of the specified broadcast receive state
+ /// characteristic. Returns the old value if it existed.
+ fn update_state(
+ &mut self,
+ key: Handle,
+ value: BroadcastReceiveState,
+ ) -> Option<BroadcastReceiveState> {
+ self.0.insert(key, value)
}
+ /// Given the broadcast ID, find the corresponding source ID.
+ /// Returns none if the server doesn't know the specified broadcast source
+ /// because a) the broadcast source was never added or discovered; or,
+ /// b) the broadcast source was removed from remove operation; or,
+ /// c) the broadcast source was removed by the server to add a different
+ /// broadcast source.
fn source_id(&self, broadcast_id: &BroadcastId) -> Option<SourceId> {
- self.source_to_broadcast
- .iter()
- .find_map(|(sid, bid)| (*bid == *broadcast_id).then_some(*sid))
+ let Some(state) = self.state(broadcast_id) else {
+ return None;
+ };
+ Some(state.source_id)
+ }
+
+ /// Gets the last updated broadcast receive state value.
+ /// Returns none if the server doesn't know the specified broadcast source.
+ fn state(&self, broadcast_id: &BroadcastId) -> Option<&ReceiveState> {
+ self.0.iter().find_map(|(&_k, &ref v)| match v {
+ BroadcastReceiveState::Empty => None,
+ BroadcastReceiveState::NonEmpty(rs) => {
+ if rs.broadcast_id() == *broadcast_id {
+ return Some(rs);
+ }
+ None
+ }
+ })
}
}
/// Manages connection to the Broadcast Audio Scan Service at the
/// remote Scan Delegator and writes/reads characteristics to/from it.
pub struct BroadcastAudioScanServiceClient<T: bt_gatt::GattTypes> {
- gatt_client: Box<T::PeerService>,
- id_tracker: Arc<Mutex<BroadcastSourceIdTracker>>,
+ gatt_client: T::PeerService,
/// Broadcast Audio Scan Service only has one Broadcast Audio Scan Control
/// Point characteristic according to BASS Section 3. There shall
/// be one or more Broadcast Receive State characteristics.
audio_scan_control_point: Handle,
/// Broadcast Receive State characteristics can be used to determine the
/// BASS status.
- receive_states: Arc<Mutex<HashMap<Handle, Option<BroadcastReceiveState>>>>,
+ broadcast_sources: Arc<Mutex<KnownBroadcastSources>>,
/// Keeps track of the broadcast codes that were sent to the remote BASS
/// server.
broadcast_codes: HashMap<SourceId, [u8; 16]>,
@@ -75,7 +116,18 @@
}
impl<T: bt_gatt::GattTypes> BroadcastAudioScanServiceClient<T> {
- async fn create(gatt_client: T::PeerService) -> Result<Self, Error>
+ #[cfg(any(test, feature = "test-utils"))]
+ pub fn create_for_test(gatt_client: T::PeerService, audio_scan_control_point: Handle) -> Self {
+ Self {
+ gatt_client,
+ audio_scan_control_point,
+ broadcast_sources: Default::default(),
+ broadcast_codes: HashMap::new(),
+ notification_streams: None,
+ }
+ }
+
+ pub async fn create(gatt_client: T::PeerService) -> Result<Self, Error>
where
<T as bt_gatt::GattTypes>::NotificationStream: std::marker::Send,
{
@@ -95,25 +147,15 @@
}
let bascp_handle = *bascp[0].handle();
let brs_chars = Self::discover_brs_characteristics(&gatt_client).await?;
- let mut id_tracker = BroadcastSourceIdTracker::new();
- for c in brs_chars.values() {
- if let Some(read_value) = c {
- if let BroadcastReceiveState::NonEmpty(state) = read_value {
- let _ = id_tracker.update(state.source_id(), state.broadcast_id());
- }
- }
- }
-
- let mut client = Self {
- gatt_client: Box::new(gatt_client),
- id_tracker: Arc::new(Mutex::new(id_tracker)),
+ let mut c = Self {
+ gatt_client,
audio_scan_control_point: bascp_handle,
- receive_states: Arc::new(Mutex::new(brs_chars)),
+ broadcast_sources: Arc::new(Mutex::new(KnownBroadcastSources::new(brs_chars))),
broadcast_codes: HashMap::new(),
notification_streams: None,
};
- client.register_notifications();
- Ok(client)
+ c.register_notifications();
+ Ok(c)
}
// Discover all the Broadcast Receive State characteristics.
@@ -121,7 +163,7 @@
// Characteristics.
async fn discover_brs_characteristics(
gatt_client: &T::PeerService,
- ) -> Result<HashMap<Handle, Option<BroadcastReceiveState>>, Error> {
+ ) -> Result<HashMap<Handle, BroadcastReceiveState>, Error> {
let brs = ServiceCharacteristic::<T>::find(gatt_client, BROADCAST_RECEIVE_STATE_UUID)
.await
.map_err(|e| Error::Gatt(e))?;
@@ -136,7 +178,7 @@
match c.read(&mut buf[..]).await {
Ok(read_bytes) => match BroadcastReceiveState::decode(&buf[0..read_bytes]) {
Ok((decoded, _decoded_bytes)) => {
- brs_map.insert(*c.handle(), Some(decoded));
+ brs_map.insert(*c.handle(), decoded);
continue;
}
Err(e) => warn!(
@@ -147,7 +189,7 @@
},
Err(e) => warn!("Failed to read characteristic ({:?}) value: {:?}", *c.handle(), e),
}
- brs_map.insert(*c.handle(), None);
+ brs_map.insert(*c.handle(), BroadcastReceiveState::Empty);
}
Ok(brs_map)
}
@@ -158,8 +200,8 @@
{
let mut notification_streams = SelectAll::new();
{
- let lock = self.receive_states.lock();
- for handle in lock.keys() {
+ let lock = self.broadcast_sources.lock();
+ for handle in lock.0.keys() {
let stream = self.gatt_client.subscribe(&handle);
notification_streams.push(stream.boxed());
}
@@ -173,44 +215,198 @@
/// notification that are processed by BroadcastAudioScanServiceClient.
/// This method should only be called once.
/// Returns an error if the method is called for a second time.
- pub fn take_event_stream(&mut self) -> Option<BroadcastAudioScanServiceEventStream> {
+ pub fn take_event_stream(
+ &mut self,
+ ) -> Option<impl Stream<Item = Result<Event, Error>> + FusedStream> {
let notification_streams = self.notification_streams.take();
let Some(streams) = notification_streams else {
return None;
};
- let event_stream = BroadcastAudioScanServiceEventStream::new(
- streams,
- self.id_tracker.clone(),
- self.receive_states.clone(),
- );
+ let event_stream = EventStream::new(streams, self.broadcast_sources.clone());
Some(event_stream)
}
+ /// Write to the Broadcast Audio Scan Control Point characteristic in
+ /// without response mode.
+ fn write_to_bascp(
+ &self,
+ op: impl ControlPointOperation,
+ ) -> impl Future<Output = Result<(), Error>> + '_ {
+ let handle = self.audio_scan_control_point;
+ let mut buf = vec![0; op.encoded_len()];
+ let encode_res = op.encode(&mut buf[..]);
+ async move {
+ match encode_res {
+ Err(e) => Err(Error::Packet(e)),
+ Ok(_) => self
+ .gatt_client
+ .write_characteristic(&handle, WriteMode::WithoutResponse, 0, buf.as_slice())
+ .await
+ .map_err(|e| Error::Gatt(e)),
+ }
+ }
+ }
+
+ fn get_source_id(&self, broadcast_id: &BroadcastId) -> Result<SourceId, Error> {
+ self.broadcast_sources
+ .lock()
+ .source_id(broadcast_id)
+ .ok_or(Error::UnknownBroadcastSource(*broadcast_id))
+ }
+
+ /// Returns a clone of the latest known broadcast audio receive state of the
+ /// specified broadcast source given its broadcast id.
+ fn get_broadcast_source_state(&self, broadcast_id: &BroadcastId) -> Option<ReceiveState> {
+ let lock = self.broadcast_sources.lock();
+ lock.state(broadcast_id).clone().map(|rs| rs.clone())
+ }
+
+ /// Indicates to the remote BASS server that we have started scanning for
+ /// broadcast sources on behalf of it. If the scan delegator that serves
+ /// the BASS server is collocated with a broadcast sink, this may or may
+ /// not change the scanning behaviour of the the broadcast sink.
+ pub async fn remote_scan_started(&mut self) -> Result<(), Error> {
+ let op = RemoteScanStartedOperation;
+ self.write_to_bascp(op).await
+ }
+
+ /// Indicates to the remote BASS server that we have stopped scanning for
+ /// broadcast sources on behalf of it.
+ pub async fn remote_scan_stopped(&mut self) -> Result<(), Error> {
+ let op = RemoteScanStoppedOperation;
+ self.write_to_bascp(op).await
+ }
+
+ /// Provides the BASS server with information regarding a Broadcast Source.
+ pub async fn add_broadcast_source(
+ &mut self,
+ broadcast_id: BroadcastId,
+ address_type: AddressType,
+ advertiser_address: [u8; ADDRESS_BYTE_SIZE],
+ sid: AdvertisingSetId,
+ pa_sync: PaSync,
+ pa_interval: PaInterval,
+ subgroups: Vec<BigSubgroup>,
+ ) -> Result<(), Error> {
+ let op = AddSourceOperation::new(
+ address_type,
+ advertiser_address,
+ sid,
+ broadcast_id,
+ pa_sync,
+ pa_interval,
+ subgroups,
+ );
+ self.write_to_bascp(op).await
+ }
+
+ /// Requests the BASS server to add or update Metadata for the Broadcast
+ /// Source, and/or to request the server to synchronize to, or to stop
+ /// synchronization to, a PA and/or a BIS.
+ ///
+ /// # Arguments
+ ///
+ /// * `broadcast_id` - id of the broadcast source to modify
+ /// * `pa_sync` - pa sync mode the scan delegator peer should attempt to be
+ /// in
+ /// * `pa_interval` - updated PA interval value. If none, unknown value is
+ /// used
+ /// * `bis_sync` - desired BIG to BIS synchronization information. If empty,
+ /// it's not updated
+ /// * `metadata_map` - map of updated metadata for BIGs. If a mapping does
+ /// not exist for a BIG, that BIG's metadata is not updated
+ pub async fn modify_broadcast_source(
+ &mut self,
+ broadcast_id: BroadcastId,
+ pa_sync: PaSync,
+ pa_interval: Option<PaInterval>,
+ bis_sync: Option<BigToBisSync>,
+ metadata_map: Option<HashMap<SubgroupIndex, Vec<Metadata>>>,
+ ) -> Result<(), Error> {
+ let op = {
+ let mut state = self
+ .get_broadcast_source_state(&broadcast_id)
+ .ok_or(Error::UnknownBroadcastSource(broadcast_id))?;
+
+ // Update BIS_Sync param for BIGs if applicable.
+ if let Some(m) = bis_sync {
+ let sync_map = big_to_bis_sync_indices(&m);
+
+ for (big_index, group) in state.subgroups.iter_mut().enumerate() {
+ if let Some(bis_indices) = sync_map.get(&(big_index as u8)) {
+ group.bis_sync.set_sync(bis_indices).map_err(|e| Error::Packet(e))?;
+ }
+ }
+ }
+ // Update metadata for BIGs if applicable.
+ if let Some(mut m) = metadata_map {
+ for (big_index, group) in state.subgroups.iter_mut().enumerate() {
+ if let Some(metadata) = m.remove(&(big_index as u8)) {
+ group.metadata = metadata;
+ }
+ }
+
+ // Left over metadata values are new subgroups that are to be added. New
+ // subgroups can only be added if the subgroup index is
+ // contiguous to existing subgroups.
+ let mut new_big_indices: Vec<&u8> = m.keys().collect();
+ new_big_indices.sort();
+ for big_index in new_big_indices {
+ if (*big_index as usize) != state.subgroups.len() {
+ warn!("cannot add new [{big_index}th] subgroup");
+ break;
+ }
+ let new_subgroup = BigSubgroup::new(None).with_metadata(m[big_index].clone());
+ state.subgroups.push(new_subgroup);
+ }
+ }
+
+ ModifySourceOperation::new(
+ state.source_id,
+ pa_sync,
+ pa_interval.unwrap_or(PaInterval::unknown()),
+ state.subgroups,
+ )
+ };
+ self.write_to_bascp(op).await
+ }
+
+ pub async fn remove_broadcast_source(
+ &mut self,
+ broadcast_id: BroadcastId,
+ ) -> Result<(), Error> {
+ let source_id = self.get_source_id(&broadcast_id)?;
+
+ let op = RemoveSourceOperation::new(source_id);
+ self.write_to_bascp(op).await
+ }
+
/// Sets the broadcast code for a particular broadcast stream.
pub async fn set_broadcast_code(
&mut self,
broadcast_id: BroadcastId,
broadcast_code: [u8; 16],
) -> Result<(), Error> {
- let source_id = self.id_tracker.lock().source_id(&broadcast_id).ok_or(Error::Generic(
- format!("Cannot find Source ID for specified Broadcast ID {broadcast_id:?}"),
- ))?;
+ let source_id = self.get_source_id(&broadcast_id)?;
let op = SetBroadcastCodeOperation::new(source_id, broadcast_code.clone());
- let mut buf = vec![0; op.encoded_len()];
- let _ = op.encode(&mut buf[..]).map_err(|e| Error::Packet(e))?;
-
- let c = &self.audio_scan_control_point;
- let _ = self
- .gatt_client
- .write_characteristic(c, WriteMode::WithoutResponse, 0, buf.as_slice())
- .await
- .map_err(|e| Error::Gatt(e))?;
+ self.write_to_bascp(op).await?;
// Save the broadcast code we sent.
self.broadcast_codes.insert(source_id, broadcast_code);
Ok(())
}
+
+ /// Returns a list of currently known broadcast sources at the time
+ /// this method was called.
+ pub fn known_broadcast_sources(&self) -> Vec<(Handle, BroadcastReceiveState)> {
+ let lock = self.broadcast_sources.lock();
+ let mut brs = Vec::new();
+ for (k, v) in lock.0.iter() {
+ brs.push((*k, v.clone()));
+ }
+ brs
+ }
}
#[cfg(test)]
@@ -219,9 +415,11 @@
use std::task::Poll;
+ use assert_matches::assert_matches;
use futures::executor::block_on;
use futures::{pin_mut, FutureExt};
+ use bt_common::core::AdvertisingSetId;
use bt_common::Uuid;
use bt_gatt::test_utils::*;
use bt_gatt::types::{
@@ -317,10 +515,11 @@
// Check that all the characteristics have been discovered.
assert_eq!(client.audio_scan_control_point, AUDIO_SCAN_CONTROL_POINT_HANDLE);
- let broadcast_receive_states = client.receive_states.lock();
- assert!(broadcast_receive_states.contains_key(&RECEIVE_STATE_1_HANDLE));
- assert!(broadcast_receive_states.contains_key(&RECEIVE_STATE_2_HANDLE));
- assert!(broadcast_receive_states.contains_key(&RECEIVE_STATE_3_HANDLE));
+ let broadcast_sources = client.known_broadcast_sources();
+ assert_eq!(broadcast_sources.len(), 3);
+ assert!(broadcast_sources.iter().find(|v| v.0 == RECEIVE_STATE_1_HANDLE).is_some());
+ assert!(broadcast_sources.iter().find(|v| v.0 == RECEIVE_STATE_2_HANDLE).is_some());
+ assert!(broadcast_sources.iter().find(|v| v.0 == RECEIVE_STATE_3_HANDLE).is_some());
}
#[test]
@@ -344,6 +543,7 @@
let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
let create_result =
BroadcastAudioScanServiceClient::<FakeTypes>::create(fake_peer_service.clone());
+
pin_mut!(create_result);
let polled = create_result.poll_unpin(&mut noop_cx);
let Poll::Ready(Err(_)) = polled else {
@@ -422,7 +622,7 @@
}
#[test]
- fn pump_notifications() {
+ fn start_event_stream() {
let (mut client, mut fake_peer_service) = setup_client();
let mut event_stream = client.take_event_stream().expect("stream was created");
@@ -452,17 +652,21 @@
// Check that synced and broadcast code required events were sent out.
let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
- let recv_fut = event_stream.select_next_some();
- let event = block_on(recv_fut).expect("should receive event");
- assert_eq!(event, BroadcastAudioScanServiceEvent::SyncedToPa(BroadcastId::new(0x040302)));
let recv_fut = event_stream.select_next_some();
let event = block_on(recv_fut).expect("should receive event");
assert_eq!(
event,
- BroadcastAudioScanServiceEvent::BroadcastCodeRequired(BroadcastId::new(0x040302))
+ Event::AddedBroadcastSource(
+ BroadcastId(0x040302),
+ PaSyncState::Synced,
+ EncryptionStatus::BroadcastCodeRequired
+ )
);
+ // Stream should be pending since no more notifications.
+ assert!(event_stream.poll_next_unpin(&mut noop_cx).is_pending());
+
// Send notification for updating BRS characteristic to indicate it requires
// sync info. Notification for updating the BRS characteristic value for
// characteristic with handle 3.
@@ -488,12 +692,15 @@
],
);
- // Check that sync info required event was sent out.
let recv_fut = event_stream.select_next_some();
let event = block_on(recv_fut).expect("should receive event");
assert_eq!(
event,
- BroadcastAudioScanServiceEvent::SyncInfoRequested(BroadcastId::new(0x050403))
+ Event::AddedBroadcastSource(
+ BroadcastId(0x050403),
+ PaSyncState::SyncInfoRequest,
+ EncryptionStatus::NotEncrypted
+ )
);
// Stream should be pending since no more notifications.
@@ -501,44 +708,251 @@
}
#[test]
+ fn remote_scan_started() {
+ let (mut client, mut fake_peer_service) = setup_client();
+
+ fake_peer_service.expect_characteristic_value(&AUDIO_SCAN_CONTROL_POINT_HANDLE, vec![0x01]);
+
+ let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
+ let op_fut = client.remote_scan_started();
+ pin_mut!(op_fut);
+ let polled = op_fut.poll_unpin(&mut noop_cx);
+ assert_matches!(polled, Poll::Ready(Ok(_)));
+ }
+
+ #[test]
+ fn remote_scan_stopped() {
+ let (mut client, mut fake_peer_service) = setup_client();
+
+ fake_peer_service.expect_characteristic_value(&AUDIO_SCAN_CONTROL_POINT_HANDLE, vec![0x00]);
+
+ let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
+ let op_fut = client.remote_scan_stopped();
+ pin_mut!(op_fut);
+ let polled = op_fut.poll_unpin(&mut noop_cx);
+ assert_matches!(polled, Poll::Ready(Ok(_)));
+ }
+
+ #[test]
+ fn add_broadcast_source() {
+ let (mut client, mut fake_peer_service) = setup_client();
+
+ fake_peer_service.expect_characteristic_value(
+ &AUDIO_SCAN_CONTROL_POINT_HANDLE,
+ vec![
+ 0x02, 0x00, 0x04, 0x10, 0x00, 0x00, 0x00, 0x00, 0x01, 0x11, 0x00, 0x00, 0x00, 0xFF,
+ 0xFF, 0x00,
+ ],
+ );
+
+ let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
+ let op_fut = client.add_broadcast_source(
+ BroadcastId(0x11),
+ AddressType::Public,
+ [0x04, 0x10, 0x00, 0x00, 0x00, 0x00],
+ AdvertisingSetId(1),
+ PaSync::DoNotSync,
+ PaInterval::unknown(),
+ vec![],
+ );
+ pin_mut!(op_fut);
+ let polled = op_fut.poll_unpin(&mut noop_cx);
+ assert_matches!(polled, Poll::Ready(Ok(_)));
+ }
+
+ #[test]
+ fn modify_broadcast_source() {
+ let (mut client, mut fake_peer_service) = setup_client();
+
+ // Manually update the broadcast source tracker for testing purposes.
+ // In practice, this would have been updated from BRS value change notification.
+ client.broadcast_sources.lock().update_state(
+ RECEIVE_STATE_1_HANDLE,
+ BroadcastReceiveState::NonEmpty(ReceiveState {
+ source_id: 0x11,
+ source_address_type: AddressType::Public,
+ source_address: [1, 2, 3, 4, 5, 6],
+ source_adv_sid: AdvertisingSetId(1),
+ broadcast_id: BroadcastId(0x11),
+ pa_sync_state: PaSyncState::Synced,
+ big_encryption: EncryptionStatus::BroadcastCodeRequired,
+ subgroups: vec![],
+ }),
+ );
+
+ #[rustfmt::skip]
+ fake_peer_service.expect_characteristic_value(
+ &AUDIO_SCAN_CONTROL_POINT_HANDLE,
+ vec![
+ 0x03, 0x11, 0x00, // opcode, source id, pa sync
+ 0xAA, 0xAA, 0x00, // pa sync, pa interval, num of subgroups
+ ],
+ );
+
+ let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
+ let op_fut = client.modify_broadcast_source(
+ BroadcastId(0x11),
+ PaSync::DoNotSync,
+ Some(PaInterval(0xAAAA)),
+ None,
+ None,
+ );
+ pin_mut!(op_fut);
+ let polled = op_fut.poll_unpin(&mut noop_cx);
+ assert_matches!(polled, Poll::Ready(Ok(_)));
+ }
+
+ #[test]
+ fn modify_broadcast_source_updates_groups() {
+ let (mut client, mut fake_peer_service) = setup_client();
+
+ // Manually update the broadcast source tracker for testing purposes.
+ // In practice, this would have been updated from BRS value change notification.
+ client.broadcast_sources.lock().update_state(
+ RECEIVE_STATE_1_HANDLE,
+ BroadcastReceiveState::NonEmpty(ReceiveState {
+ source_id: 0x11,
+ source_address_type: AddressType::Public,
+ source_address: [1, 2, 3, 4, 5, 6],
+ source_adv_sid: AdvertisingSetId(1),
+ broadcast_id: BroadcastId(0x11),
+ pa_sync_state: PaSyncState::Synced,
+ big_encryption: EncryptionStatus::BroadcastCodeRequired,
+ subgroups: vec![BigSubgroup::new(None)],
+ }),
+ );
+
+ // Default PA interval value and subgroups value read from the BRS
+ // characteristic are used.
+ #[rustfmt::skip]
+ fake_peer_service.expect_characteristic_value(
+ &AUDIO_SCAN_CONTROL_POINT_HANDLE,
+ vec![
+ 0x03, 0x11, 0x00, // opcode, source id, pa sync
+ 0xFF, 0xFF, 0x02, // pa sync, pa interval, num of subgroups
+ 0x17, 0x00, 0x00, 0x00, // bis sync (0th subgroup)
+ 0x02, 0x01, 0x09, // metadata len, metadata
+ 0xFF, 0xFF, 0xFF, 0xFF, // bis sync (1th subgroup)
+ 0x05, 0x04, 0x04, 0x65, 0x6E, 0x67, // metadata len, metadata
+ ],
+ );
+
+ let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
+ let op_fut = client.modify_broadcast_source(
+ BroadcastId(0x11),
+ PaSync::DoNotSync,
+ None,
+ Some(HashSet::from([(0, 1), (0, 2), (0, 3), (0, 5)])),
+ Some(HashMap::from([
+ (0, vec![Metadata::BroadcastAudioImmediateRenderingFlag]),
+ (1, vec![Metadata::Language("eng".to_string())]),
+ (5, vec![Metadata::ProgramInfoURI("this subgroup shouldn't be added".to_string())]),
+ ])),
+ );
+ pin_mut!(op_fut);
+ let polled = op_fut.poll_unpin(&mut noop_cx);
+ assert_matches!(polled, Poll::Ready(Ok(_)));
+ }
+
+ #[test]
+ fn modify_broadcast_source_fail() {
+ let (mut client, _fake_peer_service) = setup_client();
+
+ let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
+ // Broadcast source wasn't previously added.
+ let op_fut =
+ client.modify_broadcast_source(BroadcastId(0x11), PaSync::DoNotSync, None, None, None);
+ pin_mut!(op_fut);
+ let polled = op_fut.poll_unpin(&mut noop_cx);
+ assert_matches!(polled, Poll::Ready(Err(_)));
+ }
+
+ #[test]
+ fn remove_broadcast_source() {
+ let (mut client, mut fake_peer_service) = setup_client();
+
+ // Manually update the broadcast source tracker for testing purposes.
+ // In practice, this would have been updated from BRS value change notification.
+ client.broadcast_sources.lock().update_state(
+ RECEIVE_STATE_1_HANDLE,
+ BroadcastReceiveState::NonEmpty(ReceiveState {
+ source_id: 0x11,
+ source_address_type: AddressType::Public,
+ source_address: [1, 2, 3, 4, 5, 6],
+ source_adv_sid: AdvertisingSetId(1),
+ broadcast_id: BroadcastId(0x11),
+ pa_sync_state: PaSyncState::Synced,
+ big_encryption: EncryptionStatus::BroadcastCodeRequired,
+ subgroups: vec![],
+ }),
+ );
+
+ fake_peer_service
+ .expect_characteristic_value(&AUDIO_SCAN_CONTROL_POINT_HANDLE, vec![0x05, 0x11]);
+
+ let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
+ // Broadcast source wasn't previously added.
+ let op_fut = client.remove_broadcast_source(BroadcastId(0x11));
+ pin_mut!(op_fut);
+ let polled = op_fut.poll_unpin(&mut noop_cx);
+ assert_matches!(polled, Poll::Ready(Ok(_)));
+ }
+
+ #[test]
+ fn remove_broadcast_source_fail() {
+ let (mut client, _fake_peer_service) = setup_client();
+
+ let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
+ // Broadcast source wasn't previously added.
+ let op_fut = client.remove_broadcast_source(BroadcastId(0x11));
+ pin_mut!(op_fut);
+ let polled = op_fut.poll_unpin(&mut noop_cx);
+ assert_matches!(polled, Poll::Ready(Err(_)));
+ }
+
+ #[test]
fn set_broadcast_code() {
let (mut client, mut fake_peer_service) = setup_client();
- // Manually update the internal id tracker for testing purposes.
+ // Manually update the broadcast source tracker for testing purposes.
// In practice, this would have been updated from BRS value change notification.
- client.id_tracker.lock().update(0x01, BroadcastId::new(0x030201));
+ client.broadcast_sources.lock().update_state(
+ RECEIVE_STATE_1_HANDLE,
+ BroadcastReceiveState::NonEmpty(ReceiveState {
+ source_id: 0x01,
+ source_address_type: AddressType::Public,
+ source_address: [1, 2, 3, 4, 5, 6],
+ source_adv_sid: AdvertisingSetId(1),
+ broadcast_id: BroadcastId(0x030201),
+ pa_sync_state: PaSyncState::Synced,
+ big_encryption: EncryptionStatus::BroadcastCodeRequired,
+ subgroups: vec![],
+ }),
+ );
- {
- fake_peer_service.expect_characteristic_value(
- &AUDIO_SCAN_CONTROL_POINT_HANDLE,
- vec![0x04, 0x01, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1],
- );
+ fake_peer_service.expect_characteristic_value(
+ &AUDIO_SCAN_CONTROL_POINT_HANDLE,
+ vec![0x04, 0x01, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1],
+ );
- let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
- let set_code_fut = client.set_broadcast_code(BroadcastId::new(0x030201), [1; 16]);
- pin_mut!(set_code_fut);
- let polled = set_code_fut.poll_unpin(&mut noop_cx);
- let Poll::Ready(Ok(_)) = polled else {
- panic!("Expected to succeed");
- };
- }
+ let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
+ let set_code_fut = client.set_broadcast_code(BroadcastId(0x030201), [1; 16]);
+ pin_mut!(set_code_fut);
+ let polled = set_code_fut.poll_unpin(&mut noop_cx);
+ assert_matches!(polled, Poll::Ready(Ok(_)));
}
#[test]
fn set_broadcast_code_fails() {
let (mut client, _) = setup_client();
- {
- let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
- let set_code_fut = client.set_broadcast_code(BroadcastId::new(0x030201), [1; 16]);
- pin_mut!(set_code_fut);
- let polled = set_code_fut.poll_unpin(&mut noop_cx);
+ let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
+ let set_code_fut = client.set_broadcast_code(BroadcastId(0x030201), [1; 16]);
+ pin_mut!(set_code_fut);
+ let polled = set_code_fut.poll_unpin(&mut noop_cx);
- // Should fail because we cannot get source id for the broadcast id since BRS
- // Characteristic value wasn't updated.
- let Poll::Ready(Err(_)) = polled else {
- panic!("Expected to fail");
- };
- }
+ // Should fail because we cannot get source id for the broadcast id since BRS
+ // Characteristic value wasn't updated.
+ assert_matches!(polled, Poll::Ready(Err(_)));
}
}
diff --git a/rust/bt-bass/src/client/error.rs b/rust/bt-bass/src/client/error.rs
index 635a87c..1f6e4bd 100644
--- a/rust/bt-bass/src/client/error.rs
+++ b/rust/bt-bass/src/client/error.rs
@@ -7,6 +7,8 @@
use bt_common::packet_encoding::Error as PacketError;
use bt_gatt::types::{Error as BTGattError, Handle};
+use crate::types::BroadcastId;
+
#[derive(Debug, Error)]
pub enum Error {
#[error(
@@ -26,6 +28,12 @@
#[error("GATT operation error: {0}")]
Gatt(BTGattError),
+ #[error("Error while polling BASS client event stream: {0}")]
+ EventStream(Box<Error>),
+
+ #[error("Broadcast source with id ({0:?}) is unknown")]
+ UnknownBroadcastSource(BroadcastId),
+
#[error("Failure occurred: {0}")]
Generic(String),
}
diff --git a/rust/bt-bass/src/client/event.rs b/rust/bt-bass/src/client/event.rs
index b7a5275..2c7ac2f 100644
--- a/rust/bt-bass/src/client/event.rs
+++ b/rust/bt-bass/src/client/event.rs
@@ -2,24 +2,25 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
-use std::collections::{HashMap, VecDeque};
+use std::collections::VecDeque;
use std::sync::Arc;
use std::task::Poll;
-use bt_common::packet_encoding::Decodable;
-use bt_gatt::client::CharacteristicNotification;
-use bt_gatt::types::{Error as BtGattError, Handle};
use futures::stream::{BoxStream, FusedStream, SelectAll};
use futures::{Stream, StreamExt};
use parking_lot::Mutex;
+use bt_common::packet_encoding::Decodable;
+use bt_gatt::client::CharacteristicNotification;
+use bt_gatt::types::Error as BtGattError;
+
use crate::client::error::Error;
use crate::client::error::ServiceError;
-use crate::client::BroadcastSourceIdTracker;
+use crate::client::KnownBroadcastSources;
use crate::types::*;
#[derive(Clone, Debug, PartialEq)]
-pub enum BroadcastAudioScanServiceEvent {
+pub enum Event {
// Broadcast Audio Scan Service (BASS) server requested for SyncInfo through PAST procedure.
SyncInfoRequested(BroadcastId),
// BASS server failed to synchornize to PA or did not synchronize to PA.
@@ -32,83 +33,83 @@
BroadcastCodeRequired(BroadcastId),
// BASS server failed to decrypt BIS using the previously provided code.
InvalidBroadcastCode(BroadcastId, [u8; 16]),
+ // BASS server has autonomously synchronized to a BIS that is encrypted, and the server
+ // has the correct encryption key to decrypt the BIS.
+ Decrypting(BroadcastId),
// Received a packet from the BASS server not recognized by this library.
UnknownPacket,
+ // Broadcast source was removed by the BASS server.
+ RemovedBroadcastSource(BroadcastId),
+ // Broadcast source was added by the BASS server.
+ AddedBroadcastSource(BroadcastId, PaSyncState, EncryptionStatus),
}
-impl BroadcastAudioScanServiceEvent {
- pub(crate) fn from_broadcast_receive_state(
- state: &ReceiveState,
- ) -> Vec<BroadcastAudioScanServiceEvent> {
+impl Event {
+ pub(crate) fn from_broadcast_receive_state(state: &ReceiveState) -> Vec<Event> {
let mut events = Vec::new();
let pa_sync_state = state.pa_sync_state();
let broadcast_id = state.broadcast_id();
match pa_sync_state {
- PaSyncState::SyncInfoRequest => {
- events.push(BroadcastAudioScanServiceEvent::SyncInfoRequested(broadcast_id))
- }
- PaSyncState::Synced => {
- events.push(BroadcastAudioScanServiceEvent::SyncedToPa(broadcast_id))
- }
+ PaSyncState::SyncInfoRequest => events.push(Event::SyncInfoRequested(broadcast_id)),
+ PaSyncState::Synced => events.push(Event::SyncedToPa(broadcast_id)),
PaSyncState::FailedToSync | PaSyncState::NotSynced => {
- events.push(BroadcastAudioScanServiceEvent::NotSyncedToPa(broadcast_id))
+ events.push(Event::NotSyncedToPa(broadcast_id))
}
- PaSyncState::NoPast => {
- events.push(BroadcastAudioScanServiceEvent::SyncedFailedNoPast(broadcast_id))
- }
+ PaSyncState::NoPast => events.push(Event::SyncedFailedNoPast(broadcast_id)),
}
match state.big_encryption() {
EncryptionStatus::BroadcastCodeRequired => {
- events.push(BroadcastAudioScanServiceEvent::BroadcastCodeRequired(broadcast_id))
+ events.push(Event::BroadcastCodeRequired(broadcast_id))
}
- EncryptionStatus::BadCode(code) => events.push(
- BroadcastAudioScanServiceEvent::InvalidBroadcastCode(broadcast_id, code.clone()),
- ),
+ EncryptionStatus::Decrypting => events.push(Event::Decrypting(broadcast_id)),
+ EncryptionStatus::BadCode(code) => {
+ events.push(Event::InvalidBroadcastCode(broadcast_id, code.clone()))
+ }
_ => {}
};
events
}
}
-pub struct BroadcastAudioScanServiceEventStream {
- // Polled to receive BASS notifications.
+/// Trait for representing a stream that outputs Events from BASS. If there was
+/// an error the stream should output error instead and terminate.
+
+pub struct EventStream {
+ // Actual GATT notification streams that we poll from.
notification_streams:
SelectAll<BoxStream<'static, Result<CharacteristicNotification, BtGattError>>>,
- event_queue: VecDeque<Result<BroadcastAudioScanServiceEvent, Error>>,
+ event_queue: VecDeque<Result<Event, Error>>,
terminated: bool,
// States to be updated.
- id_tracker: Arc<Mutex<BroadcastSourceIdTracker>>,
- receive_states: Arc<Mutex<HashMap<Handle, Option<BroadcastReceiveState>>>>,
+ broadcast_sources: Arc<Mutex<KnownBroadcastSources>>,
}
-impl BroadcastAudioScanServiceEventStream {
+impl EventStream {
pub(crate) fn new(
notification_streams: SelectAll<
BoxStream<'static, Result<CharacteristicNotification, BtGattError>>,
>,
- id_tracker: Arc<Mutex<BroadcastSourceIdTracker>>,
- receive_states: Arc<Mutex<HashMap<Handle, Option<BroadcastReceiveState>>>>,
+ broadcast_sources: Arc<Mutex<KnownBroadcastSources>>,
) -> Self {
Self {
notification_streams,
event_queue: VecDeque::new(),
terminated: false,
- id_tracker,
- receive_states,
+ broadcast_sources,
}
}
}
-impl FusedStream for BroadcastAudioScanServiceEventStream {
+impl FusedStream for EventStream {
fn is_terminated(&self) -> bool {
self.terminated
}
}
-impl Stream for BroadcastAudioScanServiceEventStream {
- type Item = Result<BroadcastAudioScanServiceEvent, Error>;
+impl Stream for EventStream {
+ type Item = Result<Event, Error>;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
@@ -119,85 +120,92 @@
}
loop {
+ if let Some(item) = self.event_queue.pop_front() {
+ match item {
+ Ok(event) => return Poll::Ready(Some(Ok(event))),
+ Err(e) => {
+ // If an error was received, we terminate the event stream, but send an
+ // error to indicate why it was terminated.
+ self.terminated = true;
+ return Poll::Ready(Some(Err(e)));
+ }
+ }
+ }
+
match self.notification_streams.poll_next_unpin(cx) {
- Poll::Pending => {}
+ Poll::Pending => return Poll::Pending,
Poll::Ready(None) => {
- self.event_queue.push_back(Err(Error::Service(
- super::error::ServiceError::NotificationChannelClosed(format!(
+ let err = Error::EventStream(Box::new(Error::Service(
+ ServiceError::NotificationChannelClosed(format!(
"GATT notification stream for BRS characteristics closed"
)),
)));
+ self.event_queue.push_back(Err(err));
}
- Poll::Ready(Some(received)) => {
- match received {
- Err(error) => match error {
- BtGattError::PeerNotRecognized(_)
- | BtGattError::ScanFailed(_)
- | BtGattError::Other(_) => {
- self.event_queue.push_back(Err(Error::Service(
- ServiceError::NotificationChannelClosed(format!(
- "unexpected error encountered from GATT notification"
- )),
- )));
- }
- BtGattError::PeerDisconnected(id) => {
- self.event_queue.push_back(Err(Error::Service(
- ServiceError::NotificationChannelClosed(format!(
- "peer ({id}) disconnected"
- )),
- )));
- }
- _ => {} // TODO(b/308483171): decide what to do for non-critical errors.
- },
- Ok(notification) => {
- let Ok((brs, _)) =
- BroadcastReceiveState::decode(notification.value.as_slice())
- else {
- self.event_queue
- .push_back(Ok(BroadcastAudioScanServiceEvent::UnknownPacket));
- break;
- };
- match &brs {
- BroadcastReceiveState::Empty => {}
- BroadcastReceiveState::NonEmpty(state) => {
- let events = BroadcastAudioScanServiceEvent::from_broadcast_receive_state(state);
- events
- .into_iter()
- .for_each(|e| self.event_queue.push_back(Ok(e)));
+ Poll::Ready(Some(Err(error))) => {
+ // Deem all errors as critical.
+ let err = Error::EventStream(Box::new(Error::Gatt(error)));
+ self.event_queue.push_back(Err(err));
+ }
+ Poll::Ready(Some(Ok(notification))) => {
+ let char_handle = notification.handle;
+ let Ok((new_state, _)) =
+ BroadcastReceiveState::decode(notification.value.as_slice())
+ else {
+ self.event_queue.push_back(Ok(Event::UnknownPacket));
+ continue;
+ };
- // Update broadcast ID to source ID mapping.
- let _ = self
- .id_tracker
- .lock()
- .update(state.source_id(), state.broadcast_id());
- }
- };
+ let maybe_prev_state = {
+ let mut lock = self.broadcast_sources.lock();
+ lock.update_state(char_handle, new_state.clone())
+ };
+
+ let mut multi_events = VecDeque::new();
+
+ // If the previous value was not empty, check if it was overwritten.
+ if let Some(ref prev_state) = maybe_prev_state {
+ if let BroadcastReceiveState::NonEmpty(prev_receive_state) = prev_state {
+ if new_state.is_empty() || !new_state.has_same_broadcast_id(&prev_state)
{
- // Update the Broadcast Receive States.
- let mut lock = self.receive_states.lock();
- let char = lock.get_mut(¬ification.handle).unwrap();
- char.replace(brs);
+ multi_events.push_back(Ok(Event::RemovedBroadcastSource(
+ prev_receive_state.broadcast_id,
+ )));
}
}
}
+
+ // BRS characteristic value was updated with a new broadcast source
+ // information.
+ if let BroadcastReceiveState::NonEmpty(receive_state) = &new_state {
+ let is_new_source = match maybe_prev_state {
+ Some(prev_state) => !new_state.has_same_broadcast_id(&prev_state),
+ None => true,
+ };
+ if is_new_source {
+ multi_events.push_back(Ok(Event::AddedBroadcastSource(
+ receive_state.broadcast_id,
+ receive_state.pa_sync_state,
+ receive_state.big_encryption,
+ )));
+ } else {
+ let other_events = Event::from_broadcast_receive_state(receive_state);
+ for e in other_events.into_iter() {
+ multi_events.push_back(Ok(e));
+ }
+ }
+ }
+ if multi_events.len() != 0 {
+ self.event_queue.append(&mut multi_events);
+ continue;
+ }
+ continue;
}
};
+
break;
}
-
- let popped = self.event_queue.pop_front();
- match popped {
- None => Poll::Pending,
- Some(item) => match item {
- Ok(event) => Poll::Ready(Some(Ok(event))),
- Err(e) => {
- // If an error was received, we terminate the event stream, but send an error to
- // indicate why it was terminated.
- self.terminated = true;
- Poll::Ready(Some(Err(e)))
- }
- },
- }
+ Poll::Pending
}
}
@@ -205,21 +213,26 @@
mod tests {
use super::*;
+ use std::collections::HashMap;
+
+ use assert_matches::assert_matches;
use futures::channel::mpsc::unbounded;
+ use bt_gatt::types::Handle;
+
#[test]
- fn poll_broadcast_audio_scan_service_event_stream() {
+ fn poll_event_stream() {
let mut streams = SelectAll::new();
let (sender1, receiver1) = unbounded();
let (sender2, receiver2) = unbounded();
streams.push(receiver1.boxed());
streams.push(receiver2.boxed());
- let id_tracker = Arc::new(Mutex::new(BroadcastSourceIdTracker::new()));
- let receive_states =
- Arc::new(Mutex::new(HashMap::from([(Handle(0x1), None), (Handle(0x2), None)])));
- let mut event_streams =
- BroadcastAudioScanServiceEventStream::new(streams, id_tracker, receive_states);
+ let source_tracker = Arc::new(Mutex::new(KnownBroadcastSources::new(HashMap::from([
+ (Handle(0x1), BroadcastReceiveState::Empty),
+ (Handle(0x2), BroadcastReceiveState::Empty),
+ ]))));
+ let mut event_streams = EventStream::new(streams, source_tracker);
// Send notifications to underlying streams.
let bad_code_status =
@@ -259,30 +272,16 @@
// Events should have been generated from notifications.
let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
- match event_streams.poll_next_unpin(&mut noop_cx) {
- Poll::Ready(Some(Ok(event))) => assert_eq!(
- event,
- BroadcastAudioScanServiceEvent::NotSyncedToPa(BroadcastId::new(0x030201))
- ),
- _ => panic!("should have received event"),
- }
- match event_streams.poll_next_unpin(&mut noop_cx) {
- Poll::Ready(Some(Ok(event))) => assert_eq!(
- event,
- BroadcastAudioScanServiceEvent::InvalidBroadcastCode(
- BroadcastId::new(0x030201),
- [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]
- )
- ),
- _ => panic!("should have received event"),
- }
- match event_streams.poll_next_unpin(&mut noop_cx) {
- Poll::Ready(Some(Ok(event))) => assert_eq!(
- event,
- BroadcastAudioScanServiceEvent::SyncedFailedNoPast(BroadcastId::new(0x040302))
- ),
- _ => panic!("should have received event"),
- }
+
+ let polled = event_streams.poll_next_unpin(&mut noop_cx);
+ assert_matches!(polled, Poll::Ready(Some(Ok(event))) => {
+ assert_eq!(event, Event::AddedBroadcastSource(BroadcastId(0x030201), PaSyncState::FailedToSync, EncryptionStatus::BadCode([1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16])));
+ });
+
+ let polled = event_streams.poll_next_unpin(&mut noop_cx);
+ assert_matches!(polled, Poll::Ready(Some(Ok(event))) => {
+ assert_eq!(event, Event::AddedBroadcastSource(BroadcastId(0x040302), PaSyncState::NoPast, EncryptionStatus::NotEncrypted));
+ });
// Should be pending because no more events generated from notifications.
assert!(event_streams.poll_next_unpin(&mut noop_cx).is_pending());
@@ -306,13 +305,67 @@
// Event should have been generated from notification.
let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
- match event_streams.poll_next_unpin(&mut noop_cx) {
- Poll::Ready(Some(Ok(event))) => assert_eq!(
- event,
- BroadcastAudioScanServiceEvent::SyncedToPa(BroadcastId::new(0x040302))
- ),
- _ => panic!("should have received event"),
- }
+ assert_matches!(event_streams.poll_next_unpin(&mut noop_cx), Poll::Ready(Some(Ok(event))) => { assert_eq!(event, Event::SyncedToPa(BroadcastId(0x040302))) });
+
+ // Should be pending because no more events generated from notifications.
+ assert!(event_streams.poll_next_unpin(&mut noop_cx).is_pending());
+ }
+
+ #[test]
+ fn broadcast_source_is_removed() {
+ let mut streams = SelectAll::new();
+ let (_sender1, receiver1) = unbounded();
+ let (sender2, receiver2) = unbounded();
+ streams.push(receiver1.boxed());
+ streams.push(receiver2.boxed());
+
+ let source_tracker = Arc::new(Mutex::new(KnownBroadcastSources::new(HashMap::from([
+ (Handle(0x1), BroadcastReceiveState::Empty),
+ (Handle(0x2), BroadcastReceiveState::Empty),
+ ]))));
+ let mut event_streams = EventStream::new(streams, source_tracker);
+
+ // Send notifications to underlying streams.
+ #[rustfmt::skip]
+ sender2
+ .unbounded_send(Ok(CharacteristicNotification {
+ handle: Handle(0x2),
+ value: vec![
+ 0x02, AddressType::Public as u8, // source id and address type
+ 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, // address
+ 0x01, 0x02, 0x03, 0x04, // ad set id and broadcast id
+ PaSyncState::Synced as u8,
+ EncryptionStatus::NotEncrypted.raw_value(),
+ 0x00, // no subgroups
+ ],
+ maybe_truncated: false,
+ }))
+ .expect("should send");
+
+ // Events should have been generated from notifications.
+ let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
+
+ let polled = event_streams.poll_next_unpin(&mut noop_cx);
+ assert_matches!(polled, Poll::Ready(Some(Ok(event))) => {
+ assert_eq!(event, Event::AddedBroadcastSource(BroadcastId(0x040302), PaSyncState::Synced, EncryptionStatus::NotEncrypted));
+ });
+
+ // Should be pending because no more events generated from notifications.
+ assert!(event_streams.poll_next_unpin(&mut noop_cx).is_pending());
+
+ // Send notifications to underlying streams. Ths time, send empty BRS
+ // characteristic value.
+ sender2
+ .unbounded_send(Ok(CharacteristicNotification {
+ handle: Handle(0x2),
+ value: vec![],
+ maybe_truncated: false,
+ }))
+ .expect("should send");
+
+ // Event should have been generated from notification.
+ let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
+ assert_matches!(event_streams.poll_next_unpin(&mut noop_cx), Poll::Ready(Some(Ok(event))) => { assert_eq!(event, Event::RemovedBroadcastSource(BroadcastId(0x040302))) });
// Should be pending because no more events generated from notifications.
assert!(event_streams.poll_next_unpin(&mut noop_cx).is_pending());
diff --git a/rust/bt-bass/src/lib.rs b/rust/bt-bass/src/lib.rs
index 0d619cd..1f8980b 100644
--- a/rust/bt-bass/src/lib.rs
+++ b/rust/bt-bass/src/lib.rs
@@ -4,3 +4,10 @@
pub mod client;
pub mod types;
+pub use crate::client::error::Error;
+
+#[cfg(any(test, feature = "test-utils"))]
+pub mod test_utils;
+
+#[cfg(test)]
+pub mod tests;
diff --git a/rust/bt-bass/src/test_utils.rs b/rust/bt-bass/src/test_utils.rs
new file mode 100644
index 0000000..459a488
--- /dev/null
+++ b/rust/bt-bass/src/test_utils.rs
@@ -0,0 +1,47 @@
+// Copyright 2023 Google LLC
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+use std::task::Poll;
+
+use futures::channel::mpsc::UnboundedReceiver;
+use futures::stream::FusedStream;
+use futures::{Stream, StreamExt};
+
+use crate::client::error::Error;
+use crate::client::event::*;
+
+pub struct FakeBassEventStream {
+ receiver: UnboundedReceiver<Result<Event, Error>>,
+ terminated: bool,
+}
+
+impl FakeBassEventStream {
+ pub fn new(receiver: UnboundedReceiver<Result<Event, Error>>) -> Self {
+ Self { receiver, terminated: false }
+ }
+}
+
+impl FusedStream for FakeBassEventStream {
+ fn is_terminated(&self) -> bool {
+ self.terminated
+ }
+}
+
+impl Stream for FakeBassEventStream {
+ type Item = Result<Event, Error>;
+
+ fn poll_next(
+ mut self: std::pin::Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ ) -> Poll<Option<Self::Item>> {
+ if self.terminated {
+ return Poll::Ready(None);
+ }
+ let polled = self.receiver.poll_next_unpin(cx);
+ if let Poll::Ready(Some(Err(_))) = &polled {
+ self.terminated = true;
+ }
+ polled
+ }
+}
diff --git a/rust/bt-bass/src/tests.rs b/rust/bt-bass/src/tests.rs
new file mode 100644
index 0000000..64d9107
--- /dev/null
+++ b/rust/bt-bass/src/tests.rs
@@ -0,0 +1,57 @@
+// Copyright 2023 Google LLC
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+use std::task::Poll;
+
+use assert_matches::assert_matches;
+use futures::channel::mpsc::unbounded;
+use futures::stream::StreamExt;
+
+use crate::client::error::Error;
+use crate::client::event::*;
+use crate::test_utils::*;
+use crate::types::BroadcastId;
+
+#[test]
+fn fake_bass_event_stream() {
+ let (sender, receiver) = unbounded();
+ let mut stream = FakeBassEventStream::new(receiver);
+
+ let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
+ assert!(stream.poll_next_unpin(&mut noop_cx).is_pending());
+
+ // Add events.
+ sender.unbounded_send(Ok(Event::SyncedToPa(BroadcastId(1)))).expect("should succeed");
+ sender
+ .unbounded_send(Ok(Event::BroadcastCodeRequired(BroadcastId(1))))
+ .expect("should succeed");
+
+ let polled = stream.poll_next_unpin(&mut noop_cx);
+ let Poll::Ready(Some(Ok(event))) = polled else {
+ panic!("Expected to receive event");
+ };
+ assert_matches!(event, Event::SyncedToPa(_));
+
+ let polled = stream.poll_next_unpin(&mut noop_cx);
+ let Poll::Ready(Some(Ok(event))) = polled else {
+ panic!("Expected to receive event");
+ };
+ assert_matches!(event, Event::BroadcastCodeRequired(_));
+
+ assert!(stream.poll_next_unpin(&mut noop_cx).is_pending());
+
+ sender
+ .unbounded_send(Err(Error::EventStream(Box::new(Error::Generic(format!("some error"))))))
+ .expect("should succeed");
+
+ let polled = stream.poll_next_unpin(&mut noop_cx);
+ let Poll::Ready(Some(Err(_))) = polled else {
+ panic!("Expected to receive error");
+ };
+
+ let polled = stream.poll_next_unpin(&mut noop_cx);
+ let Poll::Ready(None) = polled else {
+ panic!("Expected to have terminated");
+ };
+}
diff --git a/rust/bt-bass/src/types.rs b/rust/bt-bass/src/types.rs
index efb5e60..f810a3d 100644
--- a/rust/bt-bass/src/types.rs
+++ b/rust/bt-bass/src/types.rs
@@ -6,9 +6,9 @@
use bt_common::core::{AdvertisingSetId, PaInterval};
use bt_common::generic_audio::metadata_ltv::*;
use bt_common::packet_encoding::{Decodable, Encodable, Error as PacketError};
-use bt_common::Uuid;
+use bt_common::{decodable_enum, Uuid};
-const ADDRESS_BYTE_SIZE: usize = 6;
+pub const ADDRESS_BYTE_SIZE: usize = 6;
const NUM_SUBGROUPS_BYTE_SIZE: usize = 1;
const PA_SYNC_BYTE_SIZE: usize = 1;
const SOURCE_ID_BYTE_SIZE: usize = 1;
@@ -20,16 +20,18 @@
pub type SourceId = u8;
+/// BIS index value of a particular BIS. Valid value range is [1 to len of BIS]
+pub type BisIndex = u8;
+
/// Broadcast_ID is a 3-byte data on the wire.
/// See Broadcast Audio Scan Service (BASS) spec v1.0 Table 3.5 for details.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
-pub struct BroadcastId(u32);
+pub struct BroadcastId(pub u32);
impl BroadcastId {
// On the wire, Broadcast_ID is transported in 3 bytes.
- const BYTE_SIZE: usize = 3;
+ pub const BYTE_SIZE: usize = 3;
- #[cfg(test)]
pub fn new(raw_value: u32) -> Self {
Self(raw_value)
}
@@ -39,6 +41,12 @@
}
}
+impl std::fmt::Display for BroadcastId {
+ fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+ write!(f, "{:#08x}", self.0)
+ }
+}
+
impl TryFrom<u32> for BroadcastId {
type Error = PacketError;
@@ -84,46 +92,30 @@
}
}
+decodable_enum! {
+ pub enum ControlPointOpcode<u8, bt_common::packet_encoding::Error, OutOfRange> {
+ RemoteScanStopped = 0x00,
+ RemoteScanStarted = 0x01,
+ AddSource = 0x02,
+ ModifySource = 0x03,
+ SetBroadcastCode = 0x04,
+ RemoveSource = 0x05,
+ }
+}
+
/// Broadcast Audio Scan Control Point characteristic opcode as defined in
/// Broadcast Audio Scan Service spec v1.0 Section 3.1.
-#[derive(Debug, PartialEq)]
-pub enum ControlPointOpcode {
- RemoteScanStopped = 0x00,
- RemoteScanStarted = 0x01,
- AddSource = 0x02,
- ModifySource = 0x03,
- SetBroadcastCode = 0x04,
- RemoveSource = 0x05,
-}
-
impl ControlPointOpcode {
const BYTE_SIZE: usize = 1;
}
-impl TryFrom<u8> for ControlPointOpcode {
- type Error = PacketError;
-
- fn try_from(value: u8) -> Result<Self, Self::Error> {
- let val = match value {
- 0x00 => Self::RemoteScanStopped,
- 0x01 => Self::RemoteScanStarted,
- 0x02 => Self::AddSource,
- 0x03 => Self::ModifySource,
- 0x04 => Self::SetBroadcastCode,
- 0x05 => Self::RemoveSource,
- _ => return Err(PacketError::OutOfRange),
- };
- Ok(val)
- }
-}
-
/// Trait for objects that represent a Broadcast Audio Scan Control Point
/// characteristic. When written by a client, the Broadcast Audio Scan Control
/// Point characteristic is defined as an 8-bit enumerated value, known as the
/// opcode, followed by zero or more parameter octets. The opcode represents the
/// operation that would be performed in the Broadcast Audio Scan Service
/// server. See BASS spec v1.0 Section 3.1 for details.
-pub trait ControlPointOperation {
+pub trait ControlPointOperation: Encodable<Error = PacketError> {
// Returns the expected opcode for this operation.
fn opcode() -> ControlPointOpcode;
@@ -220,14 +212,14 @@
/// See BASS spec v1.0 Section 3.1.1.4 for details.
#[derive(Debug, PartialEq)]
pub struct AddSourceOperation {
- advertiser_address_type: AddressType,
+ pub(crate) advertiser_address_type: AddressType,
// Address in little endian.
- advertiser_address: [u8; ADDRESS_BYTE_SIZE],
- advertising_sid: AdvertisingSetId,
- broadcast_id: BroadcastId,
- pa_sync: PaSync,
- pa_interval: PaInterval,
- subgroups: Vec<BigSubgroup>,
+ pub(crate) advertiser_address: [u8; ADDRESS_BYTE_SIZE],
+ pub(crate) advertising_sid: AdvertisingSetId,
+ pub(crate) broadcast_id: BroadcastId,
+ pub(crate) pa_sync: PaSync,
+ pub(crate) pa_interval: PaInterval,
+ pub(crate) subgroups: Vec<BigSubgroup>,
}
impl AddSourceOperation {
@@ -243,19 +235,19 @@
pub fn new(
address_type: AddressType,
advertiser_address: [u8; ADDRESS_BYTE_SIZE],
- sid: u8,
- broadcast_id: u32,
+ advertising_sid: AdvertisingSetId,
+ broadcast_id: BroadcastId,
pa_sync: PaSync,
- pa_interval: u16,
+ pa_interval: PaInterval,
subgroups: Vec<BigSubgroup>,
) -> Self {
AddSourceOperation {
advertiser_address_type: address_type,
advertiser_address,
- advertising_sid: AdvertisingSetId(sid),
- broadcast_id: BroadcastId(broadcast_id),
+ advertising_sid,
+ broadcast_id: broadcast_id,
pa_sync,
- pa_interval: PaInterval(pa_interval),
+ pa_interval,
subgroups,
}
}
@@ -323,7 +315,7 @@
buf[2..8].copy_from_slice(&self.advertiser_address);
buf[8] = self.advertising_sid.0;
self.broadcast_id.encode(&mut buf[9..12])?;
- buf[12] = self.pa_sync as u8;
+ buf[12] = u8::from(self.pa_sync);
buf[13..15].copy_from_slice(&self.pa_interval.0.to_le_bytes());
buf[15] = self
.subgroups
@@ -362,15 +354,10 @@
pub fn new(
source_id: SourceId,
pa_sync: PaSync,
- pa_interval: u16,
+ pa_interval: PaInterval,
subgroups: Vec<BigSubgroup>,
) -> Self {
- ModifySourceOperation {
- source_id,
- pa_sync,
- pa_interval: PaInterval(pa_interval),
- subgroups,
- }
+ ModifySourceOperation { source_id, pa_sync, pa_interval, subgroups }
}
}
@@ -418,7 +405,7 @@
buf[0] = Self::opcode() as u8;
buf[1] = self.source_id;
- buf[2] = self.pa_sync as u8;
+ buf[2] = u8::from(self.pa_sync);
buf[3..5].copy_from_slice(&self.pa_interval.0.to_le_bytes());
buf[5] = self
.subgroups
@@ -546,54 +533,89 @@
}
}
-#[repr(u8)]
-#[derive(Clone, Copy, Debug, PartialEq)]
-pub enum PaSync {
- DoNotSync = 0x00,
- SyncPastAvailable = 0x01,
- SyncPastUnavailable = 0x02,
-}
-
-impl TryFrom<u8> for PaSync {
- type Error = PacketError;
-
- fn try_from(value: u8) -> Result<Self, Self::Error> {
- match value {
- 0x00 => Ok(Self::DoNotSync),
- 0x01 => Ok(Self::SyncPastAvailable),
- 0x02 => Ok(Self::SyncPastUnavailable),
- _ => Err(PacketError::OutOfRange),
- }
+decodable_enum! {
+ pub enum PaSync<u8, bt_common::packet_encoding::Error, OutOfRange> {
+ DoNotSync = 0x00,
+ SyncPastAvailable = 0x01,
+ SyncPastUnavailable = 0x02,
}
}
-#[derive(Debug, PartialEq)]
+/// 4-octet bitfield. Bit 0-30 = BIS_index[1-31]
+/// 0x00000000: 0b0 = Do not synchronize to BIS_index[x]
+/// 0xxxxxxxxx: 0b1 = Synchronize to BIS_index[x]
+/// 0xFFFFFFFF: means No preference if used in BroadcastAudioScanControlPoint,
+/// Failed to sync if used in ReceiveState.
+#[derive(Clone, Debug, PartialEq)]
+pub struct BisSync(pub u32);
+
+impl BisSync {
+ const BYTE_SIZE: usize = 4;
+ const NO_PREFERENCE: u32 = 0xFFFFFFFF;
+
+ /// Updates whether or not a particular BIS should be set to synchronize or
+ /// not synchronize.
+ ///
+ /// # Arguments
+ ///
+ /// * `bis_index` - BIS index as defined in the spec. Range should be [1,
+ /// 31]
+ /// * `should_sync` - Whether or not to synchronize
+ fn set_sync_for_index(&mut self, bis_index: BisIndex) -> Result<(), PacketError> {
+ if bis_index < 1 || bis_index > 31 {
+ return Err(PacketError::InvalidParameter(format!("Invalid BIS index ({bis_index})")));
+ }
+ let bit_mask = 0b1 << (bis_index - 1);
+
+ // Clear the bit that we're interested in setting.
+ self.0 &= !(0b1 << bis_index - 1);
+ self.0 |= bit_mask;
+ Ok(())
+ }
+
+ /// Clears previous BIS_Sync params and synchronizes to specified BIS
+ /// indices. If the BIS index list is empty, no preference value is
+ /// used.
+ ///
+ /// # Arguments
+ ///
+ /// * `sync_map` - Map of BIS index to whether or not it should be
+ /// synchronized
+ pub fn set_sync(&mut self, bis_indices: &Vec<BisIndex>) -> Result<(), PacketError> {
+ if bis_indices.is_empty() {
+ self.0 = Self::NO_PREFERENCE;
+ return Ok(());
+ }
+ self.0 = 0;
+ for bis_index in bis_indices {
+ self.set_sync_for_index(*bis_index)?;
+ }
+ Ok(())
+ }
+}
+
+impl Default for BisSync {
+ fn default() -> Self {
+ Self(Self::NO_PREFERENCE)
+ }
+}
+
+#[derive(Clone, Debug, PartialEq)]
pub struct BigSubgroup {
- // 4-octet bitfield. Bit 0-30 = BIS_index[1-31]
- // 0x00000000: 0b0 = Do not synchronize to BIS_index[x]
- // 0xxxxxxxxx: 0b1 = Synchronize to BIS_index[x]
- // 0xFFFFFFFF: means No preference if used in BroadcastAudioScanControlPoint,
- // Failed to sync if used in ReceiveState.
- bis_sync_bitfield: u32,
- metadata: Vec<Metadata>,
+ pub(crate) bis_sync: BisSync,
+ pub(crate) metadata: Vec<Metadata>,
}
impl BigSubgroup {
- const BIS_SYNC_BYTE_SIZE: usize = 4;
const METADATA_LENGTH_BYTE_SIZE: usize = 1;
+ const MIN_PACKET_SIZE: usize = BisSync::BYTE_SIZE + Self::METADATA_LENGTH_BYTE_SIZE;
- const MIN_PACKET_SIZE: usize = Self::BIS_SYNC_BYTE_SIZE + Self::METADATA_LENGTH_BYTE_SIZE;
- const BIS_SYNC_NO_PREFERENCE: u32 = 0xFFFFFFFF;
-
- pub fn new(bis_sync_bitfield: Option<u32>) -> Self {
- Self {
- bis_sync_bitfield: bis_sync_bitfield.unwrap_or(Self::BIS_SYNC_NO_PREFERENCE),
- metadata: vec![],
- }
+ pub fn new(bis_sync: Option<BisSync>) -> Self {
+ Self { bis_sync: bis_sync.unwrap_or_default(), metadata: vec![] }
}
pub fn with_metadata(self, metadata: Vec<Metadata>) -> Self {
- Self { bis_sync_bitfield: self.bis_sync_bitfield, metadata }
+ Self { bis_sync: self.bis_sync, metadata }
}
}
@@ -620,7 +642,7 @@
}
// Ignore any undecodable metadata types
let metadata = results_metadata.into_iter().filter_map(Result::ok).collect();
- Ok((BigSubgroup { bis_sync_bitfield: bis_sync, metadata }, start_idx))
+ Ok((BigSubgroup { bis_sync: BisSync(bis_sync), metadata }, start_idx))
}
}
@@ -632,7 +654,7 @@
return Err(PacketError::BufferTooSmall);
}
- buf[0..4].copy_from_slice(&self.bis_sync_bitfield.to_le_bytes());
+ buf[0..4].copy_from_slice(&self.bis_sync.0.to_le_bytes());
let metadata_len = self
.metadata
.iter()
@@ -686,12 +708,35 @@
/// information about a Broadcast Source. If the server has not written a
/// Source_ID value to the Broadcast Receive State characteristic, the Broadcast
/// Recieve State characteristic value shall be empty.
-#[derive(Debug, PartialEq)]
+#[derive(Clone, Debug, PartialEq)]
pub enum BroadcastReceiveState {
Empty,
NonEmpty(ReceiveState),
}
+impl BroadcastReceiveState {
+ pub fn is_empty(&self) -> bool {
+ *self == BroadcastReceiveState::Empty
+ }
+
+ pub fn broadcast_id(&self) -> Option<BroadcastId> {
+ match self {
+ BroadcastReceiveState::Empty => None,
+ BroadcastReceiveState::NonEmpty(state) => Some(state.broadcast_id),
+ }
+ }
+
+ pub fn has_same_broadcast_id(&self, other: &BroadcastReceiveState) -> bool {
+ match self {
+ BroadcastReceiveState::Empty => false,
+ BroadcastReceiveState::NonEmpty(this) => match other {
+ BroadcastReceiveState::Empty => false,
+ BroadcastReceiveState::NonEmpty(that) => this.broadcast_id == that.broadcast_id,
+ },
+ }
+ }
+}
+
impl Decodable for BroadcastReceiveState {
type Error = PacketError;
@@ -722,18 +767,18 @@
}
}
-#[derive(Debug, PartialEq)]
+#[derive(Clone, Debug, PartialEq)]
pub struct ReceiveState {
- source_id: SourceId,
- source_address_type: AddressType,
+ pub(crate) source_id: SourceId,
+ pub(crate) source_address_type: AddressType,
// Address in little endian.
- source_address: [u8; ADDRESS_BYTE_SIZE],
- source_adv_sid: AdvertisingSetId,
- broadcast_id: BroadcastId,
- pa_sync_state: PaSyncState,
+ pub(crate) source_address: [u8; ADDRESS_BYTE_SIZE],
+ pub(crate) source_adv_sid: AdvertisingSetId,
+ pub(crate) broadcast_id: BroadcastId,
+ pub(crate) pa_sync_state: PaSyncState,
// Represents BIG_Encryption param with optional Bad_Code param.
- big_encryption: EncryptionStatus,
- subgroups: Vec<BigSubgroup>,
+ pub(crate) big_encryption: EncryptionStatus,
+ pub(crate) subgroups: Vec<BigSubgroup>,
}
impl ReceiveState {
@@ -848,7 +893,7 @@
buf[2..8].copy_from_slice(&self.source_address);
buf[8] = self.source_adv_sid.0;
self.broadcast_id.encode(&mut buf[9..12])?;
- buf[12] = self.pa_sync_state as u8;
+ buf[12] = u8::from(self.pa_sync_state);
let mut idx = 13 + self.big_encryption.encoded_len();
self.big_encryption.encode(&mut buf[13..idx])?;
buf[idx] = self
@@ -880,28 +925,13 @@
}
}
-#[derive(Clone, Copy, Debug, PartialEq)]
-#[repr(u8)]
-pub enum PaSyncState {
- NotSynced = 0x00,
- SyncInfoRequest = 0x01,
- Synced = 0x02,
- FailedToSync = 0x03,
- NoPast = 0x04,
-}
-
-impl TryFrom<u8> for PaSyncState {
- type Error = PacketError;
-
- fn try_from(value: u8) -> Result<Self, Self::Error> {
- match value {
- 0x00 => Ok(Self::NotSynced),
- 0x01 => Ok(Self::SyncInfoRequest),
- 0x02 => Ok(Self::Synced),
- 0x03 => Ok(Self::FailedToSync),
- 0x04 => Ok(Self::NoPast),
- _ => Err(PacketError::OutOfRange),
- }
+decodable_enum! {
+ pub enum PaSyncState<u8, bt_common::packet_encoding::Error, OutOfRange> {
+ NotSynced = 0x00,
+ SyncInfoRequest = 0x01,
+ Synced = 0x02,
+ FailedToSync = 0x03,
+ NoPast = 0x04,
}
}
@@ -1082,6 +1112,27 @@
}
#[test]
+ fn bis_sync() {
+ let mut bis_sync = BisSync::default();
+ assert_eq!(bis_sync, BisSync(BisSync::NO_PREFERENCE));
+
+ bis_sync.set_sync(&vec![1, 6, 31]).expect("should succeed");
+ assert_eq!(bis_sync, BisSync(0x40000021));
+
+ bis_sync.set_sync(&vec![]).expect("should succeed");
+ assert_eq!(bis_sync, BisSync::default());
+ }
+
+ #[test]
+ fn invalid_bis_sync() {
+ let mut bis_sync = BisSync::default();
+
+ bis_sync.set_sync(&vec![0]).expect_err("should fail");
+
+ bis_sync.set_sync(&vec![32]).expect_err("should fail");
+ }
+
+ #[test]
fn remote_scan_stopped() {
// Encoding remote scan stopped.
let stopped = RemoteScanStoppedOperation;
@@ -1121,10 +1172,10 @@
let op = AddSourceOperation::new(
AddressType::Public,
[0x04, 0x10, 0x00, 0x00, 0x00, 0x00],
- 1,
- 0x11,
+ AdvertisingSetId(1),
+ BroadcastId(0x11),
PaSync::DoNotSync,
- PaInterval::UNKNOWN_VALUE,
+ PaInterval::unknown(),
vec![],
);
assert_eq!(op.encoded_len(), 16);
@@ -1153,10 +1204,10 @@
let op = AddSourceOperation::new(
AddressType::Random,
[0x04, 0x10, 0x00, 0x00, 0x00, 0x00],
- 1,
- 0x11,
+ AdvertisingSetId(1),
+ BroadcastId(0x11),
PaSync::SyncPastAvailable,
- PaInterval::UNKNOWN_VALUE,
+ PaInterval::unknown(),
subgroups,
);
assert_eq!(op.encoded_len(), 31); // 16 for minimum params and params 15 for the subgroup.
@@ -1166,8 +1217,8 @@
let bytes = vec![
0x02, 0x01, 0x04, 0x10, 0x00, 0x00, 0x00, 0x00, 0x01, 0x11, 0x00, 0x00, 0x01, 0xFF,
0xFF, 0x01, 0xFF, 0xFF, 0xFF, 0xFF, 0x0A, // BIS_Sync, Metdata_Length
- 0x03, 0x01, 0x0C, 0x00, // Preferred_Audio_Contexts metdataum
- 0x05, 0x03, 0x74, 0x65, 0x73, 0x074, // Program_Info metadatum
+ 0x03, 0x01, 0x0C, 0x00, // Preferred_Audio_Contexts metadata
+ 0x05, 0x03, 0x74, 0x65, 0x73, 0x074, // Program_Info metadata
];
assert_eq!(buf, bytes);
@@ -1180,7 +1231,8 @@
#[test]
fn modify_source_without_subgroups() {
// Encoding operation with no subgroups.
- let op = ModifySourceOperation::new(0x0A, PaSync::SyncPastAvailable, 0x1004, vec![]);
+ let op =
+ ModifySourceOperation::new(0x0A, PaSync::SyncPastAvailable, PaInterval(0x1004), vec![]);
assert_eq!(op.encoded_len(), 6);
let mut buf = vec![0u8; op.encoded_len()];
op.encode(&mut buf[..]).expect("shoud succeed");
@@ -1199,15 +1251,11 @@
// Encoding operation with subgroups.
let subgroups = vec![
BigSubgroup::new(None).with_metadata(vec![Metadata::ParentalRating(Rating::all_age())]), /* encoded_len = 8 */
- BigSubgroup::new(Some(0x000000FE))
+ BigSubgroup::new(Some(BisSync(0x000000FE)))
.with_metadata(vec![Metadata::BroadcastAudioImmediateRenderingFlag]), /* encoded_len = 7 */
];
- let op = ModifySourceOperation::new(
- 0x0B,
- PaSync::DoNotSync,
- PaInterval::UNKNOWN_VALUE,
- subgroups,
- );
+ let op =
+ ModifySourceOperation::new(0x0B, PaSync::DoNotSync, PaInterval::unknown(), subgroups);
assert_eq!(op.encoded_len(), 21); // 6 for minimum params and params 15 for two subgroups.
let mut buf = vec![0u8; op.encoded_len()];
op.encode(&mut buf[..]).expect("shoud succeed");
diff --git a/rust/bt-common/src/core.rs b/rust/bt-common/src/core.rs
index c7efbca..fd02299 100644
--- a/rust/bt-common/src/core.rs
+++ b/rust/bt-common/src/core.rs
@@ -116,8 +116,44 @@
}
}
+/// Codec_ID communicated by a basic audio profile service/role.
+#[derive(Debug, Clone, PartialEq)]
+pub enum CodecId {
+ /// From the Assigned Numbers. Format will not be
+ /// `CodingFormat::VendorSpecific`
+ Assigned(CodingFormat),
+ VendorSpecific {
+ company_id: crate::CompanyId,
+ vendor_specific_codec_id: u16,
+ },
+}
+
+impl CodecId {
+ pub const BYTE_SIZE: usize = 5;
+}
+
+impl crate::packet_encoding::Decodable for CodecId {
+ type Error = crate::packet_encoding::Error;
+
+ fn decode(buf: &[u8]) -> core::result::Result<(Self, usize), Self::Error> {
+ if buf.len() < 5 {
+ return Err(crate::packet_encoding::Error::UnexpectedDataLength);
+ }
+ let format = buf[0].into();
+ if format != CodingFormat::VendorSpecific {
+ // Maybe don't ignore the company and vendor id, and check if they are wrong.
+ return Ok((Self::Assigned(format), 5));
+ }
+ let company_id = u16::from_le_bytes([buf[1], buf[2]]).into();
+ let vendor_specific_codec_id = u16::from_le_bytes([buf[3], buf[4]]);
+ Ok((Self::VendorSpecific { company_id, vendor_specific_codec_id }, 5))
+ }
+}
+
#[cfg(test)]
mod tests {
+ use crate::packet_encoding::Decodable;
+
use super::*;
#[test]
@@ -136,4 +172,21 @@
interval.encode(&mut buf[..]).expect_err("should fail");
}
+
+ #[test]
+ fn decode_codec_id() {
+ let assigned = [0x01, 0x00, 0x00, 0x00, 0x00];
+ let (codec_id, _) = CodecId::decode(&assigned[..]).expect("should succeed");
+ assert_eq!(codec_id, CodecId::Assigned(CodingFormat::ALawLog));
+
+ let vendor_specific = [0xFF, 0x36, 0xFD, 0x11, 0x22];
+ let (codec_id, _) = CodecId::decode(&vendor_specific[..]).expect("should succeed");
+ assert_eq!(
+ codec_id,
+ CodecId::VendorSpecific {
+ company_id: (0xFD36 as u16).into(),
+ vendor_specific_codec_id: 0x2211
+ }
+ );
+ }
}
diff --git a/rust/bt-common/src/generic_audio.rs b/rust/bt-common/src/generic_audio.rs
index cfbcbd3..c8a8530 100644
--- a/rust/bt-common/src/generic_audio.rs
+++ b/rust/bt-common/src/generic_audio.rs
@@ -3,7 +3,7 @@
// found in the LICENSE file.
pub mod codec_capabilities;
-
+pub mod codec_configuration;
pub mod metadata_ltv;
use crate::{codable_as_bitmask, decodable_enum};
diff --git a/rust/bt-common/src/generic_audio/codec_configuration.rs b/rust/bt-common/src/generic_audio/codec_configuration.rs
new file mode 100644
index 0000000..41f5c07
--- /dev/null
+++ b/rust/bt-common/src/generic_audio/codec_configuration.rs
@@ -0,0 +1,165 @@
+// Copyright 2024 Google LLC
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+use std::collections::HashSet;
+use std::ops::RangeBounds;
+
+use crate::core::ltv::LtValue;
+use crate::decodable_enum;
+
+use super::AudioLocation;
+
+#[derive(Copy, Clone, Hash, PartialEq, Eq, Debug)]
+pub enum CodecConfigurationType {
+ SamplingFrequency,
+ FrameDuration,
+ AudioChannelAllocation,
+ OctetsPerCodecFrame,
+ CodecFramesPerSdu,
+}
+
+impl From<CodecConfigurationType> for u8 {
+ fn from(value: CodecConfigurationType) -> Self {
+ match value {
+ CodecConfigurationType::SamplingFrequency => 1,
+ CodecConfigurationType::FrameDuration => 2,
+ CodecConfigurationType::AudioChannelAllocation => 3,
+ CodecConfigurationType::OctetsPerCodecFrame => 4,
+ CodecConfigurationType::CodecFramesPerSdu => 5,
+ }
+ }
+}
+
+decodable_enum! {
+ pub enum FrameDuration<u8, crate::packet_encoding::Error, OutOfRange> {
+ SevenFiveMs = 0x00,
+ TenMs = 0x01,
+ }
+}
+
+decodable_enum! {
+ pub enum SamplingFrequency<u8, crate::packet_encoding::Error, OutOfRange> {
+ F8000Hz = 0x01,
+ F11025Hz = 0x02,
+ F16000Hz = 0x03,
+ F22050Hz = 0x04,
+ F24000Hz = 0x05,
+ F32000Hz = 0x06,
+ F44100Hz = 0x07,
+ F48000Hz = 0x08,
+ F88200Hz = 0x09,
+ F96000Hz = 0x0A,
+ F176400Hz = 0x0B,
+ F192000Hz = 0x0C,
+ F384000Hz = 0x0D,
+ }
+}
+
+/// Codec Configuration LTV Structures
+///
+/// Defined in Assigned Numbers Section 6.12.5.
+#[derive(Clone, PartialEq, Eq, Debug)]
+pub enum CodecConfiguration {
+ SamplingFrequency(SamplingFrequency),
+ FrameDuration(FrameDuration),
+ AudioChannelAllocation(std::collections::HashSet<AudioLocation>),
+ OctetsPerCodecFrame(u16),
+ CodecFramesPerSdu(u8),
+}
+
+impl LtValue for CodecConfiguration {
+ type Type = CodecConfigurationType;
+
+ const NAME: &'static str = "Codec Compatability";
+
+ fn type_from_octet(x: u8) -> Option<Self::Type> {
+ match x {
+ 1 => Some(CodecConfigurationType::SamplingFrequency),
+ 2 => Some(CodecConfigurationType::FrameDuration),
+ 3 => Some(CodecConfigurationType::AudioChannelAllocation),
+ 4 => Some(CodecConfigurationType::OctetsPerCodecFrame),
+ 5 => Some(CodecConfigurationType::CodecFramesPerSdu),
+ _ => None,
+ }
+ }
+
+ fn length_range_from_type(ty: Self::Type) -> std::ops::RangeInclusive<u8> {
+ match ty {
+ CodecConfigurationType::SamplingFrequency => 2..=2,
+ CodecConfigurationType::FrameDuration => 2..=2,
+ CodecConfigurationType::AudioChannelAllocation => 5..=5,
+ CodecConfigurationType::OctetsPerCodecFrame => 3..=3,
+ CodecConfigurationType::CodecFramesPerSdu => 2..=2,
+ }
+ }
+
+ fn into_type(&self) -> Self::Type {
+ match self {
+ CodecConfiguration::SamplingFrequency(_) => CodecConfigurationType::SamplingFrequency,
+ CodecConfiguration::FrameDuration(_) => CodecConfigurationType::FrameDuration,
+ CodecConfiguration::AudioChannelAllocation(_) => {
+ CodecConfigurationType::AudioChannelAllocation
+ }
+ CodecConfiguration::OctetsPerCodecFrame(_) => {
+ CodecConfigurationType::OctetsPerCodecFrame
+ }
+ CodecConfiguration::CodecFramesPerSdu(_) => CodecConfigurationType::CodecFramesPerSdu,
+ }
+ }
+
+ fn value_encoded_len(&self) -> u8 {
+ // All the CodecCapabilities are constant length. Remove the type octet.
+ let range = Self::length_range_from_type(self.into_type());
+ let std::ops::Bound::Included(len) = range.start_bound() else {
+ unreachable!();
+ };
+ (len - 1).into()
+ }
+
+ fn encode_value(&self, buf: &mut [u8]) -> Result<(), crate::packet_encoding::Error> {
+ match self {
+ CodecConfiguration::SamplingFrequency(freq) => buf[0] = u8::from(*freq),
+ CodecConfiguration::FrameDuration(fd) => buf[0] = u8::from(*fd),
+ CodecConfiguration::AudioChannelAllocation(audio_locations) => {
+ let locations: Vec<&AudioLocation> = audio_locations.into_iter().collect();
+ buf[0..4]
+ .copy_from_slice(&AudioLocation::to_bits(locations.into_iter()).to_le_bytes());
+ }
+ CodecConfiguration::OctetsPerCodecFrame(octets) => {
+ buf[0..2].copy_from_slice(&octets.to_le_bytes())
+ }
+ CodecConfiguration::CodecFramesPerSdu(frames) => buf[0] = *frames,
+ };
+ Ok(())
+ }
+
+ fn decode_value(
+ ty: &CodecConfigurationType,
+ buf: &[u8],
+ ) -> Result<Self, crate::packet_encoding::Error> {
+ match ty {
+ CodecConfigurationType::SamplingFrequency => {
+ let freq = SamplingFrequency::try_from(buf[0])?;
+ Ok(Self::SamplingFrequency(freq))
+ }
+ CodecConfigurationType::FrameDuration => {
+ let fd = FrameDuration::try_from(buf[0])?;
+ Ok(Self::FrameDuration(fd))
+ }
+ CodecConfigurationType::AudioChannelAllocation => {
+ let raw_value = u32::from_le_bytes(buf[0..4].try_into().unwrap());
+ let locations = AudioLocation::from_bits(raw_value).collect::<HashSet<_>>();
+ Ok(Self::AudioChannelAllocation(locations))
+ }
+ CodecConfigurationType::OctetsPerCodecFrame => {
+ let value = u16::from_le_bytes(buf[0..2].try_into().unwrap());
+ Ok(Self::OctetsPerCodecFrame(value))
+ }
+ CodecConfigurationType::CodecFramesPerSdu => Ok(Self::CodecFramesPerSdu(buf[0])),
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {}
diff --git a/rust/bt-gatt/Cargo.toml b/rust/bt-gatt/Cargo.toml
index 24d4084..6cab048 100644
--- a/rust/bt-gatt/Cargo.toml
+++ b/rust/bt-gatt/Cargo.toml
@@ -16,4 +16,4 @@
[dev-dependencies]
assert_matches.workspace = true
-bt-gatt = { path = ".", features = ["test-utils"] }
+bt-gatt = { workspace = true, features = ["test-utils"] }
diff --git a/rust/bt-gatt/src/test_utils.rs b/rust/bt-gatt/src/test_utils.rs
index 4908e98..879a0fe 100644
--- a/rust/bt-gatt/src/test_utils.rs
+++ b/rust/bt-gatt/src/test_utils.rs
@@ -2,21 +2,19 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
-use crate::central::ScanResult;
-use crate::client::{self, CharacteristicNotification};
-use crate::{types::*, GattTypes};
-
-use bt_common::{PeerId, Uuid};
use futures::channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender};
use futures::future::{ready, Ready};
-use futures::{Future, Stream};
+use futures::Stream;
use parking_lot::Mutex;
use std::collections::HashMap;
use std::sync::Arc;
use std::task::Poll;
-#[derive(Default)]
-pub struct FakeCentral {}
+use bt_common::{PeerId, Uuid};
+
+use crate::central::{AdvertisingDatum, PeerName, ScanResult};
+use crate::client::CharacteristicNotification;
+use crate::{types::*, GattTypes};
#[derive(Default)]
struct FakePeerServiceInner {
@@ -98,8 +96,8 @@
ready(Ok((value.len(), false)))
}
- /// For testing, should call `expect_characteristic_value` with the expected
- /// value.
+ // For testing, should call `expect_characteristic_value` with the expected
+ // value.
fn write_characteristic<'a>(
&self,
handle: &Handle,
@@ -114,7 +112,7 @@
};
// Value written was not expected.
if buf.len() != expected.len() || &buf[..expected.len()] != expected.as_slice() {
- panic!("Value written to characteristic {handle:?} was not expected");
+ panic!("Value written to characteristic {handle:?} was not expected: {buf:?}");
}
ready(Ok(()))
}
@@ -177,7 +175,7 @@
}
impl Stream for SingleResultStream {
- type Item = Result<crate::central::ScanResult>;
+ type Item = Result<ScanResult>;
fn poll_next(
self: std::pin::Pin<&mut Self>,
@@ -192,19 +190,6 @@
}
}
-pub(crate) struct ClientConnectFut {}
-
-impl Future for ClientConnectFut {
- type Output = Result<FakeClient>;
-
- fn poll(
- self: std::pin::Pin<&mut Self>,
- _cx: &mut std::task::Context<'_>,
- ) -> Poll<Self::Output> {
- todo!()
- }
-}
-
pub struct FakeTypes {}
impl GattTypes for FakeTypes {
@@ -217,21 +202,22 @@
type PeerService = FakePeerService;
type ServiceConnectFut = Ready<Result<FakePeerService>>;
type CharacteristicDiscoveryFut = Ready<Result<Vec<Characteristic>>>;
- type NotificationStream = UnboundedReceiver<Result<client::CharacteristicNotification>>;
+ type NotificationStream = UnboundedReceiver<Result<CharacteristicNotification>>;
type ReadFut<'a> = Ready<Result<(usize, bool)>>;
type WriteFut<'a> = Ready<Result<()>>;
}
+#[derive(Default)]
+pub struct FakeCentral {}
+
impl crate::Central<FakeTypes> for FakeCentral {
fn scan(&self, _filters: &[crate::central::ScanFilter]) -> SingleResultStream {
SingleResultStream {
result: Some(Ok(ScanResult {
id: PeerId(1),
connectable: true,
- name: crate::central::PeerName::CompleteName("Marie's Pixel 7 Pro".to_owned()),
- advertised: vec![crate::central::AdvertisingDatum::Services(vec![Uuid::from_u16(
- 0x1844,
- )])],
+ name: PeerName::CompleteName("Marie's Pixel 7 Pro".to_owned()),
+ advertised: vec![AdvertisingDatum::Services(vec![Uuid::from_u16(0x1844)])],
})),
}
}
diff --git a/rust/bt-pacs/src/lib.rs b/rust/bt-pacs/src/lib.rs
index 7a62d42..38efa40 100644
--- a/rust/bt-pacs/src/lib.rs
+++ b/rust/bt-pacs/src/lib.rs
@@ -3,6 +3,7 @@
// found in the LICENSE file.
use bt_common::core::ltv::LtValue;
+use bt_common::core::CodecId;
use bt_common::generic_audio::codec_capabilities::CodecCapability;
use bt_common::generic_audio::metadata_ltv::Metadata;
use bt_common::generic_audio::{AudioLocation, ContextType};
@@ -10,38 +11,6 @@
use bt_common::Uuid;
use bt_gatt::{client::FromCharacteristic, Characteristic};
-/// Codec_ID communicated by the Published Audio Capabilities Service
-/// From Section 3.1 of the Spec.
-/// Used in [`PacRecord`]
-#[derive(Debug, Clone, PartialEq)]
-pub enum CodecId {
- /// From the Assigned Numbers. Format will not be
- /// `CodingFormat::VendorSpecific`
- Assigned(bt_common::core::CodingFormat),
- VendorSpecific {
- company_id: bt_common::CompanyId,
- vendor_specific_codec_id: u16,
- },
-}
-
-impl bt_common::packet_encoding::Decodable for CodecId {
- type Error = bt_common::packet_encoding::Error;
-
- fn decode(buf: &[u8]) -> core::result::Result<(Self, usize), Self::Error> {
- if buf.len() < 5 {
- return Err(bt_common::packet_encoding::Error::UnexpectedDataLength);
- }
- let format = buf[0].into();
- if format != bt_common::core::CodingFormat::VendorSpecific {
- // Maybe don't ignore the company and vendor id, and check if they are wrong.
- return Ok((Self::Assigned(format), 5));
- }
- let company_id = u16::from_le_bytes([buf[1], buf[2]]).into();
- let vendor_specific_codec_id = u16::from_le_bytes([buf[3], buf[4]]);
- Ok((Self::VendorSpecific { company_id, vendor_specific_codec_id }, 5))
- }
-}
-
/// A Published Audio Capability (PAC) record.
/// Published Audio Capabilities represent the capabilities of a given peer to
/// transmit or receive Audio capabilities, exposed in PAC records, represent