rust/bt-pacs: Implement notification for available audio contexts

Available audio contexts characteristic should support notify property:
  - Add a method on PACS server to allow the clients of the library to
    update the available audio contexts.
  - Notify the change through the GATT local service.

Bug: b/309015071
Test: cargo test
Change-Id: Ib0b41f152dcb8148410ac7275cd36d153aec08a0
Reviewed-on: https://bluetooth-review.googlesource.com/c/bluetooth/+/2020
diff --git a/rust/bt-gatt/src/test_utils.rs b/rust/bt-gatt/src/test_utils.rs
index 3397799..148e666 100644
--- a/rust/bt-gatt/src/test_utils.rs
+++ b/rust/bt-gatt/src/test_utils.rs
@@ -7,7 +7,7 @@
 use futures::future::{ready, Ready};
 use futures::stream::{FusedStream, Stream};
 use parking_lot::Mutex;
-use std::collections::{HashMap, VecDeque};
+use std::collections::{HashMap, HashSet, VecDeque};
 use std::sync::Arc;
 use std::task::{Poll, Waker};
 
@@ -18,7 +18,9 @@
 
 use crate::periodic_advertising::{PeriodicAdvertising, SyncReport};
 use crate::pii::GetPeerAddr;
-use crate::server::{self, LocalService, ReadResponder, ServiceDefinition, WriteResponder};
+use crate::server::{
+    self, LocalService, NotificationType, ReadResponder, ServiceDefinition, WriteResponder,
+};
 use crate::{types::*, GattTypes, ServerTypes};
 
 #[derive(Default)]
@@ -427,6 +429,8 @@
     service_senders:
         HashMap<server::ServiceId, UnboundedSender<Result<server::ServiceEvent<FakeTypes>>>>,
     sender: UnboundedSender<FakeServerEvent>,
+    notification_peers: HashSet<PeerId>,
+    indication_peers: HashSet<PeerId>,
 }
 
 #[derive(Clone, Debug)]
@@ -454,6 +458,8 @@
                     services: Default::default(),
                     service_senders: Default::default(),
                     sender,
+                    notification_peers: HashSet::new(),
+                    indication_peers: HashSet::new(),
                 })),
             },
             receiver,
@@ -511,6 +517,38 @@
             }))
             .unwrap();
     }
+
+    pub fn incoming_client_configuration(
+        &self,
+        peer_id: PeerId,
+        id: server::ServiceId,
+        handle: Handle,
+        notification_type: NotificationType,
+    ) {
+        let mut inner = self.inner.lock();
+        match notification_type {
+            NotificationType::Notify => {
+                inner.notification_peers.insert(peer_id);
+            }
+            NotificationType::Indicate => {
+                inner.indication_peers.insert(peer_id);
+            }
+            NotificationType::Disable => {
+                inner.notification_peers.remove(&peer_id);
+                inner.indication_peers.remove(&peer_id);
+            }
+        }
+        inner
+            .service_senders
+            .get(&id)
+            .unwrap()
+            .unbounded_send(Ok(server::ServiceEvent::ClientConfiguration {
+                peer_id,
+                handle,
+                notification_type,
+            }))
+            .unwrap();
+    }
 }
 
 pub struct FakeLocalService {
@@ -544,16 +582,24 @@
     }
 
     fn notify(&self, characteristic: &Handle, data: &[u8], peers: &[PeerId]) {
-        self.inner
-            .lock()
-            .sender
-            .unbounded_send(FakeServerEvent::Notified {
-                service_id: self.id,
-                handle: *characteristic,
-                value: data.into(),
-                peers: peers.into(),
-            })
-            .unwrap();
+        let inner = self.inner.lock();
+        let peers_to_notify: HashSet<_> = if peers.is_empty() {
+            inner.notification_peers.clone()
+        } else {
+            peers.iter().filter(|p| inner.notification_peers.contains(p)).cloned().collect()
+        };
+
+        if !peers_to_notify.is_empty() {
+            inner
+                .sender
+                .unbounded_send(FakeServerEvent::Notified {
+                    service_id: self.id,
+                    handle: *characteristic,
+                    value: data.into(),
+                    peers: peers_to_notify.into_iter().collect(),
+                })
+                .unwrap();
+        }
     }
 
     fn indicate(
@@ -563,17 +609,25 @@
         peers: &[PeerId],
     ) -> <FakeTypes as ServerTypes>::IndicateConfirmationStream {
         let (sender, receiver) = futures::channel::mpsc::unbounded();
-        self.inner
-            .lock()
-            .sender
-            .unbounded_send(FakeServerEvent::Indicated {
-                service_id: self.id,
-                handle: *characteristic,
-                value: data.into(),
-                peers: peers.into(),
-                confirmations: sender,
-            })
-            .unwrap();
+        let inner = self.inner.lock();
+        let peers_to_indicate: HashSet<_> = if peers.is_empty() {
+            inner.indication_peers.clone()
+        } else {
+            peers.iter().filter(|p| inner.indication_peers.contains(p)).cloned().collect()
+        };
+
+        if !peers_to_indicate.is_empty() {
+            inner
+                .sender
+                .unbounded_send(FakeServerEvent::Indicated {
+                    service_id: self.id,
+                    handle: *characteristic,
+                    value: data.into(),
+                    peers: peers_to_indicate.into_iter().collect(),
+                    confirmations: sender,
+                })
+                .unwrap();
+        }
         receiver
     }
 }
diff --git a/rust/bt-gatt/src/types.rs b/rust/bt-gatt/src/types.rs
index c9b72d3..c65d05b 100644
--- a/rust/bt-gatt/src/types.rs
+++ b/rust/bt-gatt/src/types.rs
@@ -15,6 +15,7 @@
     WriteNotPermitted = 3,
     InvalidPdu = 4,
     InsufficientAuthentication = 5,
+    RequestNotSupported = 6,
     InvalidOffset = 7,
     InsufficientAuthorization = 8,
     InsufficientEncryptionKeySize = 12,
@@ -81,6 +82,7 @@
             3 => Ok(Self::WriteNotPermitted),
             4 => Ok(Self::InvalidPdu),
             5 => Ok(Self::InsufficientAuthentication),
+            6 => Ok(Self::RequestNotSupported),
             7 => Ok(Self::InvalidOffset),
             8 => Ok(Self::InsufficientAuthorization),
             12 => Ok(Self::InsufficientEncryptionKeySize),
diff --git a/rust/bt-pacs/Cargo.toml b/rust/bt-pacs/Cargo.toml
index dd975d8..c260971 100644
--- a/rust/bt-pacs/Cargo.toml
+++ b/rust/bt-pacs/Cargo.toml
@@ -14,6 +14,7 @@
 futures.workspace = true
 pin-project.workspace = true
 thiserror.workspace = true
+parking_lot.workspace = true
 
 [dev-dependencies]
 bt-gatt = { workspace = true, features = ["test-utils"] }
diff --git a/rust/bt-pacs/src/lib.rs b/rust/bt-pacs/src/lib.rs
index fa05dc2..7d65629 100644
--- a/rust/bt-pacs/src/lib.rs
+++ b/rust/bt-pacs/src/lib.rs
@@ -303,6 +303,17 @@
     Available(HashSet<ContextType>),
 }
 
+impl FromIterator<ContextType> for AvailableContexts {
+    fn from_iter<I: IntoIterator<Item = ContextType>>(iter: I) -> Self {
+        let types_: HashSet<_> = iter.into_iter().collect();
+        if types_.len() == 0 {
+            AvailableContexts::NotAvailable
+        } else {
+            AvailableContexts::Available(types_)
+        }
+    }
+}
+
 impl Decodable for AvailableContexts {
     type Error = bt_common::packet_encoding::Error;
 
diff --git a/rust/bt-pacs/src/server.rs b/rust/bt-pacs/src/server.rs
index c3bfa1a..8987e17 100644
--- a/rust/bt-pacs/src/server.rs
+++ b/rust/bt-pacs/src/server.rs
@@ -34,6 +34,7 @@
 use bt_gatt::server::{ReadResponder, ServiceDefinition, WriteResponder};
 use bt_gatt::types::{GattError, Handle};
 use bt_gatt::Server as _;
+
 use futures::task::{Poll, Waker};
 use futures::{Future, Stream};
 use pin_project::pin_project;
@@ -41,8 +42,8 @@
 use thiserror::Error;
 
 use crate::{
-    AudioLocations, AvailableAudioContexts, PacRecord, SinkAudioLocations, SourceAudioLocations,
-    SupportedAudioContexts,
+    AudioLocations, AvailableAudioContexts, AvailableContexts, PacRecord, SinkAudioLocations,
+    SourceAudioLocations, SupportedAudioContexts,
 };
 
 pub(crate) mod types;
@@ -336,6 +337,38 @@
     fn is_sink_locations_handle(&self, handle: Handle) -> bool {
         self.sink_audio_locations.as_ref().map_or(false, |locations| locations.handle == handle)
     }
+
+    /// Updates available audio contexts by overwritting the previously
+    /// set available audio contexts value. If a context type is not
+    /// part of the supported audio contexts, it will be ignored and
+    /// the resulting available audio contexts would not contain that
+    /// particular type.
+    /// Returns the reference to the resulting available audio context.
+    pub fn update_available(&mut self, available: AudioContexts) -> &AvailableAudioContexts {
+        let supported = &self.supported_audio_contexts;
+        let new_sink =
+            AvailableContexts::from_iter(available.sink.intersection(&supported.sink).cloned());
+        let new_source =
+            AvailableContexts::from_iter(available.source.intersection(&supported.source).cloned());
+
+        let changed = self.available_audio_contexts.sink != new_sink
+            || self.available_audio_contexts.source != new_source;
+
+        self.available_audio_contexts.sink = new_sink;
+        self.available_audio_contexts.source = new_source;
+
+        if !changed {
+            return &self.available_audio_contexts;
+        }
+        if let LocalServiceState::Published { service, .. } = &self.local_service {
+            service.notify(
+                &AVAILABLE_AUDIO_CONTEXTS_HANDLE,
+                &self.available_audio_contexts.into_char_value(),
+                &[],
+            );
+        }
+        &self.available_audio_contexts
+    }
 }
 
 impl<T: bt_gatt::ServerTypes> Stream for Server<T> {
@@ -381,15 +414,13 @@
                     responder.respond(&value[offset..]);
                     continue;
                 }
-                // TODO(b/309015071): support optional writes.
+                // TODO(b/335293412): support optional writes.
                 Write { responder, .. } => {
                     responder.error(GattError::WriteNotPermitted);
                     continue;
                 }
-                // TODO(b/309015071): implement notify since it's mandatory.
-                ClientConfiguration { .. } => {
-                    unimplemented!();
-                }
+                // We don't do anything complex with ClientConfiguration for now.
+                ClientConfiguration { .. } => continue,
                 _ => continue,
             }
         }
@@ -405,10 +436,11 @@
     use bt_common::generic_audio::AudioLocation;
     use bt_common::PeerId;
     use bt_gatt::server;
+    use bt_gatt::server::NotificationType;
     use bt_gatt::test_utils::{FakeServer, FakeServerEvent, FakeTypes};
     use bt_gatt::types::ServiceKind;
-    use futures::{FutureExt, StreamExt};
 
+    use futures::{FutureExt, StreamExt};
     use std::collections::HashSet;
 
     use crate::AvailableContexts;
@@ -417,7 +449,7 @@
     // - 1 sink and 1 source PAC characteristics
     // - sink audio locations
     fn default_server_builder() -> ServerBuilder {
-        let builder = ServerBuilder::new()
+        ServerBuilder::new()
             .add_sink(vec![PacRecord {
                 codec_id: CodecId::Assigned(CodingFormat::ALawLog),
                 codec_specific_capabilities: vec![CodecCapability::SupportedFrameDurations(
@@ -444,8 +476,7 @@
                     metadata: vec![],
                 },
             ])
-            .add_source(vec![]);
-        builder
+            .add_source(vec![])
     }
 
     #[test]
@@ -634,4 +665,106 @@
         assert_eq!(handle, available_char_handle);
         assert_eq!(value.expect("should be ok"), vec![0x04, 0x00, 0x00, 0x00]);
     }
+
+    #[test]
+    fn update_available() {
+        let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
+
+        let mut server = default_server_builder()
+            .build::<FakeTypes>(
+                AudioContexts::new(
+                    HashSet::from([ContextType::Media, ContextType::Alerts]),
+                    HashSet::from([ContextType::Media]),
+                ),
+                AudioContexts::new(HashSet::from([ContextType::Media]), HashSet::new()),
+            )
+            .unwrap();
+
+        let (fake_gatt_server, mut event_receiver) = FakeServer::new();
+        let _ = server.publish(fake_gatt_server.clone()).expect("should succeed");
+
+        // Server should poll on local server state.
+        let Poll::Pending = server.next().poll_unpin(&mut noop_cx) else {
+            panic!("Should be pending");
+        };
+
+        // Should receive event that GATT service was published.
+        let mut event_stream = event_receiver.next();
+        let Poll::Ready(Some(FakeServerEvent::Published { id, .. })) =
+            event_stream.poll_unpin(&mut noop_cx)
+        else {
+            panic!("Should be published");
+        };
+
+        // Mimic a remote peer de-registering for notifications.
+        fake_gatt_server.incoming_client_configuration(
+            PeerId(1),
+            id,
+            AVAILABLE_AUDIO_CONTEXTS_HANDLE,
+            NotificationType::Disable,
+        );
+
+        // Update available context information.
+        let updated = server.update_available(AudioContexts {
+            sink: HashSet::from([ContextType::Alerts]),
+            source: HashSet::from([ContextType::Media, ContextType::Conversational]),
+        });
+        assert_eq!(
+            updated.sink,
+            AvailableContexts::Available(HashSet::from([ContextType::Alerts]))
+        );
+        assert_eq!(
+            updated.source,
+            AvailableContexts::Available(HashSet::from([ContextType::Media]))
+        );
+        // No event received since we haven't registered for notifications.
+        let Poll::Pending = event_stream.poll_unpin(&mut noop_cx) else {
+            panic!("No peer registered for notifications");
+        };
+
+        // Mimic a remote peer registering for notifications.
+        fake_gatt_server.incoming_client_configuration(
+            PeerId(1),
+            id,
+            AVAILABLE_AUDIO_CONTEXTS_HANDLE,
+            NotificationType::Notify,
+        );
+        // Poll on server to process client config request.
+        let Poll::Pending = server.next().poll_unpin(&mut noop_cx) else {
+            panic!("Should be pending");
+        };
+        // Update available context information.
+        let updated = server.update_available(AudioContexts {
+            sink: HashSet::from([ContextType::Media, ContextType::Alerts]),
+            source: HashSet::from([ContextType::Media]),
+        });
+        assert_eq!(
+            updated.sink,
+            AvailableContexts::Available(HashSet::from([ContextType::Media, ContextType::Alerts]))
+        );
+        assert_eq!(
+            updated.source,
+            AvailableContexts::Available(HashSet::from([ContextType::Media]))
+        );
+        // Should have notified the updated value.
+        let Poll::Ready(Some(FakeServerEvent::Notified { service_id, handle, value, peers })) =
+            event_stream.poll_unpin(&mut noop_cx)
+        else {
+            panic!("Should be notified");
+        };
+        assert_eq!(service_id, id);
+        assert_eq!(handle, AVAILABLE_AUDIO_CONTEXTS_HANDLE);
+        assert_eq!(value, vec![0x04, 0x04, 0x04, 0x00]);
+        assert_eq!(peers, vec![PeerId(1)]);
+
+        // Update available with the same context information.
+        let _ = server.update_available(AudioContexts {
+            sink: HashSet::from([ContextType::Media, ContextType::Alerts]),
+            source: HashSet::from([ContextType::Media, ContextType::Conversational]),
+        });
+        // No event received since available context hasn't changed.
+        let Poll::Pending = event_stream.poll_unpin(&mut noop_cx) else {
+            panic!("Should not be notified");
+        };
+    }
 }