rust/bt-pacs: Implement notification for available audio contexts
Available audio contexts characteristic should support notify property:
- Add a method on PACS server to allow the clients of the library to
update the available audio contexts.
- Notify the change through the GATT local service.
Bug: b/309015071
Test: cargo test
Change-Id: Ib0b41f152dcb8148410ac7275cd36d153aec08a0
Reviewed-on: https://bluetooth-review.googlesource.com/c/bluetooth/+/2020
diff --git a/rust/bt-gatt/src/test_utils.rs b/rust/bt-gatt/src/test_utils.rs
index 3397799..148e666 100644
--- a/rust/bt-gatt/src/test_utils.rs
+++ b/rust/bt-gatt/src/test_utils.rs
@@ -7,7 +7,7 @@
use futures::future::{ready, Ready};
use futures::stream::{FusedStream, Stream};
use parking_lot::Mutex;
-use std::collections::{HashMap, VecDeque};
+use std::collections::{HashMap, HashSet, VecDeque};
use std::sync::Arc;
use std::task::{Poll, Waker};
@@ -18,7 +18,9 @@
use crate::periodic_advertising::{PeriodicAdvertising, SyncReport};
use crate::pii::GetPeerAddr;
-use crate::server::{self, LocalService, ReadResponder, ServiceDefinition, WriteResponder};
+use crate::server::{
+ self, LocalService, NotificationType, ReadResponder, ServiceDefinition, WriteResponder,
+};
use crate::{types::*, GattTypes, ServerTypes};
#[derive(Default)]
@@ -427,6 +429,8 @@
service_senders:
HashMap<server::ServiceId, UnboundedSender<Result<server::ServiceEvent<FakeTypes>>>>,
sender: UnboundedSender<FakeServerEvent>,
+ notification_peers: HashSet<PeerId>,
+ indication_peers: HashSet<PeerId>,
}
#[derive(Clone, Debug)]
@@ -454,6 +458,8 @@
services: Default::default(),
service_senders: Default::default(),
sender,
+ notification_peers: HashSet::new(),
+ indication_peers: HashSet::new(),
})),
},
receiver,
@@ -511,6 +517,38 @@
}))
.unwrap();
}
+
+ pub fn incoming_client_configuration(
+ &self,
+ peer_id: PeerId,
+ id: server::ServiceId,
+ handle: Handle,
+ notification_type: NotificationType,
+ ) {
+ let mut inner = self.inner.lock();
+ match notification_type {
+ NotificationType::Notify => {
+ inner.notification_peers.insert(peer_id);
+ }
+ NotificationType::Indicate => {
+ inner.indication_peers.insert(peer_id);
+ }
+ NotificationType::Disable => {
+ inner.notification_peers.remove(&peer_id);
+ inner.indication_peers.remove(&peer_id);
+ }
+ }
+ inner
+ .service_senders
+ .get(&id)
+ .unwrap()
+ .unbounded_send(Ok(server::ServiceEvent::ClientConfiguration {
+ peer_id,
+ handle,
+ notification_type,
+ }))
+ .unwrap();
+ }
}
pub struct FakeLocalService {
@@ -544,16 +582,24 @@
}
fn notify(&self, characteristic: &Handle, data: &[u8], peers: &[PeerId]) {
- self.inner
- .lock()
- .sender
- .unbounded_send(FakeServerEvent::Notified {
- service_id: self.id,
- handle: *characteristic,
- value: data.into(),
- peers: peers.into(),
- })
- .unwrap();
+ let inner = self.inner.lock();
+ let peers_to_notify: HashSet<_> = if peers.is_empty() {
+ inner.notification_peers.clone()
+ } else {
+ peers.iter().filter(|p| inner.notification_peers.contains(p)).cloned().collect()
+ };
+
+ if !peers_to_notify.is_empty() {
+ inner
+ .sender
+ .unbounded_send(FakeServerEvent::Notified {
+ service_id: self.id,
+ handle: *characteristic,
+ value: data.into(),
+ peers: peers_to_notify.into_iter().collect(),
+ })
+ .unwrap();
+ }
}
fn indicate(
@@ -563,17 +609,25 @@
peers: &[PeerId],
) -> <FakeTypes as ServerTypes>::IndicateConfirmationStream {
let (sender, receiver) = futures::channel::mpsc::unbounded();
- self.inner
- .lock()
- .sender
- .unbounded_send(FakeServerEvent::Indicated {
- service_id: self.id,
- handle: *characteristic,
- value: data.into(),
- peers: peers.into(),
- confirmations: sender,
- })
- .unwrap();
+ let inner = self.inner.lock();
+ let peers_to_indicate: HashSet<_> = if peers.is_empty() {
+ inner.indication_peers.clone()
+ } else {
+ peers.iter().filter(|p| inner.indication_peers.contains(p)).cloned().collect()
+ };
+
+ if !peers_to_indicate.is_empty() {
+ inner
+ .sender
+ .unbounded_send(FakeServerEvent::Indicated {
+ service_id: self.id,
+ handle: *characteristic,
+ value: data.into(),
+ peers: peers_to_indicate.into_iter().collect(),
+ confirmations: sender,
+ })
+ .unwrap();
+ }
receiver
}
}
diff --git a/rust/bt-gatt/src/types.rs b/rust/bt-gatt/src/types.rs
index c9b72d3..c65d05b 100644
--- a/rust/bt-gatt/src/types.rs
+++ b/rust/bt-gatt/src/types.rs
@@ -15,6 +15,7 @@
WriteNotPermitted = 3,
InvalidPdu = 4,
InsufficientAuthentication = 5,
+ RequestNotSupported = 6,
InvalidOffset = 7,
InsufficientAuthorization = 8,
InsufficientEncryptionKeySize = 12,
@@ -81,6 +82,7 @@
3 => Ok(Self::WriteNotPermitted),
4 => Ok(Self::InvalidPdu),
5 => Ok(Self::InsufficientAuthentication),
+ 6 => Ok(Self::RequestNotSupported),
7 => Ok(Self::InvalidOffset),
8 => Ok(Self::InsufficientAuthorization),
12 => Ok(Self::InsufficientEncryptionKeySize),
diff --git a/rust/bt-pacs/Cargo.toml b/rust/bt-pacs/Cargo.toml
index dd975d8..c260971 100644
--- a/rust/bt-pacs/Cargo.toml
+++ b/rust/bt-pacs/Cargo.toml
@@ -14,6 +14,7 @@
futures.workspace = true
pin-project.workspace = true
thiserror.workspace = true
+parking_lot.workspace = true
[dev-dependencies]
bt-gatt = { workspace = true, features = ["test-utils"] }
diff --git a/rust/bt-pacs/src/lib.rs b/rust/bt-pacs/src/lib.rs
index fa05dc2..7d65629 100644
--- a/rust/bt-pacs/src/lib.rs
+++ b/rust/bt-pacs/src/lib.rs
@@ -303,6 +303,17 @@
Available(HashSet<ContextType>),
}
+impl FromIterator<ContextType> for AvailableContexts {
+ fn from_iter<I: IntoIterator<Item = ContextType>>(iter: I) -> Self {
+ let types_: HashSet<_> = iter.into_iter().collect();
+ if types_.len() == 0 {
+ AvailableContexts::NotAvailable
+ } else {
+ AvailableContexts::Available(types_)
+ }
+ }
+}
+
impl Decodable for AvailableContexts {
type Error = bt_common::packet_encoding::Error;
diff --git a/rust/bt-pacs/src/server.rs b/rust/bt-pacs/src/server.rs
index c3bfa1a..8987e17 100644
--- a/rust/bt-pacs/src/server.rs
+++ b/rust/bt-pacs/src/server.rs
@@ -34,6 +34,7 @@
use bt_gatt::server::{ReadResponder, ServiceDefinition, WriteResponder};
use bt_gatt::types::{GattError, Handle};
use bt_gatt::Server as _;
+
use futures::task::{Poll, Waker};
use futures::{Future, Stream};
use pin_project::pin_project;
@@ -41,8 +42,8 @@
use thiserror::Error;
use crate::{
- AudioLocations, AvailableAudioContexts, PacRecord, SinkAudioLocations, SourceAudioLocations,
- SupportedAudioContexts,
+ AudioLocations, AvailableAudioContexts, AvailableContexts, PacRecord, SinkAudioLocations,
+ SourceAudioLocations, SupportedAudioContexts,
};
pub(crate) mod types;
@@ -336,6 +337,38 @@
fn is_sink_locations_handle(&self, handle: Handle) -> bool {
self.sink_audio_locations.as_ref().map_or(false, |locations| locations.handle == handle)
}
+
+ /// Updates available audio contexts by overwritting the previously
+ /// set available audio contexts value. If a context type is not
+ /// part of the supported audio contexts, it will be ignored and
+ /// the resulting available audio contexts would not contain that
+ /// particular type.
+ /// Returns the reference to the resulting available audio context.
+ pub fn update_available(&mut self, available: AudioContexts) -> &AvailableAudioContexts {
+ let supported = &self.supported_audio_contexts;
+ let new_sink =
+ AvailableContexts::from_iter(available.sink.intersection(&supported.sink).cloned());
+ let new_source =
+ AvailableContexts::from_iter(available.source.intersection(&supported.source).cloned());
+
+ let changed = self.available_audio_contexts.sink != new_sink
+ || self.available_audio_contexts.source != new_source;
+
+ self.available_audio_contexts.sink = new_sink;
+ self.available_audio_contexts.source = new_source;
+
+ if !changed {
+ return &self.available_audio_contexts;
+ }
+ if let LocalServiceState::Published { service, .. } = &self.local_service {
+ service.notify(
+ &AVAILABLE_AUDIO_CONTEXTS_HANDLE,
+ &self.available_audio_contexts.into_char_value(),
+ &[],
+ );
+ }
+ &self.available_audio_contexts
+ }
}
impl<T: bt_gatt::ServerTypes> Stream for Server<T> {
@@ -381,15 +414,13 @@
responder.respond(&value[offset..]);
continue;
}
- // TODO(b/309015071): support optional writes.
+ // TODO(b/335293412): support optional writes.
Write { responder, .. } => {
responder.error(GattError::WriteNotPermitted);
continue;
}
- // TODO(b/309015071): implement notify since it's mandatory.
- ClientConfiguration { .. } => {
- unimplemented!();
- }
+ // We don't do anything complex with ClientConfiguration for now.
+ ClientConfiguration { .. } => continue,
_ => continue,
}
}
@@ -405,10 +436,11 @@
use bt_common::generic_audio::AudioLocation;
use bt_common::PeerId;
use bt_gatt::server;
+ use bt_gatt::server::NotificationType;
use bt_gatt::test_utils::{FakeServer, FakeServerEvent, FakeTypes};
use bt_gatt::types::ServiceKind;
- use futures::{FutureExt, StreamExt};
+ use futures::{FutureExt, StreamExt};
use std::collections::HashSet;
use crate::AvailableContexts;
@@ -417,7 +449,7 @@
// - 1 sink and 1 source PAC characteristics
// - sink audio locations
fn default_server_builder() -> ServerBuilder {
- let builder = ServerBuilder::new()
+ ServerBuilder::new()
.add_sink(vec![PacRecord {
codec_id: CodecId::Assigned(CodingFormat::ALawLog),
codec_specific_capabilities: vec![CodecCapability::SupportedFrameDurations(
@@ -444,8 +476,7 @@
metadata: vec![],
},
])
- .add_source(vec![]);
- builder
+ .add_source(vec![])
}
#[test]
@@ -634,4 +665,106 @@
assert_eq!(handle, available_char_handle);
assert_eq!(value.expect("should be ok"), vec![0x04, 0x00, 0x00, 0x00]);
}
+
+ #[test]
+ fn update_available() {
+ let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
+
+ let mut server = default_server_builder()
+ .build::<FakeTypes>(
+ AudioContexts::new(
+ HashSet::from([ContextType::Media, ContextType::Alerts]),
+ HashSet::from([ContextType::Media]),
+ ),
+ AudioContexts::new(HashSet::from([ContextType::Media]), HashSet::new()),
+ )
+ .unwrap();
+
+ let (fake_gatt_server, mut event_receiver) = FakeServer::new();
+ let _ = server.publish(fake_gatt_server.clone()).expect("should succeed");
+
+ // Server should poll on local server state.
+ let Poll::Pending = server.next().poll_unpin(&mut noop_cx) else {
+ panic!("Should be pending");
+ };
+
+ // Should receive event that GATT service was published.
+ let mut event_stream = event_receiver.next();
+ let Poll::Ready(Some(FakeServerEvent::Published { id, .. })) =
+ event_stream.poll_unpin(&mut noop_cx)
+ else {
+ panic!("Should be published");
+ };
+
+ // Mimic a remote peer de-registering for notifications.
+ fake_gatt_server.incoming_client_configuration(
+ PeerId(1),
+ id,
+ AVAILABLE_AUDIO_CONTEXTS_HANDLE,
+ NotificationType::Disable,
+ );
+
+ // Update available context information.
+ let updated = server.update_available(AudioContexts {
+ sink: HashSet::from([ContextType::Alerts]),
+ source: HashSet::from([ContextType::Media, ContextType::Conversational]),
+ });
+ assert_eq!(
+ updated.sink,
+ AvailableContexts::Available(HashSet::from([ContextType::Alerts]))
+ );
+ assert_eq!(
+ updated.source,
+ AvailableContexts::Available(HashSet::from([ContextType::Media]))
+ );
+ // No event received since we haven't registered for notifications.
+ let Poll::Pending = event_stream.poll_unpin(&mut noop_cx) else {
+ panic!("No peer registered for notifications");
+ };
+
+ // Mimic a remote peer registering for notifications.
+ fake_gatt_server.incoming_client_configuration(
+ PeerId(1),
+ id,
+ AVAILABLE_AUDIO_CONTEXTS_HANDLE,
+ NotificationType::Notify,
+ );
+ // Poll on server to process client config request.
+ let Poll::Pending = server.next().poll_unpin(&mut noop_cx) else {
+ panic!("Should be pending");
+ };
+ // Update available context information.
+ let updated = server.update_available(AudioContexts {
+ sink: HashSet::from([ContextType::Media, ContextType::Alerts]),
+ source: HashSet::from([ContextType::Media]),
+ });
+ assert_eq!(
+ updated.sink,
+ AvailableContexts::Available(HashSet::from([ContextType::Media, ContextType::Alerts]))
+ );
+ assert_eq!(
+ updated.source,
+ AvailableContexts::Available(HashSet::from([ContextType::Media]))
+ );
+ // Should have notified the updated value.
+ let Poll::Ready(Some(FakeServerEvent::Notified { service_id, handle, value, peers })) =
+ event_stream.poll_unpin(&mut noop_cx)
+ else {
+ panic!("Should be notified");
+ };
+ assert_eq!(service_id, id);
+ assert_eq!(handle, AVAILABLE_AUDIO_CONTEXTS_HANDLE);
+ assert_eq!(value, vec![0x04, 0x04, 0x04, 0x00]);
+ assert_eq!(peers, vec![PeerId(1)]);
+
+ // Update available with the same context information.
+ let _ = server.update_available(AudioContexts {
+ sink: HashSet::from([ContextType::Media, ContextType::Alerts]),
+ source: HashSet::from([ContextType::Media, ContextType::Conversational]),
+ });
+ // No event received since available context hasn't changed.
+ let Poll::Pending = event_stream.poll_unpin(&mut noop_cx) else {
+ panic!("Should not be notified");
+ };
+ }
}