[rust][le][bass] Define client for Broadcast Audio Scan Service

When the client is created, use the GATT client to discover all the
required characteristics and register for notifications for receive
state characteristics.

Bug: b/308483171
Change-Id: I09d9aa961587cceeac7e6f4f8d96e1c56da89bfb
Reviewed-on: https://bluetooth-review.git.corp.google.com/c/bluetooth/+/1200
Reviewed-by: Marie Janssen <jamuraa@google.com>
diff --git a/rust/bt-bass/Cargo.toml b/rust/bt-bass/Cargo.toml
index 15d0ece..b4af24c 100644
--- a/rust/bt-bass/Cargo.toml
+++ b/rust/bt-bass/Cargo.toml
@@ -4,8 +4,9 @@
 edition = "2021"
 license = "BSD-2-Clause"
 
-# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
-
 [dependencies]
 bt-common = { path = "../bt-common" }
-thiserror = "1.0"
\ No newline at end of file
+bt-gatt = { path = "../bt-gatt", features = ["test-utils"] }
+futures = "0.3.28"
+thiserror = "1.0"
+tracing = "0.1.40"
diff --git a/rust/bt-bass/src/client.rs b/rust/bt-bass/src/client.rs
new file mode 100644
index 0000000..d10e6da
--- /dev/null
+++ b/rust/bt-bass/src/client.rs
@@ -0,0 +1,244 @@
+// Copyright 2023 Google LLC
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+pub mod error;
+
+use std::collections::HashMap;
+use std::time::SystemTime;
+use std::sync::{Arc, RwLock};
+
+use futures::Stream;
+use tracing::warn;
+
+use bt_common::packet_encoding::Decodable;
+use bt_gatt::types::{Handle, Result as GattResult};
+use bt_gatt::client::{CharacteristicNotification, PeerService, ServiceCharacteristic};
+
+use crate::client::error::{Error, ServiceError};
+use crate::types::*;
+
+const READ_CHARACTERISTIC_BUFFER_SIZE: usize = 255;
+
+#[allow(dead_code)]
+type ReceiveStateReadValue = (BroadcastReceiveState, SystemTime);
+
+/// Manages connection to the Broadcast Audio Scan Service at the
+/// remote Scan Delegator and writes/reads characteristics to/from it.
+#[allow(dead_code)]
+pub struct BroadcastAudioScanServiceClient<PeerServiceT, GattNotificationStream> {
+    gatt_client: Box<PeerServiceT>,
+    /// Broadcast Audio Scan Service only has one Broadcast Audio Scan Control Point characteristic
+    /// according to BASS Section 3. There shall
+    /// be one or more Broadcast Receive State characteristics.
+    audio_scan_control_point: Handle,
+    /// Broadcast Receive State characteristics can be used to determine the BASS status.
+    receive_states: Arc<RwLock<HashMap<Handle, Option<ReceiveStateReadValue>>>>,
+    // List of characteristic notification streams we are listening to.
+    notification_streams: Vec<GattNotificationStream>,
+}
+
+#[allow(dead_code)]
+impl<GattNotificationStream: Stream<Item = GattResult<CharacteristicNotification>>, PeerServiceT: PeerService<NotificationStream = GattNotificationStream>> BroadcastAudioScanServiceClient<PeerServiceT, GattNotificationStream> {
+    async fn create(gatt_client: Box<PeerServiceT>) -> Result<Self, Error> {
+        let characteristics = Self::discover_all_characteristics(&gatt_client).await?;
+        let mut client = Self {
+            gatt_client,
+            audio_scan_control_point: characteristics.0,
+            receive_states: Arc::new(RwLock::new(characteristics.1)),
+            notification_streams: Vec::new(),
+        };
+        client.register_notification().await?;
+        Ok(client)
+    }
+
+    // Discover all the characteristics that are required.
+    // On success, returns the tuple of (Broadcast Audio Scan Control Point Characteristic, HashMap of all Broadcast Received State Characteristics).
+    async fn discover_all_characteristics(gatt_client: &PeerServiceT) -> Result<(Handle, HashMap<Handle, Option<ReceiveStateReadValue>>), Error> {
+        let bascp = ServiceCharacteristic::find(gatt_client, BROADCAST_AUDIO_SCAN_CONTROL_POINT_UUID).await.map_err(|e| Error::Gatt(e))?;
+        let brs = ServiceCharacteristic::find(gatt_client, BROADCAST_RECEIVE_STATE_UUID).await.map_err(|e| Error::Gatt(e))?;
+        if bascp.len() == 0 || brs.len() == 0 {
+            return Err(Error::Service(ServiceError::MissingCharacteristic));
+        }
+        if bascp.len() > 1 {
+            return Err(Error::Service(ServiceError::ExtraScanControlPointCharacteristic));
+        }
+        // Discover all characteristics at the BASS at the remote peer over the GATT connection. Service should have a single Broadcast Audio Scan Control Point Characteristic
+        let mut brs_map = HashMap::new();
+        for c in brs {
+            // Read the value of the Broadcast Recieve State at the time of discovery for record.
+            let mut buf = vec![0; READ_CHARACTERISTIC_BUFFER_SIZE];
+            match c.read(&mut buf[..]).await {
+                Ok(read_bytes) => match BroadcastReceiveState::decode(&buf[0..read_bytes]) {
+                    Ok(decoded) => {
+                        brs_map.insert(*c.handle(), Some((decoded.0, SystemTime::now())));
+                        continue;
+                    },
+                    Err(e) => warn!("Failed to decode characteristic ({:?}) to Broadcast Receive State value: {:?}", *c.handle(), e),
+                },
+                Err(e) => warn!("Failed to read characteristic ({:?}) value: {:?}", *c.handle(), e),
+            }
+            brs_map.insert(*c.handle(), None);
+        }
+        Ok((*bascp[0].handle(), brs_map))
+    }
+
+    /// Registers for notifications for Broadcast Receive State characteristics.
+    async fn register_notification(&mut self) -> Result<(), Error> {
+        // Notification is mandatory for Receive State as per BASS v1.0 Section 3.2.1.
+        let lock = self.receive_states.write().unwrap();
+        for handle in lock.keys() {
+            let stream = self.gatt_client.subscribe(handle);
+            self.notification_streams.push(stream);
+        }
+        Ok(())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    use std::task::Poll;
+
+    use futures::{FutureExt, pin_mut};
+
+    use bt_common::Uuid;
+    use bt_gatt::Characteristic;
+    use bt_gatt::types::{AttributePermissions, CharacteristicProperty, CharacteristicProperties, Handle};
+    use bt_gatt::test_utils::*;
+
+    #[test]
+    fn create_client() {
+        let mut fake_peer_service = FakePeerService::new();
+        // Add 3 Broadcast Receive State Characteristics, 1 Broadcast Audio Scan Control Point Characteristic,
+        // and 1 random one.
+        fake_peer_service.add_characteristic(Characteristic {
+            handle: Handle(1),
+            uuid: BROADCAST_RECEIVE_STATE_UUID,
+            properties: CharacteristicProperties(vec![CharacteristicProperty::Broadcast, CharacteristicProperty::Notify]),
+            permissions: AttributePermissions::default(),
+            descriptors: vec![],
+        }, vec![]);
+        fake_peer_service.add_characteristic(Characteristic {
+            handle: Handle(2),
+            uuid: BROADCAST_RECEIVE_STATE_UUID,
+            properties: CharacteristicProperties(vec![CharacteristicProperty::Broadcast, CharacteristicProperty::Notify]),
+            permissions: AttributePermissions::default(),
+            descriptors: vec![],
+        }, vec![]);
+        fake_peer_service.add_characteristic(Characteristic {
+            handle: Handle(3),
+            uuid: BROADCAST_RECEIVE_STATE_UUID,
+            properties: CharacteristicProperties(vec![CharacteristicProperty::Broadcast, CharacteristicProperty::Notify]),
+            permissions: AttributePermissions::default(),
+            descriptors: vec![],
+        }, vec![]);
+        fake_peer_service.add_characteristic(Characteristic {
+            handle: Handle(4),
+            uuid: Uuid::from_u16(0x1234),
+            properties: CharacteristicProperties(vec![CharacteristicProperty::Notify]),
+            permissions: AttributePermissions::default(),
+            descriptors: vec![],
+        }, vec![]);
+        fake_peer_service.add_characteristic(Characteristic {
+            handle: Handle(5),
+            uuid: BROADCAST_AUDIO_SCAN_CONTROL_POINT_UUID,
+            properties: CharacteristicProperties(vec![CharacteristicProperty::Broadcast]),
+            permissions: AttributePermissions::default(),
+            descriptors: vec![],
+        }, vec![]);
+
+        let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
+        let create_result = BroadcastAudioScanServiceClient::create(Box::new(fake_peer_service.clone()));
+        pin_mut!(create_result);
+        let polled = create_result.poll_unpin(&mut noop_cx);
+        let Poll::Ready(Ok(client)) = polled else {
+            panic!("Expected BASSClient to be succesfully created");
+};
+
+        // Check that all the characteristics have been discovered.
+        assert_eq!(client.audio_scan_control_point, Handle(5));
+        let broadcast_receive_states = client.receive_states.read().unwrap();
+        assert!(broadcast_receive_states.contains_key(&Handle(1)));
+        assert!(broadcast_receive_states.contains_key(&Handle(2)));
+        assert!(broadcast_receive_states.contains_key(&Handle(3)));
+
+        // Three notification one for each of broadcast receive state characteristic should have been set up.
+        assert_eq!(client.notification_streams.len(), 3);
+    }
+
+    #[test]
+    fn create_client_fails_missing_characteristics() {
+        // Missing scan control point characteristic.
+        let mut fake_peer_service = FakePeerService::new();
+        fake_peer_service.add_characteristic(Characteristic {
+            handle: Handle(1),
+            uuid: BROADCAST_RECEIVE_STATE_UUID,
+            properties: CharacteristicProperties(vec![CharacteristicProperty::Broadcast, CharacteristicProperty::Notify]),
+            permissions: AttributePermissions::default(),
+            descriptors: vec![],
+        }, vec![]);
+
+        let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
+        let create_result = BroadcastAudioScanServiceClient::create(Box::new(fake_peer_service.clone()));
+        pin_mut!(create_result);
+        let polled = create_result.poll_unpin(&mut noop_cx);
+        let Poll::Ready(Err(_)) = polled else {
+            panic!("Expected BASSClient to have failed");
+        };
+
+        // Missing receive state characteristic.
+        let mut fake_peer_service: FakePeerService = FakePeerService::new();
+        fake_peer_service.add_characteristic(Characteristic {
+            handle: Handle(1),
+            uuid: BROADCAST_AUDIO_SCAN_CONTROL_POINT_UUID,
+            properties: CharacteristicProperties(vec![CharacteristicProperty::Broadcast]),
+            permissions: AttributePermissions::default(),
+            descriptors: vec![],
+        }, vec![]);
+
+        let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
+        let create_result = BroadcastAudioScanServiceClient::create(Box::new(fake_peer_service.clone()));
+        pin_mut!(create_result);
+        let polled = create_result.poll_unpin(&mut noop_cx);
+        let Poll::Ready(Err(_)) = polled else {
+            panic!("Expected BASSClient to have failed");
+        };
+    }
+
+    #[test]
+    fn create_client_fails_duplicate_characteristics() {
+        // More than one scan control point characteristics.
+        let mut fake_peer_service = FakePeerService::new();
+        fake_peer_service.add_characteristic(Characteristic {
+            handle: Handle(1),
+            uuid: BROADCAST_RECEIVE_STATE_UUID,
+            properties: CharacteristicProperties(vec![CharacteristicProperty::Broadcast, CharacteristicProperty::Notify]),
+            permissions: AttributePermissions::default(),
+            descriptors: vec![],
+        }, vec![]);
+        fake_peer_service.add_characteristic(Characteristic {
+            handle: Handle(2),
+            uuid: BROADCAST_AUDIO_SCAN_CONTROL_POINT_UUID,
+            properties: CharacteristicProperties(vec![CharacteristicProperty::Broadcast]),
+            permissions: AttributePermissions::default(),
+            descriptors: vec![],
+        }, vec![]);
+        fake_peer_service.add_characteristic(Characteristic {
+            handle: Handle(3),
+            uuid: BROADCAST_AUDIO_SCAN_CONTROL_POINT_UUID,
+            properties: CharacteristicProperties(vec![CharacteristicProperty::Broadcast]),
+            permissions: AttributePermissions::default(),
+            descriptors: vec![],
+        }, vec![]);
+
+        let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
+        let create_result = BroadcastAudioScanServiceClient::create(Box::new(fake_peer_service.clone()));
+        pin_mut!(create_result);
+        let polled = create_result.poll_unpin(&mut noop_cx);
+        let Poll::Ready(Err(_)) = polled else {
+            panic!("Expected BASSClient to have failed");
+        };
+    }
+}
diff --git a/rust/bt-bass/src/client/error.rs b/rust/bt-bass/src/client/error.rs
new file mode 100644
index 0000000..dab1378
--- /dev/null
+++ b/rust/bt-bass/src/client/error.rs
@@ -0,0 +1,43 @@
+// Copyright 2023 Google LLC
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+use thiserror::Error;
+
+use bt_common::packet_encoding::Error as PacketError;
+use bt_gatt::types::{Error as BTGattError, Handle};
+
+#[derive(Debug, Error)]
+pub enum Error {
+    #[error("An unsupported opcode ({0:#x}) used in a Broadcast Audio Scan Control Point operation")]
+    OpCodeNotSupported(u8),
+
+    #[error("Invalid source id ({0:#x}) used in a Broadcast Audio Scan Control Point operation")]
+    InvalidSourceId(u8),
+
+    #[error("Packet serialization/deserialization error: {0}")]
+    Packet(PacketError),
+
+    #[error("Malformed service on peer: {0}")]
+    Service(ServiceError),
+
+    #[error("GATT operation error: {0}")]
+    Gatt(BTGattError),
+
+    #[error("Failure occurred: {0}")]
+    Generic(String),
+}
+
+/// This error represents an error we found at the remote service
+/// which prevents us from proceeding further with the client operation.
+#[derive(Debug, Error, PartialEq)]
+pub enum ServiceError {
+    #[error("Missing a required service characteristic")]
+    MissingCharacteristic,
+
+    #[error("More than one Broadcast Audio Scan Control Point characteristics")]
+    ExtraScanControlPointCharacteristic,
+
+    #[error("Failed to configure notification for Broadcast Recieve State characteristic (handle={0:?})")]
+    NotificationConfig(Handle),
+}
diff --git a/rust/bt-bass/src/lib.rs b/rust/bt-bass/src/lib.rs
index b77366b..b97638d 100644
--- a/rust/bt-bass/src/lib.rs
+++ b/rust/bt-bass/src/lib.rs
@@ -2,4 +2,5 @@
 // Use of this source code is governed by a BSD-style license that can be
 // found in the LICENSE file.
 
+pub mod client;
 pub mod types;
diff --git a/rust/bt-bass/src/types.rs b/rust/bt-bass/src/types.rs
index 33dcb5f..fc7827f 100644
--- a/rust/bt-bass/src/types.rs
+++ b/rust/bt-bass/src/types.rs
@@ -2,6 +2,7 @@
 // Use of this source code is governed by a BSD-style license that can be
 // found in the LICENSE file.
 
+use bt_common::Uuid;
 use bt_common::packet_encoding::{Decodable, Encodable, Error as PacketError};
 use bt_common::generic_audio::metadata_ltv::*;
 use bt_common::core::{AdvertisingSetId, PaInterval};
@@ -11,7 +12,11 @@
 const PA_SYNC_BYTE_SIZE: usize = 1;
 const SOURCE_ID_BYTE_SIZE: usize = 1;
 
-type SourceId = u8;
+/// 16-bit UUID value for the characteristics offered by the Broadcast Audio Scan Service.
+pub const BROADCAST_AUDIO_SCAN_CONTROL_POINT_UUID: Uuid = Uuid::from_u16(0x2BC7);
+pub const BROADCAST_RECEIVE_STATE_UUID: Uuid = Uuid::from_u16(0x2BC8);
+
+pub type SourceId = u8;
 
 /// Broadcast_ID is a 3-byte data on the wire.
 /// See Broadcast Audio Scan Service (BASS) spec v1.0 Table 3.5 for details.
@@ -552,7 +557,7 @@
     // 0x00000000: 0b0 = Do not synchronize to BIS_index[x]
     // 0xxxxxxxxx: 0b1 = Synchronize to BIS_index[x]
     // 0xFFFFFFFF: means No preference if used in BroadcastAudioScanControlPoint,
-    //             Failed to sync if used in BroadcastReceiveState.
+    //             Failed to sync if used in ReceiveState.
     bis_sync_bitfield: u32,
     metadata: Metadata,
 }
@@ -665,9 +670,47 @@
 /// Broadcast Receive State characteristic as defined in
 /// Broadcast Audio Scan Service spec v1.0 Section 3.2.
 /// The Broadcast Receive State characteristic is used by the server to expose information
-/// about a Broadcast Source.
+/// about a Broadcast Source. If the server has not written a Source_ID value to the
+/// Broadcast Receive State characteristic, the Broadcast Recieve State characteristic value
+/// shall be empty.
 #[derive(Debug, PartialEq)]
-pub struct BroadcastReceiveState {
+pub enum BroadcastReceiveState {
+    Empty,
+    NonEmpty(ReceiveState),
+}
+
+impl Decodable for BroadcastReceiveState {
+    type Error = PacketError;
+
+    fn decode(buf: &[u8]) -> core::result::Result<(Self, usize), Self::Error> {
+        if buf.len() == 0 {
+            return Ok((Self::Empty, 0));
+        }
+        let res = ReceiveState::decode(&buf[..])?;
+        Ok((Self::NonEmpty(res.0), res.1))
+    }
+}
+
+impl Encodable for BroadcastReceiveState {
+    type Error = PacketError;
+
+    fn encode(&self, buf: &mut [u8]) -> core::result::Result<(), Self::Error> {
+        match self {
+            Self::Empty => Ok(()),
+            Self::NonEmpty(state) => state.encode(&mut buf[..]),
+        }
+    }
+
+    fn encoded_len(&self) -> core::primitive::usize {
+        match self {
+            Self::Empty => 0,
+            Self::NonEmpty(state) => state.encoded_len(),
+        }
+    }
+}
+
+#[derive(Debug, PartialEq)]
+pub struct ReceiveState {
     source_id: SourceId,
     source_address_type: AddressType,
     // Address in little endian.
@@ -680,12 +723,12 @@
     subgroups: Vec<BigSubgroup>,
 }
 
-impl BroadcastReceiveState {
+impl ReceiveState {
     const MIN_PACKET_SIZE: usize = SOURCE_ID_BYTE_SIZE + AddressType::BYTE_SIZE + ADDRESS_BYTE_SIZE + AdvertisingSetId::BYTE_SIZE +
         BroadcastId::BYTE_SIZE + PA_SYNC_BYTE_SIZE + EncryptionStatus::MIN_PACKET_SIZE + NUM_SUBGROUPS_BYTE_SIZE;
 }
 
-impl Decodable for BroadcastReceiveState {
+impl Decodable for ReceiveState {
     type Error = PacketError;
 
     fn decode(buf: &[u8]) -> core::result::Result<(Self, usize), Self::Error> {
@@ -718,7 +761,7 @@
             subgroups.push(subgroup);
             idx += consumed;
         }
-        Ok((BroadcastReceiveState{
+        Ok((ReceiveState{
             source_id,
             source_address_type,
             source_address,
@@ -731,7 +774,7 @@
     }
 }
 
-impl Encodable for BroadcastReceiveState {
+impl Encodable for ReceiveState {
     type Error = PacketError;
 
     fn encode(&self, buf: &mut [u8]) -> core::result::Result<(), Self::Error> {
@@ -1169,7 +1212,7 @@
     #[test]
     fn broadcast_receive_state_without_subgroups() {
         // Encoding.
-        let state = BroadcastReceiveState {
+        let state = BroadcastReceiveState::NonEmpty(ReceiveState {
             source_id: 0x01,
             source_address_type: AddressType::Public,
             source_address: [0x0F, 0x0E, 0x0D, 0x0C, 0x0B, 0x0A],
@@ -1178,7 +1221,7 @@
             pa_sync_state: PaSyncState::Synced,
             big_encryption: EncryptionStatus::BadCode([0x16, 0x15, 0x14, 0x13, 0x12, 0x11, 0x10, 0x09, 0x08, 0x7, 0x06, 0x05, 0x04, 0x03, 0x02, 0x01]),
             subgroups: vec![],
-        };
+        });
         assert_eq!(state.encoded_len(), 31);
         let mut buf = vec![0; state.encoded_len()];
         state.encode(&mut buf[..]).expect("should succeed");
@@ -1199,7 +1242,7 @@
     #[test]
     fn broadcast_receive_state_with_subgroups() {
         // Encoding
-        let state = BroadcastReceiveState {
+        let state = BroadcastReceiveState::NonEmpty(ReceiveState {
             source_id: 0x01,
             source_address_type: AddressType::Random,
             source_address: [0x0F, 0x0E, 0x0D, 0x0C, 0x0B, 0x0A],
@@ -1210,7 +1253,7 @@
             subgroups: vec![
                 BigSubgroup::new(None).with_metadata(vec![Metadatum::ParentalRating(Rating::AllAge)]), // encoded_len = 8
             ],
-        };
+        });
         assert_eq!(state.encoded_len(), 23);
         let mut buf = vec![0; state.encoded_len()];
         state.encode(&mut buf[..]).expect("should succeed");
diff --git a/rust/bt-common/src/uuid.rs b/rust/bt-common/src/uuid.rs
index 0fa24da..a47f790 100644
--- a/rust/bt-common/src/uuid.rs
+++ b/rust/bt-common/src/uuid.rs
@@ -3,7 +3,7 @@
 // found in the LICENSE file.
 
 /// A Uuid as defined by the Core Specification (v5.4, Vol 3, Part B, Sec 2.5.1)
-#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
 pub struct Uuid(uuid::Uuid);
 
 impl Uuid {
diff --git a/rust/bt-gatt/Cargo.toml b/rust/bt-gatt/Cargo.toml
index b106aa4..930c7e9 100644
--- a/rust/bt-gatt/Cargo.toml
+++ b/rust/bt-gatt/Cargo.toml
@@ -4,9 +4,18 @@
 edition = "2021"
 license = "BSD-2-Clause"
 
+[features]
+default = []
+test-utils = []
+
 [dependencies]
 thiserror = "1.0.23"
 futures = "0.3.19"
+parking_lot = "0.12.1"
 
 ### in-tree dependencies
 bt-common = { path = "../bt-common" }
+
+[dev-dependencies]
+assert_matches = "1.5.0"
+bt-gatt = { path = ".", features = ["test-utils"] }
diff --git a/rust/bt-gatt/src/client/mod.rs b/rust/bt-gatt/src/client/mod.rs
index 9e31756..8415f69 100644
--- a/rust/bt-gatt/src/client/mod.rs
+++ b/rust/bt-gatt/src/client/mod.rs
@@ -49,6 +49,7 @@
     fn connect(&self) -> Self::ConnectFut;
 }
 
+#[derive(Debug)]
 pub struct CharacteristicNotification {
     pub handle: Handle,
     pub value: Vec<u8>,
diff --git a/rust/bt-gatt/src/lib.rs b/rust/bt-gatt/src/lib.rs
index 0eff602..6f7f263 100644
--- a/rust/bt-gatt/src/lib.rs
+++ b/rust/bt-gatt/src/lib.rs
@@ -14,5 +14,8 @@
 pub mod central;
 pub use central::Central;
 
+#[cfg(any(test, feature = "test-utils"))]
+pub mod test_utils;
+
 #[cfg(test)]
-mod tests;
+pub mod tests;
diff --git a/rust/bt-gatt/src/test_utils.rs b/rust/bt-gatt/src/test_utils.rs
new file mode 100644
index 0000000..97393d2
--- /dev/null
+++ b/rust/bt-gatt/src/test_utils.rs
@@ -0,0 +1,209 @@
+// Copyright 2023 Google LLC
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+use crate::{central::ScanResult, client::CharacteristicNotification, types::*};
+
+use bt_common::{PeerId, Uuid};
+use futures::channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender};
+use futures::{Future, Stream};
+use futures::future::{Ready, ready};
+use parking_lot::Mutex;
+use std::collections::HashMap;
+use std::task::Poll;
+use std::sync::Arc;
+
+#[derive(Default)]
+pub(crate) struct FakeCentral {}
+
+#[derive(Default)]
+struct FakePeerServiceInner {
+    // Notifier that's used to send out notification.
+    notifiers: HashMap<Handle, UnboundedSender<Result<CharacteristicNotification>>>,
+
+    // Characteristics to return when `read_characteristic` and `discover_characteristics` are called.
+    characteristics: HashMap<Handle, (Characteristic, Vec<u8>)>,
+}
+
+#[derive(Clone)]
+pub struct FakePeerService {
+    inner: Arc<Mutex<FakePeerServiceInner>>,
+}
+
+impl FakePeerService {
+    pub fn new() -> Self {
+        Self { inner: Arc::new(Mutex::new(Default::default())) }
+    }
+
+    // Adds a characteristic so that it can be returned when discover/read method is
+    // called.
+    // Also triggers sending a characteristic value change notification to be sent.
+    pub fn add_characteristic(&mut self, char: Characteristic, value: Vec<u8>) {
+        let mut lock = self.inner.lock();
+        let handle = char.handle;
+        lock.characteristics.insert(handle, (char, value.clone()));
+        if let Some(notifier) = lock.notifiers.get_mut(&handle) {
+            notifier.unbounded_send(Ok(CharacteristicNotification { handle, value, maybe_truncated: false })).expect("should succeed");
+        }
+    }
+}
+
+impl crate::client::PeerService for FakePeerService {
+    type CharacteristicsFut = Ready<Result<Vec<Characteristic>>>;
+    type NotificationStream = UnboundedReceiver<Result<CharacteristicNotification>>;
+    type ReadFut<'a> = Ready<Result<(usize, bool)>>;
+    type WriteFut<'a> = Ready<Result<()>>;
+
+    fn discover_characteristics(&self, uuid: Option<Uuid>) -> Self::CharacteristicsFut {
+        let lock = self.inner.lock();
+        let mut result = Vec::new();
+        for (_handle, (char, _value))in &lock.characteristics {
+            match uuid {
+                Some(uuid) if uuid == char.uuid => result.push(char.clone()),
+                None => result.push(char.clone()),
+                _ => {},
+            }
+        }
+        ready(Ok(result))
+    }
+
+    fn read_characteristic<'a>(
+        &self,
+        handle: &Handle,
+        _offset: u16,
+        buf: &'a mut [u8],
+    ) -> Self::ReadFut<'a> {
+        let read_characteristics = &(*self.inner.lock()).characteristics;
+        let Some((_, value)) = read_characteristics.get(handle) else {
+            return ready(Err(Error::Gatt(GattError::InvalidHandle)));
+        };
+        buf[..value.len()].copy_from_slice(value.as_slice());
+        ready(Ok((value.len(), false)))
+    }
+
+    fn write_characteristic<'a>(
+        &self,
+        _handle: &Handle,
+        _mode: WriteMode,
+        _offset: u16,
+        _buf: &'a [u8],
+    ) -> Self::WriteFut<'a> {
+        todo!()
+    }
+
+    fn read_descriptor<'a>(
+        &self,
+        _handle: &Handle,
+        _offset: u16,
+        _buf: &'a mut [u8],
+    ) -> Self::ReadFut<'a> {
+        todo!()
+    }
+
+    fn write_descriptor<'a>(
+        &self,
+        _handle: &Handle,
+        _offset: u16,
+        _buf: &'a [u8],
+    ) -> Self::WriteFut<'a> {
+        todo!()
+    }
+
+    fn subscribe(&self, handle: &Handle) -> Self::NotificationStream {
+        let (sender, receiver) = unbounded();
+        (*self.inner.lock()).notifiers.insert(*handle, sender);
+        receiver
+    }
+}
+
+pub(crate) struct FakeServiceHandle {}
+
+impl crate::client::PeerServiceHandle for FakeServiceHandle {
+    type PeerServiceT = FakePeerService;
+    type ConnectFut = Ready<Result<Self::PeerServiceT>>;
+
+    fn uuid(&self) -> Uuid {
+        todo!()
+    }
+
+    fn is_primary(&self) -> bool {
+        todo!()
+    }
+
+    fn connect(&self) -> Self::ConnectFut {
+        todo!()
+    }
+}
+
+pub(crate) struct FakeClient {}
+
+impl crate::Client for FakeClient {
+    type PeerServiceHandleT = FakeServiceHandle;
+    type ServiceResultFut = Ready<Result<Vec<Self::PeerServiceHandleT>>>;
+
+    fn peer_id(&self) -> PeerId {
+        todo!()
+    }
+
+    fn find_service(&self, _uuid: Uuid) -> Self::ServiceResultFut {
+        todo!()
+    }
+}
+
+pub(crate) struct SingleResultStream {
+    result: Option<Result<crate::central::ScanResult>>,
+}
+
+impl Stream for SingleResultStream {
+    type Item = Result<crate::central::ScanResult>;
+
+    fn poll_next(
+        self: std::pin::Pin<&mut Self>,
+        _cx: &mut std::task::Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        if self.result.is_some() {
+            Poll::Ready(self.get_mut().result.take())
+        } else {
+            // Never wake up, as if we never find another result
+            Poll::Pending
+        }
+    }
+}
+
+pub(crate) struct ClientConnectFut {}
+
+impl Future for ClientConnectFut {
+    type Output = Result<FakeClient>;
+
+    fn poll(
+        self: std::pin::Pin<&mut Self>,
+        _cx: &mut std::task::Context<'_>,
+    ) -> Poll<Self::Output> {
+        todo!()
+    }
+}
+
+impl crate::Central for FakeCentral {
+    type ScanResultStream = SingleResultStream;
+
+    type Client = FakeClient;
+
+    type ClientFut = ClientConnectFut;
+
+    fn scan(&self, _filters: &[crate::central::ScanFilter]) -> Self::ScanResultStream {
+        SingleResultStream {
+            result: Some(Ok(ScanResult {
+                id: PeerId(1),
+                connectable: true,
+                name: crate::central::PeerName::CompleteName("Marie's Pixel 7 Pro".to_owned()),
+                advertised: vec![crate::central::AdvertisingDatum::Services(vec![
+                    Uuid::from_u16(0x1844),
+                ])],
+            })),
+        }
+    }
+
+    fn connect(&self, _peer_id: PeerId) -> Self::ClientFut {
+        todo!()
+    }
+}
diff --git a/rust/bt-gatt/src/tests.rs b/rust/bt-gatt/src/tests.rs
new file mode 100644
index 0000000..da13d2b
--- /dev/null
+++ b/rust/bt-gatt/src/tests.rs
@@ -0,0 +1,158 @@
+// Copyright 2023 Google LLC
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+use std::task::Poll;
+
+use assert_matches::assert_matches;
+use futures::{FutureExt, StreamExt};
+
+use bt_common::Uuid;
+
+use crate::test_utils::*;
+use crate::types::*;
+use crate::{Central, central::Filter, client::PeerService};
+
+const TEST_UUID_1: Uuid = Uuid::from_u16(0x1234);
+const TEST_UUID_2: Uuid = Uuid::from_u16(0x2345);
+const TEST_UUID_3: Uuid = Uuid::from_u16(0x3456);
+
+// Sets up a fake peer service with some characteristics.
+fn set_up() -> FakePeerService {
+    let mut fake_peer_service = FakePeerService::new();
+    fake_peer_service.add_characteristic(Characteristic {
+        handle: Handle(1),
+        uuid: TEST_UUID_1,
+        properties: CharacteristicProperties(vec![CharacteristicProperty::Broadcast, CharacteristicProperty::Notify]),
+        permissions: AttributePermissions::default(),
+        descriptors: vec![],
+    }, vec![]);
+    fake_peer_service.add_characteristic(Characteristic {
+        handle: Handle(2),
+        uuid: TEST_UUID_1,
+        properties: CharacteristicProperties(vec![CharacteristicProperty::Broadcast, CharacteristicProperty::Notify]),
+        permissions: AttributePermissions::default(),
+        descriptors: vec![],
+    }, vec![]);
+    fake_peer_service.add_characteristic(Characteristic {
+        handle: Handle(3),
+        uuid: TEST_UUID_1,
+        properties: CharacteristicProperties(vec![CharacteristicProperty::Broadcast, CharacteristicProperty::Notify]),
+        permissions: AttributePermissions::default(),
+        descriptors: vec![],
+    }, vec![]);
+    fake_peer_service.add_characteristic(Characteristic {
+        handle: Handle(4),
+        uuid: TEST_UUID_2,
+        properties: CharacteristicProperties(vec![CharacteristicProperty::Notify]),
+        permissions: AttributePermissions::default(),
+        descriptors: vec![],
+    }, vec![]);
+    fake_peer_service.add_characteristic(Characteristic {
+        handle: Handle(5),
+        uuid: TEST_UUID_3,
+        properties: CharacteristicProperties(vec![CharacteristicProperty::Broadcast]),
+        permissions: AttributePermissions::default(),
+        descriptors: vec![],
+    }, vec![]);
+    fake_peer_service
+}
+
+#[test]
+fn peer_service_discover_characteristics_works() {
+    let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
+
+    let fake_peer_service = set_up();
+
+    let mut discover_results = fake_peer_service.discover_characteristics(Some(TEST_UUID_1));
+    let polled = discover_results.poll_unpin(&mut noop_cx);
+    assert_matches!(polled, Poll::Ready(Ok(chars)) => { assert_eq!(chars.len(), 3) });
+
+    let mut discover_results = fake_peer_service.discover_characteristics(Some(TEST_UUID_2));
+    let polled = discover_results.poll_unpin(&mut noop_cx);
+    assert_matches!(polled, Poll::Ready(Ok(chars)) => { assert_eq!(chars.len(), 1) });
+
+    let mut discover_results = fake_peer_service.discover_characteristics(Some(Uuid::from_u16(0xFF22)));
+    let polled = discover_results.poll_unpin(&mut noop_cx);
+    assert_matches!(polled, Poll::Ready(Ok(chars)) => { assert_eq!(chars.len(), 0) });
+
+    let mut discover_results = fake_peer_service.discover_characteristics(None);
+    let polled = discover_results.poll_unpin(&mut noop_cx);
+    assert_matches!(polled, Poll::Ready(Ok(chars)) => { assert_eq!(chars.len(), 5) });
+}
+
+#[test]
+fn peer_service_read_characteristic() {
+    let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
+
+    let mut fake_peer_service = set_up();
+    let mut buf = vec![0; 255];
+
+    // For characteristic that was added, value is returned
+    let mut read_result = fake_peer_service.read_characteristic(&Handle(0x1), 0, &mut buf[..]);
+    let polled: Poll<std::prelude::v1::Result<(usize, bool), Error>> = read_result.poll_unpin(&mut noop_cx);
+    assert_matches!(polled, Poll::Ready(Ok((len, _))) => { assert_eq!(len, 0) });
+
+    // For characteristic that doesn't exist, fails.
+    let mut read_result = fake_peer_service.read_characteristic(&Handle(0xF), 0, &mut buf[..]);
+    let polled = read_result.poll_unpin(&mut noop_cx);
+    assert_matches!(polled, Poll::Ready(Err(_)));
+
+    // Change the value for characteristic with handle 1.
+    fake_peer_service.add_characteristic(Characteristic {
+        handle: Handle(1),
+        uuid: TEST_UUID_1,
+        properties: CharacteristicProperties(vec![CharacteristicProperty::Broadcast, CharacteristicProperty::Notify]),
+        permissions: AttributePermissions::default(),
+        descriptors: vec![],
+    }, vec![0,1,2,3]);
+
+    // Successfully reads the updated value.
+    let mut read_result = fake_peer_service.read_characteristic(&Handle(0x1), 0, &mut buf[..]);
+    let polled = read_result.poll_unpin(&mut noop_cx);
+    assert_matches!(polled, Poll::Ready(Ok((len, _))) => {
+        assert_eq!(len, 4);
+        assert_eq!(buf[..len], vec![0,1,2,3]);
+    });
+}
+
+#[test]
+fn peer_service_subsribe() {
+    let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
+
+    let mut fake_peer_service = set_up();
+    let mut notification_stream = fake_peer_service.subscribe(&Handle(0x1));
+
+    // Stream is empty unless we add an item through the FakeNotificationStream struct.
+    assert!(notification_stream.poll_next_unpin(&mut noop_cx).is_pending());
+
+    // Update the characteristic value so that notification is sent.
+    fake_peer_service.add_characteristic(Characteristic {
+        handle: Handle(1),
+        uuid: TEST_UUID_1,
+        properties: CharacteristicProperties(vec![CharacteristicProperty::Broadcast, CharacteristicProperty::Notify]),
+        permissions: AttributePermissions::default(),
+        descriptors: vec![],
+    }, vec![0,1,2,3]);
+
+    // Stream should be ready.
+    let polled = notification_stream.poll_next_unpin(&mut noop_cx);
+    assert_matches!(polled, Poll::Ready(Some(Ok(notification))) => {
+        assert_eq!(notification.handle, Handle(0x1));
+        assert_eq!(notification.value, vec![0,1,2,3]);
+        assert_eq!(notification.maybe_truncated, false);
+    });
+
+    assert!(notification_stream.poll_next_unpin(&mut noop_cx).is_pending());
+}
+
+#[test]
+fn central_search_works() {
+    let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
+    let central = FakeCentral::default();
+
+    let mut scan_results = central.scan(&[Filter::ServiceUuid(Uuid::from_u16(0x1844)).into()]);
+
+    let polled = scan_results.poll_next_unpin(&mut noop_cx);
+    assert_matches!(polled, Poll::Ready(Some(Ok(_))));
+}
diff --git a/rust/bt-gatt/src/tests/mod.rs b/rust/bt-gatt/src/tests/mod.rs
deleted file mode 100644
index 1df6bd5..0000000
--- a/rust/bt-gatt/src/tests/mod.rs
+++ /dev/null
@@ -1,175 +0,0 @@
-// Copyright 2023 Google LLC
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-use crate::{central::ScanResult, client::CharacteristicNotification, types::*};
-
-use bt_common::{PeerId, Uuid};
-use futures::{future::Ready, stream::Empty, Future, Stream};
-use std::task::Poll;
-
-#[derive(Default)]
-pub(crate) struct FakeCentral {}
-
-pub(crate) struct FakePeerService {}
-
-impl crate::client::PeerService for FakePeerService {
-    type CharacteristicsFut = Ready<Result<Vec<Characteristic>>>;
-    type NotificationStream = Empty<Result<CharacteristicNotification>>;
-    type ReadFut<'a> = Ready<Result<(usize, bool)>>;
-    type WriteFut<'a> = Ready<Result<()>>;
-
-    fn discover_characteristics(&self, _uuid: Option<Uuid>) -> Self::CharacteristicsFut {
-        todo!()
-    }
-
-    fn read_characteristic<'a>(
-        &self,
-        _handle: &Handle,
-        _offset: u16,
-        _buf: &'a mut [u8],
-    ) -> Self::ReadFut<'a> {
-        todo!()
-    }
-
-    fn write_characteristic<'a>(
-        &self,
-        _handle: &Handle,
-        _mode: WriteMode,
-        _offset: u16,
-        _buf: &'a [u8],
-    ) -> Self::WriteFut<'a> {
-        todo!()
-    }
-
-    fn read_descriptor<'a>(
-        &self,
-        _handle: &Handle,
-        _offset: u16,
-        _buf: &'a mut [u8],
-    ) -> Self::ReadFut<'a> {
-        todo!()
-    }
-
-    fn write_descriptor<'a>(
-        &self,
-        _handle: &Handle,
-        _offset: u16,
-        _buf: &'a [u8],
-    ) -> Self::WriteFut<'a> {
-        todo!()
-    }
-
-    fn subscribe(&self, _handle: &Handle) -> Self::NotificationStream {
-        todo!()
-    }
-}
-
-pub(crate) struct FakeServiceHandle {}
-
-impl crate::client::PeerServiceHandle for FakeServiceHandle {
-    type PeerServiceT = FakePeerService;
-    type ConnectFut = Ready<Result<Self::PeerServiceT>>;
-
-    fn uuid(&self) -> Uuid {
-        todo!()
-    }
-
-    fn is_primary(&self) -> bool {
-        todo!()
-    }
-
-    fn connect(&self) -> Self::ConnectFut {
-        todo!()
-    }
-}
-
-pub(crate) struct FakeClient {}
-
-impl crate::Client for FakeClient {
-    type PeerServiceHandleT = FakeServiceHandle;
-    type ServiceResultFut = Ready<Result<Vec<Self::PeerServiceHandleT>>>;
-
-    fn peer_id(&self) -> PeerId {
-        todo!()
-    }
-
-    fn find_service(&self, _uuid: Uuid) -> Self::ServiceResultFut {
-        todo!()
-    }
-}
-
-pub(crate) struct SingleResultStream {
-    result: Option<Result<crate::central::ScanResult>>,
-}
-
-impl Stream for SingleResultStream {
-    type Item = Result<crate::central::ScanResult>;
-
-    fn poll_next(
-        self: std::pin::Pin<&mut Self>,
-        _cx: &mut std::task::Context<'_>,
-    ) -> Poll<Option<Self::Item>> {
-        if self.result.is_some() {
-            Poll::Ready(self.get_mut().result.take())
-        } else {
-            // Never wake up, as if we never find another result
-            Poll::Pending
-        }
-    }
-}
-
-pub(crate) struct ClientConnectFut {}
-
-impl Future for ClientConnectFut {
-    type Output = Result<FakeClient>;
-
-    fn poll(
-        self: std::pin::Pin<&mut Self>,
-        _cx: &mut std::task::Context<'_>,
-    ) -> Poll<Self::Output> {
-        todo!()
-    }
-}
-
-impl crate::Central for FakeCentral {
-    type ScanResultStream = SingleResultStream;
-
-    type Client = FakeClient;
-
-    type ClientFut = ClientConnectFut;
-
-    fn scan(&self, _filters: &[crate::central::ScanFilter]) -> Self::ScanResultStream {
-        SingleResultStream {
-            result: Some(Ok(ScanResult {
-                id: PeerId(1),
-                connectable: true,
-                name: crate::central::PeerName::CompleteName("Marie's Pixel 7 Pro".to_owned()),
-                advertised: vec![crate::central::AdvertisingDatum::Services(vec![
-                    Uuid::from_u16(0x1844),
-                ])],
-            })),
-        }
-    }
-
-    fn connect(&self, _peer_id: PeerId) -> Self::ClientFut {
-        todo!()
-    }
-}
-
-#[test]
-fn central_search_works() {
-    use crate::Central;
-    use futures::StreamExt;
-
-    let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
-    use crate::central::Filter;
-    let central = FakeCentral::default();
-
-    let mut scan_results = central.scan(&[Filter::ServiceUuid(Uuid::from_u16(0x1844)).into()]);
-
-    let polled = scan_results.poll_next_unpin(&mut noop_cx);
-    let Poll::Ready(Some(Ok(_scan_result))) = polled else {
-        panic!("Expected a ready scan result got {polled:?}");
-    };
-}
diff --git a/rust/bt-gatt/src/types/mod.rs b/rust/bt-gatt/src/types/mod.rs
index b837f09..e6eda85 100644
--- a/rust/bt-gatt/src/types/mod.rs
+++ b/rust/bt-gatt/src/types/mod.rs
@@ -154,7 +154,7 @@
 /// guaranteed to be equal to a peer's attribute handles.
 /// Stack implementations should provide unique handles for each Characteristic and Descriptor
 /// within a PeerService.
-#[derive(Copy, Clone, PartialEq, Debug)]
+#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
 pub struct Handle(pub u64);
 
 pub enum WriteMode {
@@ -163,6 +163,7 @@
     WithoutResponse,
 }
 
+#[derive(Clone, Copy, Debug, PartialEq)]
 #[repr(u16)]
 pub enum CharacteristicProperty {
     Broadcast = 0x01,
@@ -176,6 +177,7 @@
     WritableAuxiliaries = 0x200,
 }
 
+#[derive(Clone, Debug)]
 pub struct CharacteristicProperties(pub Vec<CharacteristicProperty>);
 
 #[derive(Debug, Clone, Copy, Default)]
@@ -217,6 +219,7 @@
 /// This is a subset of the descriptors defined in the Core Specification (v5.4, Vol 3 Part G Sec
 /// 3.3.3).  Missing DescriptorTypes are handled internally and will be omitted from descriptor
 /// APIs.
+#[derive(Clone, Debug)]
 pub enum DescriptorType {
     UserDescription(String),
     ServerConfiguration {
@@ -234,6 +237,7 @@
     },
 }
 
+#[derive(Clone, Debug)]
 pub struct Descriptor {
     pub handle: Handle,
     /// Permissions required needed to interact with this descriptor.  May not be accurate on
@@ -244,6 +248,7 @@
 
 /// A Characteristic on a Service. Each Characteristic has a declaration, value, and zero or more
 /// decriptors.
+#[derive(Clone, Debug)]
 pub struct Characteristic {
     pub handle: Handle,
     pub uuid: Uuid,