[rust][le][bass] BASS client can send events based on GATT notifications
When BASS client writes Broadcast Audio Scan Control Point
characteristic to remote BASS server using GATT write, BASS server
notifies the client of its internal changes by sending GATT notification
of Broadcast Receive State characteristic value changes. Based on the
value change, we should notify the upper layer of the sync state and
encryption status of the broadcast source.
Bug: b/308483171
Change-Id: I13f8712e7e92744879069761a97a28698f760a1e
Reviewed-on: https://bluetooth-review.git.corp.google.com/c/bluetooth/+/1220
Reviewed-by: Marie Janssen <jamuraa@google.com>
diff --git a/rust/bt-bass/Cargo.toml b/rust/bt-bass/Cargo.toml
index b4af24c..74f3ba8 100644
--- a/rust/bt-bass/Cargo.toml
+++ b/rust/bt-bass/Cargo.toml
@@ -8,5 +8,6 @@
bt-common = { path = "../bt-common" }
bt-gatt = { path = "../bt-gatt", features = ["test-utils"] }
futures = "0.3.28"
+parking_lot = "0.12.1"
thiserror = "1.0"
tracing = "0.1.40"
diff --git a/rust/bt-bass/src/client.rs b/rust/bt-bass/src/client.rs
index f9ba7fd..398b24e 100644
--- a/rust/bt-bass/src/client.rs
+++ b/rust/bt-bass/src/client.rs
@@ -3,77 +3,126 @@
// found in the LICENSE file.
pub mod error;
+pub mod event;
use std::collections::HashMap;
-use std::sync::{Arc, RwLock};
-use std::time::SystemTime;
+use std::sync::Arc;
+use futures::stream::{BoxStream, SelectAll, StreamExt};
+use parking_lot::Mutex;
use tracing::warn;
-use bt_common::packet_encoding::Decodable;
-use bt_gatt::client::{PeerService, ServiceCharacteristic};
-use bt_gatt::types::Handle;
+use bt_common::packet_encoding::{Decodable, Encodable};
+use bt_gatt::client::{CharacteristicNotification, PeerService, ServiceCharacteristic};
+use bt_gatt::types::{Handle, WriteMode};
use crate::client::error::{Error, ServiceError};
+use crate::client::event::*;
use crate::types::*;
const READ_CHARACTERISTIC_BUFFER_SIZE: usize = 255;
-#[allow(dead_code)]
-type ReceiveStateReadValue = (BroadcastReceiveState, SystemTime);
+/// Keeps track of Source_ID and Broadcast_ID that are associated together.
+/// Source_ID is assigned by the BASS server to a Broadcast Receive State characteristic.
+/// If the remote peer with the BASS server autonomously synchronized to a PA
+/// or accepted the Add Source operation, the server selects an empty Broadcast
+/// Receive State characteristic to update or deletes one of the existing one to update.
+/// However, because the concept of Source_ID is unqiue to BASS, we track the Broadcast_ID
+/// that a Source_ID is associated so that it can be used by upper layers.
+#[derive(Default)]
+pub struct BroadcastSourceIdTracker {
+ source_to_broadcast: HashMap<SourceId, BroadcastId>,
+}
+
+impl BroadcastSourceIdTracker {
+ fn new() -> Self {
+ Self::default()
+ }
+
+ /// Updates the broadcast ID associated with the given source ID and returns the
+ /// previously-associated broadcast ID if it exists.
+ fn update(&mut self, source_id: SourceId, broadcast_id: BroadcastId) -> Option<BroadcastId> {
+ self.source_to_broadcast.insert(source_id, broadcast_id)
+ }
+
+ fn source_id(&self, broadcast_id: &BroadcastId) -> Option<SourceId> {
+ self.source_to_broadcast
+ .iter()
+ .find_map(|(sid, bid)| (*bid == *broadcast_id).then_some(*sid))
+ }
+}
/// Manages connection to the Broadcast Audio Scan Service at the
/// remote Scan Delegator and writes/reads characteristics to/from it.
-#[allow(dead_code)]
pub struct BroadcastAudioScanServiceClient<T: bt_gatt::GattTypes> {
- gatt_client: T::PeerService,
- /// Broadcast Audio Scan Service only has one Broadcast Audio Scan Control
- /// Point characteristic according to BASS Section 3. There shall
+ gatt_client: Box<T::PeerService>,
+ id_tracker: Arc<Mutex<BroadcastSourceIdTracker>>,
+ /// Broadcast Audio Scan Service only has one Broadcast Audio Scan Control Point characteristic
+ /// according to BASS Section 3. There shall
/// be one or more Broadcast Receive State characteristics.
audio_scan_control_point: Handle,
- /// Broadcast Receive State characteristics can be used to determine the
- /// BASS status.
- receive_states: Arc<RwLock<HashMap<Handle, Option<ReceiveStateReadValue>>>>,
- // List of characteristic notification streams we are listening to.
- notification_streams: Vec<T::NotificationStream>,
+ /// Broadcast Receive State characteristics can be used to determine the BASS status.
+ receive_states: Arc<Mutex<HashMap<Handle, Option<BroadcastReceiveState>>>>,
+ /// Keeps track of the broadcast codes that were sent to the remote BASS server.
+ broadcast_codes: HashMap<SourceId, [u8; 16]>,
+ // GATT notification streams for BRS characteristic value changes.
+ notification_streams: Option<
+ SelectAll<BoxStream<'static, Result<CharacteristicNotification, bt_gatt::types::Error>>>,
+ >,
}
-#[allow(dead_code)]
impl<T: bt_gatt::GattTypes> BroadcastAudioScanServiceClient<T> {
- async fn create(gatt_client: T::PeerService) -> Result<Self, Error> {
- let characteristics = Self::discover_all_characteristics(&gatt_client).await?;
+ async fn create(gatt_client: T::PeerService) -> Result<Self, Error>
+ where
+ <T as bt_gatt::GattTypes>::NotificationStream: std::marker::Send,
+ {
+ // BASS server should have a single Broadcast Audio Scan Control Point Characteristic.
+ let bascp =
+ ServiceCharacteristic::<T>::find(&gatt_client, BROADCAST_AUDIO_SCAN_CONTROL_POINT_UUID)
+ .await
+ .map_err(|e| Error::Gatt(e))?;
+ if bascp.len() != 1 {
+ let err = if bascp.len() == 0 {
+ Error::Service(ServiceError::MissingCharacteristic)
+ } else {
+ Error::Service(ServiceError::ExtraScanControlPointCharacteristic)
+ };
+ return Err(err);
+ }
+ let bascp_handle = *bascp[0].handle();
+ let brs_chars = Self::discover_brs_characteristics(&gatt_client).await?;
+ let mut id_tracker = BroadcastSourceIdTracker::new();
+ for c in brs_chars.values() {
+ if let Some(read_value) = c {
+ if let BroadcastReceiveState::NonEmpty(state) = read_value {
+ let _ = id_tracker.update(state.source_id(), state.broadcast_id());
+ }
+ }
+ }
+
let mut client = Self {
- gatt_client,
- audio_scan_control_point: characteristics.0,
- receive_states: Arc::new(RwLock::new(characteristics.1)),
- notification_streams: Vec::new(),
+ gatt_client: Box::new(gatt_client),
+ id_tracker: Arc::new(Mutex::new(id_tracker)),
+ audio_scan_control_point: bascp_handle,
+ receive_states: Arc::new(Mutex::new(brs_chars)),
+ broadcast_codes: HashMap::new(),
+ notification_streams: None,
};
- client.register_notification().await?;
+ client.register_notifications();
Ok(client)
}
- // Discover all the characteristics that are required.
- // On success, returns the tuple of (Broadcast Audio Scan Control Point
- // Characteristic, HashMap of all Broadcast Received State Characteristics).
- async fn discover_all_characteristics(
+ // Discover all the Broadcast Receive State characteristics.
+ // On success, returns the HashMap of all Broadcast Received State Characteristics.
+ async fn discover_brs_characteristics(
gatt_client: &T::PeerService,
- ) -> Result<(Handle, HashMap<Handle, Option<ReceiveStateReadValue>>), Error> {
- let bascp =
- ServiceCharacteristic::<T>::find(gatt_client, BROADCAST_AUDIO_SCAN_CONTROL_POINT_UUID)
- .await
- .map_err(|e| Error::Gatt(e))?;
+ ) -> Result<HashMap<Handle, Option<BroadcastReceiveState>>, Error> {
let brs = ServiceCharacteristic::<T>::find(gatt_client, BROADCAST_RECEIVE_STATE_UUID)
.await
.map_err(|e| Error::Gatt(e))?;
- if bascp.len() == 0 || brs.len() == 0 {
+ if brs.len() == 0 {
return Err(Error::Service(ServiceError::MissingCharacteristic));
}
- if bascp.len() > 1 {
- return Err(Error::Service(ServiceError::ExtraScanControlPointCharacteristic));
- }
- // Discover all characteristics at the BASS at the remote peer over the GATT
- // connection. Service should have a single Broadcast Audio Scan Control Point
- // Characteristic
let mut brs_map = HashMap::new();
for c in brs {
// Read the value of the Broadcast Recieve State at the time of discovery for
@@ -81,8 +130,8 @@
let mut buf = vec![0; READ_CHARACTERISTIC_BUFFER_SIZE];
match c.read(&mut buf[..]).await {
Ok(read_bytes) => match BroadcastReceiveState::decode(&buf[0..read_bytes]) {
- Ok(decoded) => {
- brs_map.insert(*c.handle(), Some((decoded.0, SystemTime::now())));
+ Ok((decoded, _decoded_bytes)) => {
+ brs_map.insert(*c.handle(), Some(decoded));
continue;
}
Err(e) => warn!(
@@ -95,17 +144,65 @@
}
brs_map.insert(*c.handle(), None);
}
- Ok((*bascp[0].handle(), brs_map))
+ Ok(brs_map)
}
- /// Registers for notifications for Broadcast Receive State characteristics.
- async fn register_notification(&mut self) -> Result<(), Error> {
- // Notification is mandatory for Receive State as per BASS v1.0 Section 3.2.1.
- let lock = self.receive_states.write().unwrap();
- for handle in lock.keys() {
- let stream = self.gatt_client.subscribe(handle);
- self.notification_streams.push(stream);
+ fn register_notifications(&mut self)
+ where
+ <T as bt_gatt::GattTypes>::NotificationStream: std::marker::Send,
+ {
+ let mut notification_streams = SelectAll::new();
+ {
+ let lock = self.receive_states.lock();
+ for handle in lock.keys() {
+ let stream = self.gatt_client.subscribe(&handle);
+ notification_streams.push(stream.boxed());
+ }
}
+ self.notification_streams = Some(notification_streams);
+ }
+
+ /// Returns a stream that can be used by the upper layer to poll for BroadcastAudioScanServiceEvent.
+ /// BroadcastAudioScanServiceEvents are generated based on BRS characteristic change received from GATT notification that
+ /// are processed by BroadcastAudioScanServiceClient.
+ /// This method should only be called once.
+ /// Returns an error if the method is called for a second time.
+ pub fn take_event_stream(&mut self) -> Option<BroadcastAudioScanServiceEventStream> {
+ let notification_streams = self.notification_streams.take();
+ let Some(streams) = notification_streams else {
+ return None;
+ };
+ let event_stream = BroadcastAudioScanServiceEventStream::new(
+ streams,
+ self.id_tracker.clone(),
+ self.receive_states.clone(),
+ );
+ Some(event_stream)
+ }
+
+ /// Sets the broadcast code for a particular broadcast stream.
+ pub async fn set_broadcast_code(
+ &mut self,
+ broadcast_id: BroadcastId,
+ broadcast_code: [u8; 16],
+ ) -> Result<(), Error> {
+ let source_id = self.id_tracker.lock().source_id(&broadcast_id).ok_or(Error::Generic(
+ format!("Cannot find Source ID for specified Broadcast ID {broadcast_id:?}"),
+ ))?;
+
+ let op = SetBroadcastCodeOperation::new(source_id, broadcast_code.clone());
+ let mut buf = vec![0; op.encoded_len()];
+ let _ = op.encode(&mut buf[..]).map_err(|e| Error::Packet(e))?;
+
+ let c = &self.audio_scan_control_point;
+ let _ = self
+ .gatt_client
+ .write_characteristic(c, WriteMode::WithoutResponse, 0, buf.as_slice())
+ .await
+ .map_err(|e| Error::Gatt(e))?;
+
+ // Save the broadcast code we sent.
+ self.broadcast_codes.insert(source_id, broadcast_code);
Ok(())
}
}
@@ -116,6 +213,7 @@
use std::task::Poll;
+ use futures::executor::block_on;
use futures::{pin_mut, FutureExt};
use bt_common::Uuid;
@@ -125,14 +223,19 @@
};
use bt_gatt::Characteristic;
- #[test]
- fn create_client() {
+ const RECEIVE_STATE_1_HANDLE: Handle = Handle(1);
+ const RECEIVE_STATE_2_HANDLE: Handle = Handle(2);
+ const RECEIVE_STATE_3_HANDLE: Handle = Handle(3);
+ const RANDOME_CHAR_HANDLE: Handle = Handle(4);
+ const AUDIO_SCAN_CONTROL_POINT_HANDLE: Handle = Handle(5);
+
+ fn setup_client() -> (BroadcastAudioScanServiceClient<FakeTypes>, FakePeerService) {
let mut fake_peer_service = FakePeerService::new();
- // Add 3 Broadcast Receive State Characteristics, 1 Broadcast Audio Scan Control
- // Point Characteristic, and 1 random one.
+ // Add 3 Broadcast Receive State Characteristics, 1 Broadcast Audio Scan Control Point Characteristic,
+ // and 1 random one.
fake_peer_service.add_characteristic(
Characteristic {
- handle: Handle(1),
+ handle: RECEIVE_STATE_1_HANDLE,
uuid: BROADCAST_RECEIVE_STATE_UUID,
properties: CharacteristicProperties(vec![
CharacteristicProperty::Broadcast,
@@ -145,7 +248,7 @@
);
fake_peer_service.add_characteristic(
Characteristic {
- handle: Handle(2),
+ handle: RECEIVE_STATE_2_HANDLE,
uuid: BROADCAST_RECEIVE_STATE_UUID,
properties: CharacteristicProperties(vec![
CharacteristicProperty::Broadcast,
@@ -158,7 +261,7 @@
);
fake_peer_service.add_characteristic(
Characteristic {
- handle: Handle(3),
+ handle: RECEIVE_STATE_3_HANDLE,
uuid: BROADCAST_RECEIVE_STATE_UUID,
properties: CharacteristicProperties(vec![
CharacteristicProperty::Broadcast,
@@ -171,7 +274,7 @@
);
fake_peer_service.add_characteristic(
Characteristic {
- handle: Handle(4),
+ handle: RANDOME_CHAR_HANDLE,
uuid: Uuid::from_u16(0x1234),
properties: CharacteristicProperties(vec![CharacteristicProperty::Notify]),
permissions: AttributePermissions::default(),
@@ -181,7 +284,7 @@
);
fake_peer_service.add_characteristic(
Characteristic {
- handle: Handle(5),
+ handle: AUDIO_SCAN_CONTROL_POINT_HANDLE,
uuid: BROADCAST_AUDIO_SCAN_CONTROL_POINT_UUID,
properties: CharacteristicProperties(vec![CharacteristicProperty::Broadcast]),
permissions: AttributePermissions::default(),
@@ -196,19 +299,22 @@
pin_mut!(create_result);
let polled = create_result.poll_unpin(&mut noop_cx);
let Poll::Ready(Ok(client)) = polled else {
- panic!("Expected BASSClient to be succesfully created");
+ panic!("Expected BroadcastAudioScanServiceClient to be succesfully created");
};
- // Check that all the characteristics have been discovered.
- assert_eq!(client.audio_scan_control_point, Handle(5));
- let broadcast_receive_states = client.receive_states.read().unwrap();
- assert!(broadcast_receive_states.contains_key(&Handle(1)));
- assert!(broadcast_receive_states.contains_key(&Handle(2)));
- assert!(broadcast_receive_states.contains_key(&Handle(3)));
+ (client, fake_peer_service)
+ }
- // Three notification one for each of broadcast receive state characteristic
- // should have been set up.
- assert_eq!(client.notification_streams.len(), 3);
+ #[test]
+ fn create_client() {
+ let (client, _) = setup_client();
+
+ // Check that all the characteristics have been discovered.
+ assert_eq!(client.audio_scan_control_point, AUDIO_SCAN_CONTROL_POINT_HANDLE);
+ let broadcast_receive_states = client.receive_states.lock();
+ assert!(broadcast_receive_states.contains_key(&RECEIVE_STATE_1_HANDLE));
+ assert!(broadcast_receive_states.contains_key(&RECEIVE_STATE_2_HANDLE));
+ assert!(broadcast_receive_states.contains_key(&RECEIVE_STATE_3_HANDLE));
}
#[test]
@@ -235,7 +341,7 @@
pin_mut!(create_result);
let polled = create_result.poll_unpin(&mut noop_cx);
let Poll::Ready(Err(_)) = polled else {
- panic!("Expected BASSClient to have failed");
+ panic!("Expected BroadcastAudioScanServiceClient to have failed");
};
// Missing receive state characteristic.
@@ -257,7 +363,7 @@
pin_mut!(create_result);
let polled = create_result.poll_unpin(&mut noop_cx);
let Poll::Ready(Err(_)) = polled else {
- panic!("Expected BASSClient to have failed");
+ panic!("Expected BroadcastAudioScanServiceClient to have failed");
};
}
@@ -305,7 +411,125 @@
pin_mut!(create_result);
let polled = create_result.poll_unpin(&mut noop_cx);
let Poll::Ready(Err(_)) = polled else {
- panic!("Expected BASSClient to have failed");
+ panic!("Expected BroadcastAudioScanServiceClient to have failed");
};
}
+
+ #[test]
+ fn pump_notifications() {
+ let (mut client, mut fake_peer_service) = setup_client();
+ let mut event_stream = client.take_event_stream().expect("stream was created");
+
+ // Send notification for updating BRS characteristic to indicate it's synced and requires broadcast code.
+ #[rustfmt::skip]
+ fake_peer_service.add_characteristic(
+ Characteristic {
+ handle: RECEIVE_STATE_2_HANDLE,
+ uuid: BROADCAST_RECEIVE_STATE_UUID,
+ properties: CharacteristicProperties(vec![
+ CharacteristicProperty::Broadcast,
+ CharacteristicProperty::Notify,
+ ]),
+ permissions: AttributePermissions::default(),
+ descriptors: vec![],
+ },
+ vec![
+ 0x02, AddressType::Public as u8, // source id and address type
+ 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, // address
+ 0x01, 0x02, 0x03, 0x04, // ad set id and broadcast id
+ PaSyncState::Synced as u8,
+ EncryptionStatus::BroadcastCodeRequired.raw_value(),
+ 0x00, // no subgroups
+ ],
+ );
+
+ // Check that synced and broadcast code required events were sent out.
+ let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
+ let recv_fut = event_stream.select_next_some();
+ let event = block_on(recv_fut).expect("should receive event");
+ assert_eq!(event, BroadcastAudioScanServiceEvent::SyncedToPa(BroadcastId::new(0x040302)));
+
+ let recv_fut = event_stream.select_next_some();
+ let event = block_on(recv_fut).expect("should receive event");
+ assert_eq!(
+ event,
+ BroadcastAudioScanServiceEvent::BroadcastCodeRequired(BroadcastId::new(0x040302))
+ );
+
+ // Send notification for updating BRS characteristic to indicate it requires sync info.
+ // Notification for updating the BRS characteristic value for characteristic with handle 3.
+ #[rustfmt::skip]
+ fake_peer_service.add_characteristic(
+ Characteristic {
+ handle: RECEIVE_STATE_3_HANDLE,
+ uuid: BROADCAST_RECEIVE_STATE_UUID,
+ properties: CharacteristicProperties(vec![
+ CharacteristicProperty::Broadcast,
+ CharacteristicProperty::Notify,
+ ]),
+ permissions: AttributePermissions::default(),
+ descriptors: vec![],
+ },
+ vec![
+ 0x03, AddressType::Public as u8, // source id and address type
+ 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, // address
+ 0x01, 0x03, 0x04, 0x05, // ad set id and broadcast id
+ PaSyncState::SyncInfoRequest as u8,
+ EncryptionStatus::NotEncrypted.raw_value(),
+ 0x00, // no subgroups
+ ],
+ );
+
+ // Check that sync info required event was sent out.
+ let recv_fut = event_stream.select_next_some();
+ let event = block_on(recv_fut).expect("should receive event");
+ assert_eq!(
+ event,
+ BroadcastAudioScanServiceEvent::SyncInfoRequested(BroadcastId::new(0x050403))
+ );
+
+ // Stream should be pending since no more notifications.
+ assert!(event_stream.poll_next_unpin(&mut noop_cx).is_pending());
+ }
+
+ #[test]
+ fn set_broadcast_code() {
+ let (mut client, mut fake_peer_service) = setup_client();
+
+ // Manually update the internal id tracker for testing purposes.
+ // In practice, this would have been updated from BRS value change notification.
+ client.id_tracker.lock().update(0x01, BroadcastId::new(0x030201));
+
+ {
+ fake_peer_service.expect_characteristic_value(
+ &AUDIO_SCAN_CONTROL_POINT_HANDLE,
+ vec![0x04, 0x01, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1],
+ );
+
+ let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
+ let set_code_fut = client.set_broadcast_code(BroadcastId::new(0x030201), [1; 16]);
+ pin_mut!(set_code_fut);
+ let polled = set_code_fut.poll_unpin(&mut noop_cx);
+ let Poll::Ready(Ok(_)) = polled else {
+ panic!("Expected to succeed");
+ };
+ }
+ }
+
+ #[test]
+ fn set_broadcast_code_fails() {
+ let (mut client, _) = setup_client();
+
+ {
+ let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
+ let set_code_fut = client.set_broadcast_code(BroadcastId::new(0x030201), [1; 16]);
+ pin_mut!(set_code_fut);
+ let polled = set_code_fut.poll_unpin(&mut noop_cx);
+
+ // Should fail because we cannot get source id for the broadcast id since BRS Characteristic value wasn't updated.
+ let Poll::Ready(Err(_)) = polled else {
+ panic!("Expected to fail");
+ };
+ }
+ }
}
diff --git a/rust/bt-bass/src/client/error.rs b/rust/bt-bass/src/client/error.rs
index dab1378..113d15f 100644
--- a/rust/bt-bass/src/client/error.rs
+++ b/rust/bt-bass/src/client/error.rs
@@ -9,7 +9,9 @@
#[derive(Debug, Error)]
pub enum Error {
- #[error("An unsupported opcode ({0:#x}) used in a Broadcast Audio Scan Control Point operation")]
+ #[error(
+ "An unsupported opcode ({0:#x}) used in a Broadcast Audio Scan Control Point operation"
+ )]
OpCodeNotSupported(u8),
#[error("Invalid source id ({0:#x}) used in a Broadcast Audio Scan Control Point operation")]
@@ -28,8 +30,9 @@
Generic(String),
}
-/// This error represents an error we found at the remote service
-/// which prevents us from proceeding further with the client operation.
+/// This error represents an error we found at the remote BASS service or
+/// we encountered during the interaction with the remote service which
+/// prevents us from proceeding further with the client operation.
#[derive(Debug, Error, PartialEq)]
pub enum ServiceError {
#[error("Missing a required service characteristic")]
@@ -40,4 +43,7 @@
#[error("Failed to configure notification for Broadcast Recieve State characteristic (handle={0:?})")]
NotificationConfig(Handle),
+
+ #[error("Broadcast Receive State notification channels closed unexpectedly: {0}")]
+ NotificationChannelClosed(String),
}
diff --git a/rust/bt-bass/src/client/event.rs b/rust/bt-bass/src/client/event.rs
new file mode 100644
index 0000000..8fae56f
--- /dev/null
+++ b/rust/bt-bass/src/client/event.rs
@@ -0,0 +1,319 @@
+// Copyright 2023 Google LLC
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+use std::collections::{HashMap, VecDeque};
+use std::sync::Arc;
+use std::task::Poll;
+
+use bt_common::packet_encoding::Decodable;
+use bt_gatt::client::CharacteristicNotification;
+use bt_gatt::types::{Error as BtGattError, Handle};
+use futures::stream::{BoxStream, FusedStream, SelectAll};
+use futures::{Stream, StreamExt};
+use parking_lot::Mutex;
+
+use crate::client::error::Error;
+use crate::client::error::ServiceError;
+use crate::client::BroadcastSourceIdTracker;
+use crate::types::*;
+
+#[derive(Clone, Debug, PartialEq)]
+pub enum BroadcastAudioScanServiceEvent {
+ // Broadcast Audio Scan Service (BASS) server requested for SyncInfo through PAST procedure.
+ SyncInfoRequested(BroadcastId),
+ // BASS server failed to synchornize to PA or did not synchronize to PA.
+ NotSyncedToPa(BroadcastId),
+ // BASS server successfully synced to PA.
+ SyncedToPa(BroadcastId),
+ // BASS server failed to sync to PA since SyncInfo wasn't received.
+ SyncedFailedNoPast(BroadcastId),
+ // BASS server requires code to since the BIS is encrypted.
+ BroadcastCodeRequired(BroadcastId),
+ // BASS server failed to decrypt BIS using the previously provided code.
+ InvalidBroadcastCode(BroadcastId, [u8; 16]),
+ // Received a packet from the BASS server not recognized by this library.
+ UnknownPacket,
+}
+
+impl BroadcastAudioScanServiceEvent {
+ pub(crate) fn from_broadcast_receive_state(
+ state: &ReceiveState,
+ ) -> Vec<BroadcastAudioScanServiceEvent> {
+ let mut events = Vec::new();
+ let pa_sync_state = state.pa_sync_state();
+ let broadcast_id = state.broadcast_id();
+ match pa_sync_state {
+ PaSyncState::SyncInfoRequest => {
+ events.push(BroadcastAudioScanServiceEvent::SyncInfoRequested(broadcast_id))
+ }
+ PaSyncState::Synced => {
+ events.push(BroadcastAudioScanServiceEvent::SyncedToPa(broadcast_id))
+ }
+ PaSyncState::FailedToSync | PaSyncState::NotSynced => {
+ events.push(BroadcastAudioScanServiceEvent::NotSyncedToPa(broadcast_id))
+ }
+ PaSyncState::NoPast => {
+ events.push(BroadcastAudioScanServiceEvent::SyncedFailedNoPast(broadcast_id))
+ }
+ }
+ match state.big_encryption() {
+ EncryptionStatus::BroadcastCodeRequired => {
+ events.push(BroadcastAudioScanServiceEvent::BroadcastCodeRequired(broadcast_id))
+ }
+ EncryptionStatus::BadCode(code) => events.push(
+ BroadcastAudioScanServiceEvent::InvalidBroadcastCode(broadcast_id, code.clone()),
+ ),
+ _ => {}
+ };
+ events
+ }
+}
+
+pub struct BroadcastAudioScanServiceEventStream {
+ // Polled to receive BASS notifications.
+ notification_streams:
+ SelectAll<BoxStream<'static, Result<CharacteristicNotification, BtGattError>>>,
+
+ event_queue: VecDeque<Result<BroadcastAudioScanServiceEvent, Error>>,
+ terminated: bool,
+
+ // States to be updated.
+ id_tracker: Arc<Mutex<BroadcastSourceIdTracker>>,
+ receive_states: Arc<Mutex<HashMap<Handle, Option<BroadcastReceiveState>>>>,
+}
+
+impl BroadcastAudioScanServiceEventStream {
+ pub(crate) fn new(
+ notification_streams: SelectAll<
+ BoxStream<'static, Result<CharacteristicNotification, BtGattError>>,
+ >,
+ id_tracker: Arc<Mutex<BroadcastSourceIdTracker>>,
+ receive_states: Arc<Mutex<HashMap<Handle, Option<BroadcastReceiveState>>>>,
+ ) -> Self {
+ Self {
+ notification_streams,
+ event_queue: VecDeque::new(),
+ terminated: false,
+ id_tracker,
+ receive_states,
+ }
+ }
+}
+
+impl FusedStream for BroadcastAudioScanServiceEventStream {
+ fn is_terminated(&self) -> bool {
+ self.terminated
+ }
+}
+
+impl Stream for BroadcastAudioScanServiceEventStream {
+ type Item = Result<BroadcastAudioScanServiceEvent, Error>;
+
+ fn poll_next(
+ mut self: std::pin::Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ ) -> Poll<Option<Self::Item>> {
+ if self.terminated {
+ return Poll::Ready(None);
+ }
+
+ loop {
+ match self.notification_streams.poll_next_unpin(cx) {
+ Poll::Pending => {}
+ Poll::Ready(None) => {
+ self.event_queue.push_back(Err(Error::Service(
+ super::error::ServiceError::NotificationChannelClosed(format!(
+ "GATT notification stream for BRS characteristics closed"
+ )),
+ )));
+ }
+ Poll::Ready(Some(received)) => {
+ match received {
+ Err(error) => match error {
+ BtGattError::PeerNotRecognized(_)
+ | BtGattError::ScanFailed(_)
+ | BtGattError::Other(_) => {
+ self.event_queue.push_back(Err(Error::Service(
+ ServiceError::NotificationChannelClosed(format!(
+ "unexpected error encountered from GATT notification"
+ )),
+ )));
+ }
+ BtGattError::PeerDisconnected(id) => {
+ self.event_queue.push_back(Err(Error::Service(
+ ServiceError::NotificationChannelClosed(format!(
+ "peer ({id}) disconnected"
+ )),
+ )));
+ }
+ _ => {} // TODO(b/308483171): decide what to do for non-critical errors.
+ },
+ Ok(notification) => {
+ let Ok((brs, _)) =
+ BroadcastReceiveState::decode(notification.value.as_slice())
+ else {
+ self.event_queue
+ .push_back(Ok(BroadcastAudioScanServiceEvent::UnknownPacket));
+ break;
+ };
+ match &brs {
+ BroadcastReceiveState::Empty => {}
+ BroadcastReceiveState::NonEmpty(state) => {
+ let events = BroadcastAudioScanServiceEvent::from_broadcast_receive_state(state);
+ events
+ .into_iter()
+ .for_each(|e| self.event_queue.push_back(Ok(e)));
+
+ // Update broadcast ID to source ID mapping.
+ let _ = self
+ .id_tracker
+ .lock()
+ .update(state.source_id(), state.broadcast_id());
+ }
+ };
+ {
+ // Update the Broadcast Receive States.
+ let mut lock = self.receive_states.lock();
+ let char = lock.get_mut(¬ification.handle).unwrap();
+ char.replace(brs);
+ }
+ }
+ }
+ }
+ };
+ break;
+ }
+
+ let popped = self.event_queue.pop_front();
+ match popped {
+ None => Poll::Pending,
+ Some(item) => match item {
+ Ok(event) => Poll::Ready(Some(Ok(event))),
+ Err(e) => {
+ // If an error was received, we terminate the event stream, but send an error to indicate why it was terminated.
+ self.terminated = true;
+ Poll::Ready(Some(Err(e)))
+ }
+ },
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ use futures::channel::mpsc::unbounded;
+
+ #[test]
+ fn poll_broadcast_audio_scan_service_event_stream() {
+ let mut streams = SelectAll::new();
+ let (sender1, receiver1) = unbounded();
+ let (sender2, receiver2) = unbounded();
+ streams.push(receiver1.boxed());
+ streams.push(receiver2.boxed());
+
+ let id_tracker = Arc::new(Mutex::new(BroadcastSourceIdTracker::new()));
+ let receive_states =
+ Arc::new(Mutex::new(HashMap::from([(Handle(0x1), None), (Handle(0x2), None)])));
+ let mut event_streams =
+ BroadcastAudioScanServiceEventStream::new(streams, id_tracker, receive_states);
+
+ // Send notifications to underlying streams.
+ let bad_code_status =
+ EncryptionStatus::BadCode([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]);
+ #[rustfmt::skip]
+ sender1
+ .unbounded_send(Ok(CharacteristicNotification {
+ handle: Handle(0x1),
+ value: vec![
+ 0x01, AddressType::Public as u8, // source id and address type
+ 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, // address
+ 0x01, 0x01, 0x02, 0x03, // ad set id and broadcast id
+ PaSyncState::FailedToSync as u8,
+ bad_code_status.raw_value(),
+ 1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16, // bad code
+ 0x00, // no subgroups
+ ],
+ maybe_truncated: false,
+ }))
+ .expect("should send");
+
+ #[rustfmt::skip]
+ sender2
+ .unbounded_send(Ok(CharacteristicNotification {
+ handle: Handle(0x2),
+ value: vec![
+ 0x02, AddressType::Public as u8, // source id and address type
+ 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, // address
+ 0x01, 0x02, 0x03, 0x04, // ad set id and broadcast id
+ PaSyncState::NoPast as u8,
+ EncryptionStatus::NotEncrypted.raw_value(),
+ 0x00, // no subgroups
+ ],
+ maybe_truncated: false,
+ }))
+ .expect("should send");
+
+ // Events should have been generated from notifications.
+ let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
+ match event_streams.poll_next_unpin(&mut noop_cx) {
+ Poll::Ready(Some(Ok(event))) => assert_eq!(
+ event,
+ BroadcastAudioScanServiceEvent::NotSyncedToPa(BroadcastId::new(0x030201))
+ ),
+ _ => panic!("should have received event"),
+ }
+ match event_streams.poll_next_unpin(&mut noop_cx) {
+ Poll::Ready(Some(Ok(event))) => assert_eq!(
+ event,
+ BroadcastAudioScanServiceEvent::InvalidBroadcastCode(
+ BroadcastId::new(0x030201),
+ [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]
+ )
+ ),
+ _ => panic!("should have received event"),
+ }
+ match event_streams.poll_next_unpin(&mut noop_cx) {
+ Poll::Ready(Some(Ok(event))) => assert_eq!(
+ event,
+ BroadcastAudioScanServiceEvent::SyncedFailedNoPast(BroadcastId::new(0x040302))
+ ),
+ _ => panic!("should have received event"),
+ }
+
+ // Should be pending because no more events generated from notifications.
+ assert!(event_streams.poll_next_unpin(&mut noop_cx).is_pending());
+
+ // Send notifications to underlying streams.
+ #[rustfmt::skip]
+ sender2
+ .unbounded_send(Ok(CharacteristicNotification {
+ handle: Handle(0x2),
+ value: vec![
+ 0x02, AddressType::Public as u8, // source id and address type
+ 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, // address
+ 0x01, 0x02, 0x03, 0x04, // ad set id and broadcast id
+ PaSyncState::Synced as u8,
+ EncryptionStatus::NotEncrypted.raw_value(),
+ 0x00, // no subgroups
+ ],
+ maybe_truncated: false,
+ }))
+ .expect("should send");
+
+ // Event should have been generated from notification.
+ let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
+ match event_streams.poll_next_unpin(&mut noop_cx) {
+ Poll::Ready(Some(Ok(event))) => assert_eq!(
+ event,
+ BroadcastAudioScanServiceEvent::SyncedToPa(BroadcastId::new(0x040302))
+ ),
+ _ => panic!("should have received event"),
+ }
+
+ // Should be pending because no more events generated from notifications.
+ assert!(event_streams.poll_next_unpin(&mut noop_cx).is_pending());
+ }
+}
diff --git a/rust/bt-bass/src/types.rs b/rust/bt-bass/src/types.rs
index fc7827f..2844069 100644
--- a/rust/bt-bass/src/types.rs
+++ b/rust/bt-bass/src/types.rs
@@ -2,10 +2,10 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
-use bt_common::Uuid;
-use bt_common::packet_encoding::{Decodable, Encodable, Error as PacketError};
-use bt_common::generic_audio::metadata_ltv::*;
use bt_common::core::{AdvertisingSetId, PaInterval};
+use bt_common::generic_audio::metadata_ltv::*;
+use bt_common::packet_encoding::{Decodable, Encodable, Error as PacketError};
+use bt_common::Uuid;
const ADDRESS_BYTE_SIZE: usize = 6;
const NUM_SUBGROUPS_BYTE_SIZE: usize = 1;
@@ -20,12 +20,21 @@
/// Broadcast_ID is a 3-byte data on the wire.
/// See Broadcast Audio Scan Service (BASS) spec v1.0 Table 3.5 for details.
-#[derive(Debug, PartialEq)]
+#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub struct BroadcastId(u32);
impl BroadcastId {
// On the wire, Broadcast_ID is transported in 3 bytes.
const BYTE_SIZE: usize = 3;
+
+ #[cfg(test)]
+ pub fn new(raw_value: u32) -> Self {
+ Self(raw_value)
+ }
+
+ pub fn raw_value(&self) -> u32 {
+ self.0
+ }
}
impl TryFrom<u32> for BroadcastId {
@@ -34,7 +43,9 @@
fn try_from(value: u32) -> Result<Self, Self::Error> {
const MAX_VALUE: u32 = 0xFFFFFF;
if value > MAX_VALUE {
- return Err(PacketError::InvalidParameter(format!("Broadcast ID cannot exceed 3 bytes")));
+ return Err(PacketError::InvalidParameter(format!(
+ "Broadcast ID cannot exceed 3 bytes"
+ )));
}
Ok(BroadcastId(value))
}
@@ -118,7 +129,9 @@
let expected = Self::opcode();
let got = ControlPointOpcode::try_from(raw_value)?;
if got != expected {
- return Err(PacketError::InvalidParameter(format!("got opcode {got:?}, expected {expected:?}")));
+ return Err(PacketError::InvalidParameter(format!(
+ "got opcode {got:?}, expected {expected:?}"
+ )));
}
Ok(got)
}
@@ -214,28 +227,34 @@
}
impl AddSourceOperation {
- const MIN_PACKET_SIZE: usize = ControlPointOpcode::BYTE_SIZE + AddressType::BYTE_SIZE + ADDRESS_BYTE_SIZE + AdvertisingSetId::BYTE_SIZE +
- BroadcastId::BYTE_SIZE + PA_SYNC_BYTE_SIZE + PaInterval::BYTE_SIZE + NUM_SUBGROUPS_BYTE_SIZE;
+ const MIN_PACKET_SIZE: usize = ControlPointOpcode::BYTE_SIZE
+ + AddressType::BYTE_SIZE
+ + ADDRESS_BYTE_SIZE
+ + AdvertisingSetId::BYTE_SIZE
+ + BroadcastId::BYTE_SIZE
+ + PA_SYNC_BYTE_SIZE
+ + PaInterval::BYTE_SIZE
+ + NUM_SUBGROUPS_BYTE_SIZE;
pub fn new(
- address_type: AddressType,
- advertiser_address: [u8; ADDRESS_BYTE_SIZE],
- sid: u8,
- broadcast_id: u32,
- pa_sync: PaSync,
- pa_interval: u16,
- subgroups: Vec<BigSubgroup>,
- ) -> Self {
- AddSourceOperation {
- advertiser_address_type: address_type,
- advertiser_address,
- advertising_sid: AdvertisingSetId(sid),
- broadcast_id: BroadcastId(broadcast_id),
- pa_sync,
- pa_interval: PaInterval(pa_interval),
- subgroups,
- }
+ address_type: AddressType,
+ advertiser_address: [u8; ADDRESS_BYTE_SIZE],
+ sid: u8,
+ broadcast_id: u32,
+ pa_sync: PaSync,
+ pa_interval: u16,
+ subgroups: Vec<BigSubgroup>,
+ ) -> Self {
+ AddSourceOperation {
+ advertiser_address_type: address_type,
+ advertiser_address,
+ advertising_sid: AdvertisingSetId(sid),
+ broadcast_id: BroadcastId(broadcast_id),
+ pa_sync,
+ pa_interval: PaInterval(pa_interval),
+ subgroups,
}
+ }
}
impl ControlPointOperation for AddSourceOperation {
@@ -272,15 +291,18 @@
subgroups.push(decoded.0);
idx += decoded.1;
}
- Ok((Self{
- advertiser_address_type,
- advertiser_address,
- advertising_sid,
- broadcast_id,
- pa_sync,
- pa_interval,
- subgroups,
- }, idx))
+ Ok((
+ Self {
+ advertiser_address_type,
+ advertiser_address,
+ advertising_sid,
+ broadcast_id,
+ pa_sync,
+ pa_interval,
+ subgroups,
+ },
+ idx,
+ ))
}
}
@@ -313,10 +335,7 @@
}
fn encoded_len(&self) -> core::primitive::usize {
- Self::MIN_PACKET_SIZE + self
- .subgroups
- .iter()
- .fold(0, |acc, g| acc + g.encoded_len())
+ Self::MIN_PACKET_SIZE + self.subgroups.iter().fold(0, |acc, g| acc + g.encoded_len())
}
}
@@ -330,7 +349,11 @@
}
impl ModifySourceOperation {
- const MIN_PACKET_SIZE: usize = ControlPointOpcode::BYTE_SIZE + SOURCE_ID_BYTE_SIZE + PA_SYNC_BYTE_SIZE + PaInterval::BYTE_SIZE + NUM_SUBGROUPS_BYTE_SIZE;
+ const MIN_PACKET_SIZE: usize = ControlPointOpcode::BYTE_SIZE
+ + SOURCE_ID_BYTE_SIZE
+ + PA_SYNC_BYTE_SIZE
+ + PaInterval::BYTE_SIZE
+ + NUM_SUBGROUPS_BYTE_SIZE;
pub fn new(
source_id: SourceId,
@@ -377,12 +400,7 @@
subgroups.push(decoded.0);
idx += decoded.1;
}
- Ok((Self{
- source_id,
- pa_sync,
- pa_interval,
- subgroups,
- }, idx))
+ Ok((Self { source_id, pa_sync, pa_interval, subgroups }, idx))
}
}
@@ -412,11 +430,7 @@
}
fn encoded_len(&self) -> core::primitive::usize {
- Self::MIN_PACKET_SIZE +
- self
- .subgroups
- .iter()
- .fold(0, |acc, g| acc + g.encoded_len())
+ Self::MIN_PACKET_SIZE + self.subgroups.iter().fold(0, |acc, g| acc + g.encoded_len())
}
}
@@ -429,10 +443,11 @@
impl SetBroadcastCodeOperation {
const BROADCAST_CODE_LEN: usize = 16;
- const PACKET_SIZE: usize = ControlPointOpcode::BYTE_SIZE + SOURCE_ID_BYTE_SIZE + Self::BROADCAST_CODE_LEN;
+ const PACKET_SIZE: usize =
+ ControlPointOpcode::BYTE_SIZE + SOURCE_ID_BYTE_SIZE + Self::BROADCAST_CODE_LEN;
pub fn new(source_id: SourceId, broadcast_code: [u8; 16]) -> Self {
- SetBroadcastCodeOperation{ source_id, broadcast_code }
+ SetBroadcastCodeOperation { source_id, broadcast_code }
}
}
@@ -453,11 +468,8 @@
let _ = Self::check_opcode(buf[0])?;
let source_id = buf[1];
let mut broadcast_code = [0; Self::BROADCAST_CODE_LEN];
- broadcast_code.copy_from_slice(&buf[2..2+Self::BROADCAST_CODE_LEN]);
- Ok((Self{
- source_id,
- broadcast_code,
- }, Self::PACKET_SIZE))
+ broadcast_code.copy_from_slice(&buf[2..2 + Self::BROADCAST_CODE_LEN]);
+ Ok((Self { source_id, broadcast_code }, Self::PACKET_SIZE))
}
}
@@ -471,7 +483,7 @@
buf[0] = Self::opcode() as u8;
buf[1] = self.source_id;
- buf[2..2+Self::BROADCAST_CODE_LEN].copy_from_slice(&self.broadcast_code);
+ buf[2..2 + Self::BROADCAST_CODE_LEN].copy_from_slice(&self.broadcast_code);
Ok(())
}
@@ -577,10 +589,7 @@
}
pub fn with_metadata(self, metadata: Metadata) -> Self {
- Self {
- bis_sync_bitfield: self.bis_sync_bitfield,
- metadata,
- }
+ Self { bis_sync_bitfield: self.bis_sync_bitfield, metadata }
}
}
@@ -601,7 +610,8 @@
}
while start_idx < 5 + metadata_len {
- let (data, len) = Metadatum::decode(&buf[start_idx..]).map_err(|e| PacketError::InvalidParameter(format!("{e}")))?;
+ let (data, len) = Metadatum::decode(&buf[start_idx..])
+ .map_err(|e| PacketError::InvalidParameter(format!("{e}")))?;
metadata.push(data);
start_idx += len;
}
@@ -630,7 +640,8 @@
buf[4] = metadata_len;
let mut next_idx = 5;
for m in &self.metadata {
- m.encode(&mut buf[next_idx..]).map_err(|e| PacketError::InvalidParameter(format!("{e}")))?;
+ m.encode(&mut buf[next_idx..])
+ .map_err(|e| PacketError::InvalidParameter(format!("{e}")))?;
next_idx += m.encoded_len();
}
Ok(())
@@ -724,8 +735,53 @@
}
impl ReceiveState {
- const MIN_PACKET_SIZE: usize = SOURCE_ID_BYTE_SIZE + AddressType::BYTE_SIZE + ADDRESS_BYTE_SIZE + AdvertisingSetId::BYTE_SIZE +
- BroadcastId::BYTE_SIZE + PA_SYNC_BYTE_SIZE + EncryptionStatus::MIN_PACKET_SIZE + NUM_SUBGROUPS_BYTE_SIZE;
+ const MIN_PACKET_SIZE: usize = SOURCE_ID_BYTE_SIZE
+ + AddressType::BYTE_SIZE
+ + ADDRESS_BYTE_SIZE
+ + AdvertisingSetId::BYTE_SIZE
+ + BroadcastId::BYTE_SIZE
+ + PA_SYNC_BYTE_SIZE
+ + EncryptionStatus::MIN_PACKET_SIZE
+ + NUM_SUBGROUPS_BYTE_SIZE;
+
+ #[cfg(test)]
+ pub fn new(
+ source_id: u8,
+ source_address_type: AddressType,
+ source_address: [u8; ADDRESS_BYTE_SIZE],
+ source_adv_sid: u8,
+ broadcast_id: u32,
+ pa_sync_state: PaSyncState,
+ big_encryption: EncryptionStatus,
+ subgroups: Vec<BigSubgroup>,
+ ) -> ReceiveState {
+ Self {
+ source_id,
+ source_address_type,
+ source_address,
+ source_adv_sid: AdvertisingSetId(source_adv_sid),
+ broadcast_id: BroadcastId(broadcast_id),
+ pa_sync_state,
+ big_encryption,
+ subgroups,
+ }
+ }
+
+ pub fn pa_sync_state(&self) -> PaSyncState {
+ self.pa_sync_state
+ }
+
+ pub fn big_encryption(&self) -> EncryptionStatus {
+ self.big_encryption
+ }
+
+ pub fn broadcast_id(&self) -> BroadcastId {
+ self.broadcast_id
+ }
+
+ pub fn source_id(&self) -> SourceId {
+ self.source_id
+ }
}
impl Decodable for ReceiveState {
@@ -761,16 +817,19 @@
subgroups.push(subgroup);
idx += consumed;
}
- Ok((ReceiveState{
- source_id,
- source_address_type,
- source_address,
- source_adv_sid,
- broadcast_id,
- pa_sync_state,
- big_encryption,
- subgroups,
- }, idx))
+ Ok((
+ ReceiveState {
+ source_id,
+ source_address_type,
+ source_address,
+ source_adv_sid,
+ broadcast_id,
+ pa_sync_state,
+ big_encryption,
+ subgroups,
+ },
+ idx,
+ ))
}
}
@@ -806,17 +865,15 @@
fn encoded_len(&self) -> core::primitive::usize {
// Length including Source_ID, Source_Address_Type, Source_Address, Source_Adv_SID, Broadcast_ID, PA_Sync_State, BIG_Encryption, Bad_Code,
// Num_Subgroups and subgroup-related params.
- SOURCE_ID_BYTE_SIZE + AddressType::BYTE_SIZE
+ SOURCE_ID_BYTE_SIZE
+ + AddressType::BYTE_SIZE
+ self.source_address.len()
+ AdvertisingSetId::BYTE_SIZE
+ self.broadcast_id.encoded_len()
+ PA_SYNC_BYTE_SIZE
+ self.big_encryption.encoded_len()
+ NUM_SUBGROUPS_BYTE_SIZE
- + self
- .subgroups
- .iter()
- .map(Encodable::encoded_len).sum::<usize>()
+ + self.subgroups.iter().map(Encodable::encoded_len).sum::<usize>()
}
}
@@ -860,7 +917,7 @@
// Returns the u8 value that represents the status of encryption
// as described for BIG_Encryption parameter.
- fn raw_value(self) -> u8 {
+ pub const fn raw_value(self) -> u8 {
match self {
EncryptionStatus::NotEncrypted => 0x00,
EncryptionStatus::BroadcastCodeRequired => 0x01,
@@ -936,7 +993,8 @@
let (got, bytes) = BroadcastId::decode(&bytes).expect("should succeed");
assert_eq!(got, id);
assert_eq!(bytes, BroadcastId::BYTE_SIZE);
- let got = BroadcastId::try_from(u32::from_le_bytes([0x0C, 0x0B, 0x0A, 0x00])).expect("should succeed");
+ let got = BroadcastId::try_from(u32::from_le_bytes([0x0C, 0x0B, 0x0A, 0x00]))
+ .expect("should succeed");
assert_eq!(got, id);
}
@@ -970,7 +1028,7 @@
// Decoding not encrypted.
let decoded = EncryptionStatus::decode(&bytes).expect("should succeed");
assert_eq!(decoded.0, not_encrypted);
- assert_eq!(decoded.1 ,1);
+ assert_eq!(decoded.1, 1);
// Encoding bad code status with code.
let bad_code = EncryptionStatus::BadCode([
@@ -983,14 +1041,14 @@
let bytes = vec![
0x03, 0x16, 0x15, 0x14, 0x13, 0x12, 0x11, 0x10, 0x09, 0x08, 0x07, 0x06, 0x05, 0x04,
- 0x03, 0x02, 0x01
+ 0x03, 0x02, 0x01,
];
assert_eq!(buf, bytes);
// Deocoding bad code statsu with code.
let decoded = EncryptionStatus::decode(&bytes).expect("should succeed");
assert_eq!(decoded.0, bad_code);
- assert_eq!(decoded.1 ,17);
+ assert_eq!(decoded.1, 17);
}
#[test]
@@ -1031,7 +1089,7 @@
// Decoding remote scan stopped.
let decoded = RemoteScanStoppedOperation::decode(&bytes).expect("should succeed");
assert_eq!(decoded.0, stopped);
- assert_eq!(decoded.1 ,1);
+ assert_eq!(decoded.1, 1);
}
#[test]
@@ -1048,7 +1106,7 @@
// Decoding remote scan started.
let decoded = RemoteScanStartedOperation::decode(&bytes).expect("should succeed");
assert_eq!(decoded.0, started);
- assert_eq!(decoded.1 ,1);
+ assert_eq!(decoded.1, 1);
}
#[test]
@@ -1068,15 +1126,15 @@
op.encode(&mut buf[..]).expect("shoud succeed");
let bytes = vec![
- 0x02, 0x00, 0x04, 0x10, 0x00, 0x00, 0x00, 0x00,
- 0x01, 0x11, 0x00, 0x00, 0x00, 0xFF, 0xFF, 0x00,
+ 0x02, 0x00, 0x04, 0x10, 0x00, 0x00, 0x00, 0x00, 0x01, 0x11, 0x00, 0x00, 0x00, 0xFF,
+ 0xFF, 0x00,
];
assert_eq!(buf, bytes);
// Decoding operation with no subgroups.
let decoded = AddSourceOperation::decode(&bytes).expect("should succeed");
assert_eq!(decoded.0, op);
- assert_eq!(decoded.1 ,16);
+ assert_eq!(decoded.1, 16);
}
#[test]
@@ -1100,8 +1158,8 @@
op.encode(&mut buf[..]).expect("shoud succeed");
let bytes = vec![
- 0x02, 0x01, 0x04, 0x10, 0x00, 0x00, 0x00, 0x00, 0x01, 0x11, 0x00, 0x00, 0x01, 0xFF, 0xFF, 0x01,
- 0xFF, 0xFF, 0xFF, 0xFF, 0x0A, // BIS_Sync, Metdata_Length
+ 0x02, 0x01, 0x04, 0x10, 0x00, 0x00, 0x00, 0x00, 0x01, 0x11, 0x00, 0x00, 0x01, 0xFF,
+ 0xFF, 0x01, 0xFF, 0xFF, 0xFF, 0xFF, 0x0A, // BIS_Sync, Metdata_Length
0x03, 0x01, 0x0C, 0x00, // Preferred_Audio_Contexts metdataum
0x05, 0x03, 0x74, 0x65, 0x73, 0x074, // Program_Info metadatum
];
@@ -1116,12 +1174,7 @@
#[test]
fn modify_source_without_subgroups() {
// Encoding operation with no subgroups.
- let op = ModifySourceOperation::new(
- 0x0A,
- PaSync::SyncPastAvailable,
- 0x1004,
- vec![],
- );
+ let op = ModifySourceOperation::new(0x0A, PaSync::SyncPastAvailable, 0x1004, vec![]);
assert_eq!(op.encoded_len(), 6);
let mut buf = vec![0u8; op.encoded_len()];
op.encode(&mut buf[..]).expect("shoud succeed");
@@ -1154,8 +1207,8 @@
op.encode(&mut buf[..]).expect("shoud succeed");
let bytes = vec![
- 0x03, 0x0B, 0x00, 0xFF, 0xFF, 0x02,
- 0xFF, 0xFF, 0xFF, 0xFF, 0x03, 0x02, 0x06, 0x01, // First subgroup.
+ 0x03, 0x0B, 0x00, 0xFF, 0xFF, 0x02, 0xFF, 0xFF, 0xFF, 0xFF, 0x03, 0x02, 0x06,
+ 0x01, // First subgroup.
0xFE, 0x00, 0x00, 0x00, 0x02, 0x01, 0x09, // Second subgroup.
];
assert_eq!(buf, bytes);
@@ -1182,7 +1235,7 @@
let bytes = vec![
0x04, 0x0A, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x10, 0x11, 0x12,
- 0x13, 0x14, 0x15, 0x16
+ 0x13, 0x14, 0x15, 0x16,
];
assert_eq!(buf, bytes);
@@ -1219,7 +1272,10 @@
source_adv_sid: AdvertisingSetId(0x01),
broadcast_id: BroadcastId(0x00010203),
pa_sync_state: PaSyncState::Synced,
- big_encryption: EncryptionStatus::BadCode([0x16, 0x15, 0x14, 0x13, 0x12, 0x11, 0x10, 0x09, 0x08, 0x7, 0x06, 0x05, 0x04, 0x03, 0x02, 0x01]),
+ big_encryption: EncryptionStatus::BadCode([
+ 0x16, 0x15, 0x14, 0x13, 0x12, 0x11, 0x10, 0x09, 0x08, 0x7, 0x06, 0x05, 0x04, 0x03,
+ 0x02, 0x01,
+ ]),
subgroups: vec![],
});
assert_eq!(state.encoded_len(), 31);
@@ -1227,8 +1283,9 @@
state.encode(&mut buf[..]).expect("should succeed");
let bytes = vec![
- 0x01, 0x00, 0x0F, 0x0E, 0x0D, 0x0C, 0x0B, 0x0A, 0x01, 0x03, 0x02, 0x01, 0x02,
- 0x03, 0x16, 0x15, 0x14, 0x13, 0x12, 0x11, 0x10, 0x09, 0x08, 0x07, 0x06, 0x05, 0x04, 0x03, 0x02, 0x01, // Bad_Code with the code.
+ 0x01, 0x00, 0x0F, 0x0E, 0x0D, 0x0C, 0x0B, 0x0A, 0x01, 0x03, 0x02, 0x01, 0x02, 0x03,
+ 0x16, 0x15, 0x14, 0x13, 0x12, 0x11, 0x10, 0x09, 0x08, 0x07, 0x06, 0x05, 0x04, 0x03,
+ 0x02, 0x01, // Bad_Code with the code.
0x00,
];
assert_eq!(buf, bytes);
@@ -1251,7 +1308,8 @@
pa_sync_state: PaSyncState::NotSynced,
big_encryption: EncryptionStatus::NotEncrypted,
subgroups: vec![
- BigSubgroup::new(None).with_metadata(vec![Metadatum::ParentalRating(Rating::AllAge)]), // encoded_len = 8
+ BigSubgroup::new(None)
+ .with_metadata(vec![Metadatum::ParentalRating(Rating::AllAge)]), // encoded_len = 8
],
});
assert_eq!(state.encoded_len(), 23);
@@ -1259,8 +1317,8 @@
state.encode(&mut buf[..]).expect("should succeed");
let bytes = vec![
- 0x01, 0x01, 0x0F, 0x0E, 0x0D, 0x0C, 0x0B, 0x0A, 0x01, 0x03, 0x02, 0x01, 0x00,
- 0x00, 0x01, // 1 Subgroup.
+ 0x01, 0x01, 0x0F, 0x0E, 0x0D, 0x0C, 0x0B, 0x0A, 0x01, 0x03, 0x02, 0x01, 0x00, 0x00,
+ 0x01, // 1 Subgroup.
0xFF, 0xFF, 0xFF, 0xFF, 0x03, 0x02, 0x06, 0x01, // Subgroup.
];
assert_eq!(buf, bytes);
@@ -1269,6 +1327,5 @@
let decoded = BroadcastReceiveState::decode(&bytes).expect("should succeed");
assert_eq!(decoded.0, state);
assert_eq!(decoded.1, 23);
-
}
}
diff --git a/rust/bt-gatt/src/test_utils.rs b/rust/bt-gatt/src/test_utils.rs
index c4189d7..3a78bde 100644
--- a/rust/bt-gatt/src/test_utils.rs
+++ b/rust/bt-gatt/src/test_utils.rs
@@ -55,6 +55,15 @@
.expect("should succeed");
}
}
+
+ // Sets expected characteristic value so that it can be used for validation when write method is called.
+ pub fn expect_characteristic_value(&mut self, handle: &Handle, value: Vec<u8>) {
+ let mut lock = self.inner.lock();
+ let Some(char) = lock.characteristics.get_mut(handle) else {
+ panic!("Can't find characteristic {handle:?} to set expected value");
+ };
+ char.1 = value;
+ }
}
impl crate::client::PeerService<FakeTypes> for FakePeerService {
@@ -88,14 +97,24 @@
ready(Ok((value.len(), false)))
}
+ /// For testing, should call `expect_characteristic_value` with the expected value.
fn write_characteristic<'a>(
&self,
- _handle: &Handle,
+ handle: &Handle,
_mode: WriteMode,
_offset: u16,
- _buf: &'a [u8],
+ buf: &'a [u8],
) -> <FakeTypes as GattTypes>::WriteFut<'a> {
- todo!()
+ let expected_characteristics = &(*self.inner.lock()).characteristics;
+ // The write operation was not expected.
+ let Some((_, expected)) = expected_characteristics.get(handle) else {
+ panic!("Write operation to characteristic {handle:?} was not expected");
+ };
+ // Value written was not expected.
+ if buf.len() != expected.len() || &buf[..expected.len()] != expected.as_slice() {
+ panic!("Value written to characteristic {handle:?} was not expected");
+ }
+ ready(Ok(()))
}
fn read_descriptor<'a>(
diff --git a/rust/bt-gatt/src/tests.rs b/rust/bt-gatt/src/tests.rs
index 894b143..2593b2f 100644
--- a/rust/bt-gatt/src/tests.rs
+++ b/rust/bt-gatt/src/tests.rs
@@ -149,6 +149,48 @@
}
#[test]
+fn peer_service_write_characteristic() {
+ let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
+
+ let mut fake_peer_service: FakePeerService = set_up();
+
+ // Set expected characteristic value before calling `write_characteristic`.
+ fake_peer_service.expect_characteristic_value(&Handle(1), vec![0, 1, 2, 3]);
+ let mut write_result = fake_peer_service.write_characteristic(
+ &Handle(0x1),
+ WriteMode::WithoutResponse,
+ 0,
+ vec![0, 1, 2, 3].as_slice(),
+ );
+
+ match write_result.poll_unpin(&mut noop_cx) {
+ Poll::Ready(Ok(())) => {}
+ _ => panic!("expected write to succeed"),
+ }
+}
+
+#[test]
+#[should_panic]
+fn peer_service_write_characteristic_fail() {
+ let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
+
+ let fake_peer_service = set_up();
+
+ // Write some random value without calling `expect_characteristic_value` first.
+ let mut write_result = fake_peer_service.write_characteristic(
+ &Handle(0x1),
+ WriteMode::WithoutResponse,
+ 0,
+ vec![13, 14, 15, 16].as_slice(),
+ );
+
+ match write_result.poll_unpin(&mut noop_cx) {
+ Poll::Ready(Err(_)) => {}
+ _ => panic!("expected write to fail"),
+ }
+}
+
+#[test]
fn peer_service_subsribe() {
let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());