rust/bt-battery: Subscribe to Battery Level characteristic notifications

- The battery level characteristic is a mandatory characteristic
  published by the battery service. Support for notifications on
  this characteristic are optional but commonly provided as it is
  the main mechanism for receiving "push" style updates of the
  battery level.
- Subscribe to notifications on the battery level characteristic and
  provide updates as a stream of battery events.

Test: cargo test

Change-Id: Ic0caa8d8a3ae7425af829f157237a84d69a2f436
Reviewed-on: https://bluetooth-review.git.corp.google.com/c/bluetooth/+/1700
Reviewed-by: Dayeong Lee <dayeonglee@google.com>
diff --git a/rust/bt-battery/src/error.rs b/rust/bt-battery/src/error.rs
index 0db99af..2998a5e 100644
--- a/rust/bt-battery/src/error.rs
+++ b/rust/bt-battery/src/error.rs
@@ -3,6 +3,7 @@
 // found in the LICENSE file.
 
 use bt_common::packet_encoding::Error as PacketError;
+use bt_gatt::types::Handle;
 use bt_gatt::types::{Error as GattLibraryError, GattError};
 use thiserror::Error;
 
@@ -24,7 +25,7 @@
     Packet(#[from] PacketError),
 
     #[error("Malformed service on peer: {0}")]
-    Service(ServiceError),
+    Service(#[from] ServiceError),
 
     #[error("Generic failure: {0}")]
     Generic(String),
@@ -38,4 +39,7 @@
 
     #[error("Notification streams unexpectedly terminated")]
     NotificationStreamClosed,
+
+    #[error("Notification received for an unsupported handle: {0:?}")]
+    UnsupportedHandle(Handle),
 }
diff --git a/rust/bt-battery/src/monitor/client.rs b/rust/bt-battery/src/monitor/client.rs
index 70ce184..04523d6 100644
--- a/rust/bt-battery/src/monitor/client.rs
+++ b/rust/bt-battery/src/monitor/client.rs
@@ -3,17 +3,103 @@
 // found in the LICENSE file.
 
 use bt_common::packet_encoding::Decodable;
-use bt_gatt::client::ServiceCharacteristic;
-use bt_gatt::types::Handle;
+use bt_gatt::client::{CharacteristicNotification, PeerService, ServiceCharacteristic};
+use bt_gatt::types::{CharacteristicProperty, Error as GattLibraryError, Handle};
 use bt_gatt::GattTypes;
+use futures::stream::{BoxStream, FusedStream, SelectAll, Stream, StreamExt};
+use std::task::Poll;
 
 use crate::error::{Error, ServiceError};
 use crate::types::{BatteryLevel, BATTERY_LEVEL_UUID, READ_CHARACTERISTIC_BUFFER_SIZE};
 
-// TODO(aniramakri): Implement this.
-pub struct BatteryMonitorEventStream {}
+/// Represents the termination status of a Stream.
+#[derive(Clone, Copy, Debug, PartialEq, Default)]
+enum TerminatedState {
+    #[default]
+    NotTerminated,
+    /// Stream termination is in progress. We have returned an Err, but not a
+    /// None yet.
+    Terminating,
+    /// We have returned a None and therefore the stream is terminated. It
+    /// should not be polled again.
+    Terminated,
+}
+
+/// A stream of GATT notifications received from the peer's battery service
+/// characteristics. This stream must be polled in order to receive updates
+/// about the battery service.
+pub struct BatteryMonitorEventStream {
+    /// Notification streams received from the characteristics.
+    notification_streams:
+        SelectAll<BoxStream<'static, Result<CharacteristicNotification, GattLibraryError>>>,
+    /// The current termination status of the stream.
+    /// `TerminatedState::Terminated` when the stream is exhausted and
+    /// should not be polled thereafter.
+    terminated: TerminatedState,
+}
+
+impl BatteryMonitorEventStream {
+    fn new(
+        notification_streams: SelectAll<
+            BoxStream<'static, Result<CharacteristicNotification, GattLibraryError>>,
+        >,
+    ) -> Self {
+        Self { notification_streams, terminated: TerminatedState::default() }
+    }
+}
+
+impl Stream for BatteryMonitorEventStream {
+    // TODO(b/335259516): Update return type to accommodate other characteristics.
+    type Item = Result<BatteryLevel, Error>;
+
+    fn poll_next(
+        mut self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        match &self.terminated {
+            TerminatedState::NotTerminated => {}
+            TerminatedState::Terminating => {
+                self.terminated = TerminatedState::Terminated;
+                return Poll::Ready(None);
+            }
+            TerminatedState::Terminated => {
+                panic!("polling terminated stream");
+            }
+        }
+
+        let result = futures::ready!(self.notification_streams.poll_next_unpin(cx));
+        match result {
+            Some(Ok(notification)) => {
+                let battery_level_result =
+                    BatteryLevel::decode(&notification.value[..]).map(|r| r.0).map_err(Into::into);
+                Poll::Ready(Some(battery_level_result))
+            }
+            Some(Err(e)) => {
+                // GATT Errors are not fatal and will be relayed to the stream.
+                // All other errors are considered fatal and will result in stream termination.
+                if !matches!(e, GattLibraryError::Gatt(_)) {
+                    self.terminated = TerminatedState::Terminating;
+                }
+                Poll::Ready(Some(Err(e.into())))
+            }
+            None => {
+                self.terminated = TerminatedState::Terminating;
+                Poll::Ready(Some(Err(Error::Service(ServiceError::NotificationStreamClosed))))
+            }
+        }
+    }
+}
+
+impl FusedStream for BatteryMonitorEventStream {
+    fn is_terminated(&self) -> bool {
+        self.terminated == TerminatedState::Terminated
+    }
+}
 
 /// Implements the Battery Service client role.
+/// Reads the relevant characteristics on a GATT connection to a remote peer's
+/// Battery Service and provides a mechanism for subscribing to and receiving
+/// notifications on the service.
 pub struct BatteryMonitorClient<T: GattTypes> {
     /// Represents the underlying GATT LE connection. Kept alive to maintain the
     /// connection to the peer.
@@ -23,10 +109,14 @@
     /// GATT Handles associated with the peer's one or more Battery Level
     /// characteristics. The first `Handle` in this list is expected to be
     /// the "primary" one.
+    // TODO(b/335259516): Save Handles for optional characteristics that are discovered.
     battery_level_handles: Vec<Handle>,
-    // TODO(b/335259516): Save Handles for additional characteristics that are discovered.
     /// The current battery level reported by the peer's battery server.
     battery_level: BatteryLevel,
+    /// Collection of streams containing notifications from the peer's battery
+    /// characteristics.
+    notification_streams:
+        Option<SelectAll<BoxStream<'static, Result<CharacteristicNotification, GattLibraryError>>>>,
 }
 
 impl<T: GattTypes> BatteryMonitorClient<T> {
@@ -47,32 +137,53 @@
         // It is valid to have multiple Battery Level Characteristics. If multiple
         // exist, the primary (main) characteristic has a Description field of
         // "main". For now, we assume the first such characteristic is the
-        // primary. See BAS 1.1 Section 3.1.2.1. TODO(b/335246946): Check for
-        // Characteristic Presentation Format descriptor if
+        // primary. See BAS 1.1 Section 3.1.2.1.
+        // TODO(b/335246946): Check for Characteristic Presentation Format descriptor if
         // multiple characteristics are present. Use this to infer the "primary".
+        let primary_battery_level_characteristic =
+            battery_level_characteristics.first().expect("nonempty");
         let battery_level_handles: Vec<Handle> =
             battery_level_characteristics.iter().map(|c| *c.handle()).collect();
-        // Get the current battery level of the primary characteristic.
+        // Get the current battery level of the primary Battery Level characteristic.
         let (battery_level, _decoded_bytes) = {
             let mut buf = vec![0; READ_CHARACTERISTIC_BUFFER_SIZE];
-            let read_bytes =
-                battery_level_characteristics.first().unwrap().read(&mut buf[..]).await?;
+            let read_bytes = primary_battery_level_characteristic.read(&mut buf[..]).await?;
             BatteryLevel::decode(&buf[0..read_bytes])?
         };
 
-        // TODO(aniramakri): Subscribe to notifications on the battery level
-        // characteristic and save as a stream of events.
+        // Subscribe to notifications on the battery level characteristic if it
+        // is supported.
+        let mut notification_streams = None;
+        if primary_battery_level_characteristic
+            .characteristic()
+            .supports_property(&CharacteristicProperty::Notify)
+        {
+            let mut streams = SelectAll::new();
+            streams
+                .push(gatt_client.subscribe(primary_battery_level_characteristic.handle()).boxed());
+            notification_streams = Some(streams);
+        }
+        // TODO(b/335259516): Subscribe to notifications from optional characteristics
+        // if they are present.
 
-        Ok(Self { _client, gatt_client, battery_level_handles, battery_level })
+        Ok(Self {
+            _client,
+            gatt_client,
+            battery_level_handles,
+            battery_level,
+            notification_streams,
+        })
     }
 
-    /// Returns a stream of battery events.
+    /// Returns a stream of battery events that represent notifications on the
+    /// remote peer's Battery Service.
     /// The returned Stream _must_ be polled in order to receive the relevant
-    /// notification and indications on the battery service.
-    /// This method should only be called once.
-    /// Returns Some<T> if the battery stream is available, None otherwise.
+    /// notification and indications.
+    /// This method should only be called once. Subsequent calls will return
+    /// None. Returns Some<T> if a stream of notifications is available,
+    /// None otherwise.
     pub fn take_event_stream(&mut self) -> Option<BatteryMonitorEventStream> {
-        todo!("Implement battery event stream")
+        self.notification_streams.take().map(|s| BatteryMonitorEventStream::new(s))
     }
 
     #[cfg(test)]
@@ -89,10 +200,9 @@
     use bt_common::Uuid;
     use bt_gatt::test_utils::{FakeClient, FakePeerService, FakeTypes};
     use bt_gatt::types::{
-        AttributePermissions, Characteristic, CharacteristicProperties, CharacteristicProperty,
+        AttributePermissions, Characteristic, CharacteristicProperties, GattError,
     };
     use futures::{pin_mut, FutureExt};
-    use std::task::Poll;
 
     pub(crate) const BATTERY_LEVEL_HANDLE: Handle = Handle(0x1);
     pub(crate) fn fake_battery_service(battery_level: u8) -> FakePeerService {
@@ -132,9 +242,39 @@
 
     #[test]
     fn create_client_and_read_battery_level_success() {
-        let battery_level = 20;
-        let (monitor, _fake_peer_service) = setup_client(battery_level);
-        assert_eq!(monitor.battery_level(), BatteryLevel(battery_level));
+        let initial_battery_level = 20;
+        let (monitor, _fake_peer_service) = setup_client(initial_battery_level);
+        assert_eq!(monitor.battery_level(), BatteryLevel(initial_battery_level));
+    }
+
+    #[test]
+    fn minimal_battery_service_has_no_stream() {
+        let mut fake_peer_service = FakePeerService::new();
+        fake_peer_service.add_characteristic(
+            Characteristic {
+                handle: BATTERY_LEVEL_HANDLE,
+                uuid: BATTERY_LEVEL_UUID,
+                properties: CharacteristicProperties(vec![
+                    CharacteristicProperty::Read, // Only read is mandatory
+                ]),
+                permissions: AttributePermissions::default(),
+                descriptors: vec![],
+            },
+            vec![10], // Initial battery level
+        );
+
+        let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
+        let create_result =
+            BatteryMonitorClient::<FakeTypes>::create(FakeClient::new(), fake_peer_service.clone());
+        pin_mut!(create_result);
+        let Poll::Ready(Ok(mut monitor)) = create_result.poll_unpin(&mut noop_cx) else {
+            panic!("Expected BatteryMonitorClient to be successfully created");
+        };
+        assert_eq!(monitor.battery_level(), BatteryLevel(10));
+
+        // No event stream should be available since the service does not
+        // support notifications on any characteristics.
+        assert!(monitor.take_event_stream().is_none());
     }
 
     #[test]
@@ -204,4 +344,104 @@
             panic!("Expected BatteryMonitorClient to be successfully created");
         };
     }
+
+    #[test]
+    fn battery_level_notifications() {
+        let initial_battery_level = 20;
+        let (mut monitor, fake_peer_service) = setup_client(initial_battery_level);
+
+        let mut notification_stream =
+            monitor.take_event_stream().expect("contains notification stream");
+        // Trying to grab it again should be handled gracefully and yield no stream.
+        assert!(monitor.take_event_stream().is_none());
+        let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
+        assert!(notification_stream.poll_next_unpin(&mut noop_cx).is_pending());
+
+        // Simulate a notification on the battery level characteristic.
+        let new_battery_level = 88;
+        let notification = CharacteristicNotification {
+            handle: BATTERY_LEVEL_HANDLE,
+            value: vec![new_battery_level],
+            maybe_truncated: false,
+        };
+        fake_peer_service.notify(&BATTERY_LEVEL_HANDLE, Ok(notification));
+        let Poll::Ready(Some(Ok(received_battery_level))) =
+            notification_stream.poll_next_unpin(&mut noop_cx)
+        else {
+            panic!("expected battery level change");
+        };
+        assert_eq!(received_battery_level, BatteryLevel(new_battery_level));
+
+        let new_battery_level2 = 76;
+        let notification2 = CharacteristicNotification {
+            handle: BATTERY_LEVEL_HANDLE,
+            value: vec![new_battery_level2],
+            maybe_truncated: false,
+        };
+        fake_peer_service.notify(&BATTERY_LEVEL_HANDLE, Ok(notification2));
+        let Poll::Ready(Some(Ok(received_battery_level2))) =
+            notification_stream.poll_next_unpin(&mut noop_cx)
+        else {
+            panic!("expected battery level change");
+        };
+        assert_eq!(received_battery_level2, BatteryLevel(new_battery_level2));
+
+        // A GATT error should still be propagated to the stream.
+        let error = GattError::UnlikelyError.into();
+        fake_peer_service.notify(&BATTERY_LEVEL_HANDLE, Err(error));
+        let Poll::Ready(Some(Err(Error::GattLibrary(_)))) =
+            notification_stream.poll_next_unpin(&mut noop_cx)
+        else {
+            panic!("expected GATT library error");
+        };
+        // Stream should still be active.
+        assert!(notification_stream.poll_next_unpin(&mut noop_cx).is_pending());
+        assert!(!notification_stream.is_terminated());
+    }
+
+    #[test]
+    fn notification_stream_error_terminates_event_stream() {
+        let (mut monitor, fake_peer_service) = setup_client(10);
+
+        let mut notification_stream =
+            monitor.take_event_stream().expect("contains notification stream");
+        let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
+        assert!(notification_stream.poll_next_unpin(&mut noop_cx).is_pending());
+
+        let error = GattLibraryError::Other("random error".into());
+        fake_peer_service.notify(&BATTERY_LEVEL_HANDLE, Err(error));
+        let Poll::Ready(Some(Err(Error::GattLibrary(_)))) =
+            notification_stream.poll_next_unpin(&mut noop_cx)
+        else {
+            panic!("expected GATT library error");
+        };
+        // The stream should be staged for shutdown since a fatal error was received.
+        let Poll::Ready(None) = notification_stream.poll_next_unpin(&mut noop_cx) else {
+            panic!("expected notification stream termination");
+        };
+        assert!(notification_stream.is_terminated());
+    }
+
+    #[test]
+    fn terminating_notification_stream_terminates_event_stream() {
+        let (mut monitor, fake_peer_service) = setup_client(10);
+
+        let mut notification_stream =
+            monitor.take_event_stream().expect("contains notification stream");
+        let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
+        assert!(notification_stream.poll_next_unpin(&mut noop_cx).is_pending());
+
+        fake_peer_service.clear_notifier(&BATTERY_LEVEL_HANDLE);
+        let Poll::Ready(Some(Err(Error::Service(_)))) =
+            notification_stream.poll_next_unpin(&mut noop_cx)
+        else {
+            panic!("expected service error");
+        };
+        // The stream should be staged for shutdown since there are no more active
+        // notification streams.
+        let Poll::Ready(None) = notification_stream.poll_next_unpin(&mut noop_cx) else {
+            panic!("expected notification stream termination");
+        };
+        assert!(notification_stream.is_terminated());
+    }
 }
diff --git a/rust/bt-gatt/src/client.rs b/rust/bt-gatt/src/client.rs
index 94af837..4f2ffb1 100644
--- a/rust/bt-gatt/src/client.rs
+++ b/rust/bt-gatt/src/client.rs
@@ -129,7 +129,7 @@
     /// or notifications. Errors are delivered through an Err item in the
     /// stream. This will often write to the Client Characteristic
     /// Configuration descriptor for the Characteristic subscribed to.
-    /// Updates sent from the peer wlil be delivered to the Stream returned.
+    /// Updates sent from the peer will be delivered to the Stream returned.
     fn subscribe(&self, handle: &Handle) -> T::NotificationStream;
 
     // TODO: Find included services
diff --git a/rust/bt-gatt/src/test_utils.rs b/rust/bt-gatt/src/test_utils.rs
index e155f8f..bc3d88f 100644
--- a/rust/bt-gatt/src/test_utils.rs
+++ b/rust/bt-gatt/src/test_utils.rs
@@ -64,6 +64,21 @@
         };
         char.1 = value;
     }
+
+    /// Sends a notification on the characteristic with the provided `handle`.
+    pub fn notify(&self, handle: &Handle, notification: Result<CharacteristicNotification>) {
+        let mut lock = self.inner.lock();
+        if let Some(notifier) = lock.notifiers.get_mut(handle) {
+            notifier.unbounded_send(notification).expect("can send notification");
+        }
+    }
+
+    /// Removes the notification subscription for the characteristic with the
+    /// provided `handle`.
+    pub fn clear_notifier(&self, handle: &Handle) {
+        let mut lock = self.inner.lock();
+        let _ = lock.notifiers.remove(handle);
+    }
 }
 
 impl crate::client::PeerService<FakeTypes> for FakePeerService {
diff --git a/rust/bt-gatt/src/types.rs b/rust/bt-gatt/src/types.rs
index fdc9cfd..2171070 100644
--- a/rust/bt-gatt/src/types.rs
+++ b/rust/bt-gatt/src/types.rs
@@ -339,7 +339,7 @@
 }
 
 /// A Characteristic on a Service. Each Characteristic has a declaration, value,
-/// and zero or more decriptors.
+/// and zero or more descriptors.
 #[derive(Clone, Debug)]
 pub struct Characteristic {
     pub handle: Handle,
@@ -355,6 +355,10 @@
         &self.properties
     }
 
+    pub fn supports_property(&self, property: &CharacteristicProperty) -> bool {
+        self.properties.0.contains(property)
+    }
+
     pub fn descriptors(&self) -> impl Iterator<Item = &Descriptor> {
         self.descriptors.iter()
     }