rust/bt-broadcast-assistant: Initial implementation
Functionalities are:
- continuously scan and collate information about broadcast sources
- connect to peers and add/update/remove sources
Change-Id: I0628dd8035bfb6d3c569e16e438b9d221959762b
Reviewed-on: https://bluetooth-review.git.corp.google.com/c/bluetooth/+/1580
Reviewed-by: Marie Janssen <jamuraa@google.com>
Reviewed-by: Ani Ramakrishnan <aniramakri@google.com>
diff --git a/rust/Cargo.toml b/rust/Cargo.toml
index 60e0918..f4da3a7 100644
--- a/rust/Cargo.toml
+++ b/rust/Cargo.toml
@@ -12,6 +12,7 @@
## Local path dependencies (keep sorted)
bt-bass = { path = "bt-bass" }
+bt-broadcast-assistant = { path = "bt-broadcast-assistant" }
bt-common = { path = "bt-common" }
bt-gatt = { path = "bt-gatt" }
bt-pacs = { path = "bt-pacs" }
diff --git a/rust/bt-bass/src/client.rs b/rust/bt-bass/src/client.rs
index 3aa0b18..3479c66 100644
--- a/rust/bt-bass/src/client.rs
+++ b/rust/bt-bass/src/client.rs
@@ -13,7 +13,7 @@
use parking_lot::Mutex;
use tracing::warn;
-use bt_common::core::{AdvertisingSetId, PaInterval};
+use bt_common::core::{AddressType, AdvertisingSetId, PaInterval};
use bt_common::generic_audio::metadata_ltv::Metadata;
use bt_common::packet_encoding::Decodable;
use bt_gatt::client::{CharacteristicNotification, PeerService, ServiceCharacteristic};
@@ -123,7 +123,7 @@
audio_scan_control_point,
broadcast_sources: Default::default(),
broadcast_codes: HashMap::new(),
- notification_streams: None,
+ notification_streams: Some(SelectAll::new()),
}
}
@@ -407,6 +407,11 @@
}
brs
}
+
+ #[cfg(any(test, feature = "test-utils"))]
+ pub fn insert_broadcast_receive_state(&mut self, handle: Handle, brs: BroadcastReceiveState) {
+ self.broadcast_sources.lock().update_state(handle, brs);
+ }
}
#[cfg(test)]
diff --git a/rust/bt-bass/src/client/event.rs b/rust/bt-bass/src/client/event.rs
index 2c7ac2f..4077373 100644
--- a/rust/bt-bass/src/client/event.rs
+++ b/rust/bt-bass/src/client/event.rs
@@ -218,6 +218,7 @@
use assert_matches::assert_matches;
use futures::channel::mpsc::unbounded;
+ use bt_common::core::AddressType;
use bt_gatt::types::Handle;
#[test]
diff --git a/rust/bt-bass/src/types.rs b/rust/bt-bass/src/types.rs
index f810a3d..31c8b2d 100644
--- a/rust/bt-bass/src/types.rs
+++ b/rust/bt-bass/src/types.rs
@@ -3,7 +3,7 @@
// found in the LICENSE file.
use bt_common::core::ltv::LtValue;
-use bt_common::core::{AdvertisingSetId, PaInterval};
+use bt_common::core::{AddressType, AdvertisingSetId, PaInterval};
use bt_common::generic_audio::metadata_ltv::*;
use bt_common::packet_encoding::{Decodable, Encodable, Error as PacketError};
use bt_common::{decodable_enum, Uuid};
@@ -676,32 +676,6 @@
}
}
-/// See Broadcast Audio Scan Service spec v1.0 Table 3.5 for details.
-#[repr(u8)]
-#[derive(Clone, Copy, Debug, PartialEq)]
-pub enum AddressType {
- // Public Device Address or Public Identity Address.
- Public = 0x00,
- // Random Device Address or Random (static) Identity Address.
- Random = 0x01,
-}
-
-impl AddressType {
- const BYTE_SIZE: usize = 1;
-}
-
-impl TryFrom<u8> for AddressType {
- type Error = PacketError;
-
- fn try_from(value: u8) -> Result<Self, Self::Error> {
- match value {
- 0x00 => Ok(Self::Public),
- 0x01 => Ok(Self::Random),
- _ => Err(PacketError::OutOfRange),
- }
- }
-}
-
/// Broadcast Receive State characteristic as defined in
/// Broadcast Audio Scan Service spec v1.0 Section 3.2.
/// The Broadcast Receive State characteristic is used by the server to expose
@@ -791,7 +765,7 @@
+ EncryptionStatus::MIN_PACKET_SIZE
+ NUM_SUBGROUPS_BYTE_SIZE;
- #[cfg(test)]
+ #[cfg(any(test, feature = "test-utils"))]
pub fn new(
source_id: u8,
source_address_type: AddressType,
diff --git a/rust/bt-broadcast-assistant/.gitignore b/rust/bt-broadcast-assistant/.gitignore
new file mode 100644
index 0000000..75e03aa
--- /dev/null
+++ b/rust/bt-broadcast-assistant/.gitignore
@@ -0,0 +1,17 @@
+# Generated by Cargo
+# will have compiled files and executables
+debug/
+target/
+
+# Remove Cargo.lock from gitignore if creating an executable, leave it for libraries
+# More information here https://doc.rust-lang.org/cargo/guide/cargo-toml-vs-cargo-lock.html
+Cargo.lock
+
+# These are backup files generated by rustfmt
+**/*.rs.bk
+
+# MSVC Windows builds of rustc generate these, which store debugging information
+*.pdb
+
+# Vim swap files
+*.swp
diff --git a/rust/bt-broadcast-assistant/Cargo.toml b/rust/bt-broadcast-assistant/Cargo.toml
new file mode 100644
index 0000000..a3ccfa6
--- /dev/null
+++ b/rust/bt-broadcast-assistant/Cargo.toml
@@ -0,0 +1,17 @@
+[package]
+name = "bt-broadcast-assistant"
+version = "0.0.1"
+edition.workspace = true
+license.workspace = true
+
+[dependencies]
+bt-bass.workspace = true
+bt-common.workspace = true
+bt-gatt.workspace = true
+futures.workspace = true
+parking_lot.workspace = true
+thiserror.workspace = true
+
+[dev-dependencies]
+assert_matches.workspace = true
+bt-bass = { workspace = true, features = ["test-utils"] }
diff --git a/rust/bt-broadcast-assistant/src/assistant.rs b/rust/bt-broadcast-assistant/src/assistant.rs
new file mode 100644
index 0000000..3d3e61e
--- /dev/null
+++ b/rust/bt-broadcast-assistant/src/assistant.rs
@@ -0,0 +1,272 @@
+// Copyright 2023 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+use parking_lot::Mutex;
+use std::collections::HashMap;
+use std::sync::Arc;
+use thiserror::Error;
+
+use bt_bass::client::error::Error as BassClientError;
+use bt_bass::client::BroadcastAudioScanServiceClient;
+use bt_bass::types::BroadcastId;
+use bt_common::{PeerId, Uuid};
+use bt_gatt::central::*;
+use bt_gatt::client::PeerServiceHandle;
+use bt_gatt::types::Error as GattError;
+use bt_gatt::Client;
+
+pub mod event;
+use event::*;
+pub mod peer;
+pub use peer::Peer;
+
+use crate::types::*;
+
+pub const BROADCAST_AUDIO_SCAN_SERVICE: Uuid = Uuid::from_u16(0x184F);
+pub const BASIC_AUDIO_ANNOUNCEMENT_SERVICE: Uuid = Uuid::from_u16(0x1851);
+pub const BROADCAST_AUDIO_ANNOUNCEMENT_SERVICE: Uuid = Uuid::from_u16(0x1852);
+
+#[derive(Debug, Error)]
+pub enum Error {
+ #[error("GATT operation error: {0:?}")]
+ Gatt(#[from] GattError),
+
+ #[error("Broadcast Audio Scan Service client error at peer ({0}): {1:?}")]
+ BassClient(PeerId, BassClientError),
+
+ #[error("Not connected to Broadcast Audio Scan Service at peer ({0})")]
+ NotConnectedToBass(PeerId),
+
+ #[error("Central scanning terminated unexpectedly")]
+ CentralScanTerminated,
+
+ #[error("Failed to connect to service ({1}) at peer ({0})")]
+ ConnectionFailure(PeerId, Uuid),
+
+ #[error("Broadcast Assistant was already started previously. It cannot be started twice")]
+ AlreadyStarted,
+
+ #[error("Failed due to error: {0}")]
+ Generic(String),
+}
+
+/// Contains information about the currently-known broadcast
+/// sources and the peers they were found on
+#[derive(Debug)]
+pub(crate) struct DiscoveredBroadcastSources(Mutex<HashMap<PeerId, BroadcastSource>>);
+
+impl DiscoveredBroadcastSources {
+ /// Creates a shareable instance of `DiscoveredBroadcastSources`.
+ pub fn new() -> Arc<Self> {
+ Arc::new(Self(Mutex::new(HashMap::new())))
+ }
+
+ /// Merges the broadcast source data with existing broadcast source data.
+ /// Returns the copy of the broadcast source data after the merge and
+ /// indicates whether it has changed from before or not.
+ pub(crate) fn merge_broadcast_source_data(
+ &self,
+ peer_id: &PeerId,
+ data: &BroadcastSource,
+ ) -> (BroadcastSource, bool) {
+ let mut lock = self.0.lock();
+ let source = lock.entry(*peer_id).or_default();
+ let before = source.clone();
+
+ source.merge(data);
+ let after = source.clone();
+ let changed = before != after;
+
+ (after, changed)
+ }
+
+ /// Get a BroadcastSource from a peer id.
+ fn get_by_peer_id(&self, peer_id: &PeerId) -> Option<BroadcastSource> {
+ let lock = self.0.lock();
+ lock.get(&peer_id).clone().map(|source| source.clone())
+ }
+
+ /// Get a BroadcastSource from associated broadcast id.
+ fn get_by_broadcast_id(&self, broadcast_id: &BroadcastId) -> Option<BroadcastSource> {
+ let lock = self.0.lock();
+ let info = lock.iter().find(|(&_k, &ref v)| v.broadcast_id == Some(*broadcast_id));
+ match info {
+ Some((&_peer_id, &ref broadcast_source)) => Some(broadcast_source.clone()),
+ None => None,
+ }
+ }
+}
+
+pub struct BroadcastAssistant<T: bt_gatt::GattTypes> {
+ central: T::Central,
+ broadcast_sources: Arc<DiscoveredBroadcastSources>,
+ scan_stream: Option<T::ScanResultStream>,
+}
+
+impl<T: bt_gatt::GattTypes + 'static> BroadcastAssistant<T> {
+ // Creates a broadcast assistant and sets it up to be ready
+ // for broadcast source scanning. Clients must use the `start`
+ // method to poll the event stream for scan results.
+ pub fn new(central: T::Central) -> Self
+ where
+ <T as bt_gatt::GattTypes>::ScanResultStream: std::marker::Send,
+ {
+ let scan_result_stream = central.scan(&Self::scan_filters());
+ Self {
+ central,
+ broadcast_sources: DiscoveredBroadcastSources::new(),
+ scan_stream: Some(scan_result_stream),
+ }
+ }
+
+ /// List of scan filters for advertisement data Broadcast Assistant should
+ /// look for, which are:
+ /// - Service data with Broadcast Audio Announcement Service UUID from
+ /// Broadcast Sources (see BAP spec v1.0.1 Section 3.7.2.1 for details)
+ // TODO(b/308481381): define filter for finding broadcast sink.
+ fn scan_filters() -> Vec<ScanFilter> {
+ vec![Filter::HasServiceData(BROADCAST_AUDIO_ANNOUNCEMENT_SERVICE).into()]
+ }
+
+ /// Start broadcast assistant. Returns EventStream that the upper layer can
+ /// poll. Upper layer can call methods on BroadcastAssistant based on the
+ /// events it sees.
+ pub fn start(&mut self) -> Result<EventStream, Error>
+ where
+ <T as bt_gatt::GattTypes>::ScanResultStream: std::marker::Send,
+ {
+ if self.scan_stream.is_none() {
+ return Err(Error::AlreadyStarted);
+ }
+ Ok(EventStream::new(self.scan_stream.take().unwrap(), self.broadcast_sources.clone()))
+ }
+
+ pub fn scan_for_scan_delegators(&mut self) -> T::ScanResultStream {
+ // Scan for service data with Broadcast Audio Scan Service UUID to look
+ // for Broadcast Sink collocated with the Scan Delegator (see BAP spec v1.0.1
+ // Section 3.9.2 for details).
+ self.central.scan(&vec![Filter::HasServiceData(BROADCAST_AUDIO_SCAN_SERVICE).into()])
+ }
+
+ pub async fn connect_to_scan_delegator(&mut self, peer_id: PeerId) -> Result<Peer<T>, Error>
+ where
+ <T as bt_gatt::GattTypes>::NotificationStream: std::marker::Send,
+ {
+ let client = self.central.connect(peer_id).await?;
+ let service_handles = client.find_service(BROADCAST_AUDIO_SCAN_SERVICE).await?;
+
+ for handle in service_handles {
+ if handle.uuid() != BROADCAST_AUDIO_SCAN_SERVICE || !handle.is_primary() {
+ continue;
+ }
+ let service = handle.connect().await?;
+ let bass = BroadcastAudioScanServiceClient::<T>::create(service)
+ .await
+ .map_err(|e| Error::BassClient(peer_id, e))?;
+
+ let connected_peer =
+ Peer::<T>::new(peer_id, client, bass, self.broadcast_sources.clone());
+ return Ok(connected_peer);
+ }
+ Err(Error::ConnectionFailure(peer_id, BROADCAST_AUDIO_SCAN_SERVICE))
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ use futures::{pin_mut, FutureExt};
+ use std::task::Poll;
+
+ use bt_common::core::{AddressType, AdvertisingSetId};
+ use bt_gatt::test_utils::{FakeCentral, FakeClient, FakeTypes};
+
+ use crate::assistant::peer::tests::fake_bass_service;
+
+ #[test]
+ fn merge_broadcast_source() {
+ let discovered = DiscoveredBroadcastSources::new();
+
+ let (bs, changed) = discovered.merge_broadcast_source_data(
+ &PeerId(1001),
+ &BroadcastSource::default()
+ .with_address([1, 2, 3, 4, 5, 6])
+ .with_address_type(AddressType::Public)
+ .with_advertising_sid(AdvertisingSetId(1))
+ .with_broadcast_id(BroadcastId(1001)),
+ );
+ assert!(changed);
+ assert_eq!(
+ bs,
+ BroadcastSource {
+ address: Some([1, 2, 3, 4, 5, 6]),
+ address_type: Some(AddressType::Public),
+ advertising_sid: Some(AdvertisingSetId(1)),
+ broadcast_id: Some(BroadcastId(1001)),
+ pa_interval: None,
+ endpoint: None,
+ }
+ );
+
+ let (bs, changed) = discovered.merge_broadcast_source_data(
+ &PeerId(1001),
+ &BroadcastSource::default().with_address_type(AddressType::Random).with_endpoint(
+ BroadcastAudioSourceEndpoint { presentation_delay_ms: 32, big: vec![] },
+ ),
+ );
+ assert!(changed);
+ assert_eq!(
+ bs,
+ BroadcastSource {
+ address: Some([1, 2, 3, 4, 5, 6]),
+ address_type: Some(AddressType::Random),
+ advertising_sid: Some(AdvertisingSetId(1)),
+ broadcast_id: Some(BroadcastId(1001)),
+ pa_interval: None,
+ endpoint: Some(BroadcastAudioSourceEndpoint {
+ presentation_delay_ms: 32,
+ big: vec![]
+ }),
+ }
+ );
+
+ let (_, changed) = discovered.merge_broadcast_source_data(
+ &PeerId(1001),
+ &BroadcastSource::default().with_address_type(AddressType::Random).with_endpoint(
+ BroadcastAudioSourceEndpoint { presentation_delay_ms: 32, big: vec![] },
+ ),
+ );
+ assert!(!changed);
+ }
+
+ #[test]
+ fn start_stream() {
+ let mut assistant = BroadcastAssistant::<FakeTypes>::new(FakeCentral::new());
+ let _ = assistant.start().expect("can start stream");
+
+ // Stream can only be started once.
+ assert!(assistant.start().is_err());
+ }
+
+ #[test]
+ fn connect_to_scan_delegator() {
+ // Set up fake GATT related objects.
+ let mut central = FakeCentral::new();
+ let mut client = FakeClient::new();
+ central.add_client(PeerId(1004), client.clone());
+ let service = fake_bass_service();
+ client.add_service(BROADCAST_AUDIO_SCAN_SERVICE, true, service.clone());
+
+ let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
+ let mut assistant = BroadcastAssistant::<FakeTypes>::new(central);
+ let conn_fut = assistant.connect_to_scan_delegator(PeerId(1004));
+ pin_mut!(conn_fut);
+ let polled = conn_fut.poll_unpin(&mut noop_cx);
+ let Poll::Ready(res) = polled else {
+ panic!("should be ready");
+ };
+ let _ = res.expect("should be ok");
+ }
+}
diff --git a/rust/bt-broadcast-assistant/src/assistant/event.rs b/rust/bt-broadcast-assistant/src/assistant/event.rs
new file mode 100644
index 0000000..993ba6f
--- /dev/null
+++ b/rust/bt-broadcast-assistant/src/assistant/event.rs
@@ -0,0 +1,259 @@
+// Copyright 2023 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+use futures::stream::{FusedStream, Stream, StreamExt};
+use std::sync::Arc;
+use std::task::Poll;
+
+use bt_bass::types::BroadcastId;
+use bt_common::packet_encoding::Decodable;
+use bt_common::packet_encoding::Error as PacketError;
+use bt_common::PeerId;
+use bt_gatt::central::{AdvertisingDatum, ScanResult};
+use core::pin::Pin;
+
+use crate::assistant::{
+ DiscoveredBroadcastSources, Error, BASIC_AUDIO_ANNOUNCEMENT_SERVICE,
+ BROADCAST_AUDIO_ANNOUNCEMENT_SERVICE,
+};
+use crate::types::{BroadcastAudioSourceEndpoint, BroadcastSource};
+
+#[derive(Debug)]
+pub enum Event {
+ FoundBroadcastSource { peer: PeerId, source: BroadcastSource },
+ CouldNotParseAdvertisingData { peer: PeerId, error: PacketError },
+}
+
+/// A stream of discovered broadcast sources.
+/// This stream polls the scan results from GATT client to discover
+/// available broadcast sources.
+pub struct EventStream {
+ // Central scan result stream for finding broadcast sources.
+ scan_result_stream: Pin<Box<dyn Stream<Item = Result<ScanResult, bt_gatt::types::Error>>>>,
+ terminated: bool,
+
+ broadcast_sources: Arc<DiscoveredBroadcastSources>,
+}
+
+impl EventStream {
+ pub(crate) fn new(
+ scan_result_stream: impl Stream<Item = Result<ScanResult, bt_gatt::types::Error>> + 'static,
+ broadcast_sources: Arc<DiscoveredBroadcastSources>,
+ ) -> Self {
+ Self {
+ scan_result_stream: Box::pin(scan_result_stream),
+ terminated: false,
+ broadcast_sources,
+ }
+ }
+
+ /// Returns the broadcast source if the scanned peer is a broadcast source.
+ /// Returns an error if parsing of the scan result data fails and None if
+ /// the scanned peer is not a broadcast source.
+ fn try_into_broadcast_source(
+ scan_result: &ScanResult,
+ ) -> Result<Option<BroadcastSource>, PacketError> {
+ let mut source = None;
+ for datum in &scan_result.advertised {
+ let AdvertisingDatum::ServiceData(uuid, data) = datum else {
+ continue;
+ };
+ if *uuid == BROADCAST_AUDIO_ANNOUNCEMENT_SERVICE {
+ let (bid, _) = BroadcastId::decode(data.as_slice())?;
+ source.get_or_insert(BroadcastSource::default()).with_broadcast_id(bid);
+ } else if *uuid == BASIC_AUDIO_ANNOUNCEMENT_SERVICE {
+ // TODO(dayeonglee): revisit when we implement periodic advertisement.
+ let (base, _) = BroadcastAudioSourceEndpoint::decode(data.as_slice())?;
+ source.get_or_insert(BroadcastSource::default()).with_endpoint(base);
+ }
+ }
+ Ok(source)
+ }
+}
+
+impl FusedStream for EventStream {
+ fn is_terminated(&self) -> bool {
+ self.terminated
+ }
+}
+
+impl Stream for EventStream {
+ type Item = Result<Event, Error>;
+
+ fn poll_next(
+ mut self: std::pin::Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ ) -> Poll<Option<Self::Item>> {
+ if self.terminated {
+ return Poll::Ready(None);
+ }
+
+ // Poll scan result stream to check if there were any newly discovered peers
+ // that we're interested in.
+ match futures::ready!(self.scan_result_stream.poll_next_unpin(cx)) {
+ Some(Ok(scanned)) => {
+ match Self::try_into_broadcast_source(&scanned) {
+ Err(e) => {
+ return Poll::Ready(Some(Ok(Event::CouldNotParseAdvertisingData {
+ peer: scanned.id,
+ error: e,
+ })));
+ }
+ Ok(Some(found_source)) => {
+ // If we found a broadcast source, we add its information in the
+ // internal records.
+ let (broadcast_source, changed) = self
+ .broadcast_sources
+ .merge_broadcast_source_data(&scanned.id, &found_source);
+
+ // Broadcast found event is relayed to the client iff complete
+ // information has been gathered.
+ if broadcast_source.into_add_source() && changed {
+ return Poll::Ready(Some(Ok(Event::FoundBroadcastSource {
+ peer: scanned.id,
+ source: broadcast_source,
+ })));
+ }
+
+ Poll::Pending
+ }
+ Ok(None) => Poll::Pending,
+ }
+ }
+ None | Some(Err(_)) => {
+ self.terminated = true;
+ Poll::Ready(Some(Err(Error::CentralScanTerminated)))
+ }
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ use assert_matches::assert_matches;
+
+ use bt_common::core::{AddressType, AdvertisingSetId};
+ use bt_gatt::central::{AdvertisingDatum, PeerName};
+ use bt_gatt::test_utils::ScannedResultStream;
+ use bt_gatt::types::Error as BtGattError;
+ use bt_gatt::types::GattError;
+
+ fn setup_stream() -> (EventStream, ScannedResultStream) {
+ let fake_scan_result_stream = ScannedResultStream::new();
+ let broadcast_sources = DiscoveredBroadcastSources::new();
+
+ (
+ EventStream::new(fake_scan_result_stream.clone(), broadcast_sources),
+ fake_scan_result_stream,
+ )
+ }
+
+ #[test]
+ fn poll_found_broadcast_source_events() {
+ let (mut stream, mut scan_result_stream) = setup_stream();
+
+ // Scanned a broadcast source and its broadcast id.
+ let broadcast_source_pid = PeerId(1005);
+
+ scan_result_stream.set_scanned_result(Ok(ScanResult {
+ id: broadcast_source_pid,
+ connectable: true,
+ name: PeerName::Unknown,
+ advertised: vec![AdvertisingDatum::ServiceData(
+ BROADCAST_AUDIO_ANNOUNCEMENT_SERVICE,
+ vec![0x01, 0x02, 0x03],
+ )],
+ }));
+
+ // Found broadcast source event shouldn't have been sent since braodcast source
+ // information isn't complete.
+ let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
+ assert!(stream.poll_next_unpin(&mut noop_cx).is_pending());
+
+ // Pretend somehow address, address type, and advertising sid were
+ // filled out. This completes the broadcast source information.
+ // TODO(b/308481381): replace this block with sending a central scan result that
+ // contains the data.
+ let _ = stream.broadcast_sources.merge_broadcast_source_data(
+ &broadcast_source_pid,
+ &BroadcastSource::default()
+ .with_address([1, 2, 3, 4, 5, 6])
+ .with_address_type(AddressType::Public)
+ .with_advertising_sid(AdvertisingSetId(1)),
+ );
+
+ // Scanned broadcast source's BASE data.
+ // TODO(b/308481381): replace this block sending data through PA train instead.
+ #[rustfmt::skip]
+ let base_data = vec![
+ 0x10, 0x20, 0x30, 0x02, // presentation delay, num of subgroups
+ 0x01, 0x03, 0x00, 0x00, 0x00, 0x00, // num of bis, codec id (big #1)
+ 0x00, // codec specific config len
+ 0x00, // metadata len,
+ 0x01, 0x00, // bis index, codec specific config len (big #1 / bis #1)
+ 0x01, 0x02, 0x00, 0x00, 0x00, 0x00, // num of bis, codec id (big #2)
+ 0x00, // codec specific config len
+ 0x00, // metadata len,
+ 0x01, 0x03, 0x02, 0x05,
+ 0x08, /* bis index, codec specific config len, codec frame blocks LTV
+ * (big #2 / bis #2) */
+ ];
+
+ scan_result_stream.set_scanned_result(Ok(ScanResult {
+ id: broadcast_source_pid,
+ connectable: true,
+ name: PeerName::Unknown,
+ advertised: vec![AdvertisingDatum::ServiceData(
+ BASIC_AUDIO_ANNOUNCEMENT_SERVICE,
+ base_data.clone(),
+ )],
+ }));
+
+ // Expect the stream to send out broadcast source found event since information
+ // is complete.
+ let Poll::Ready(Some(Ok(event))) = stream.poll_next_unpin(&mut noop_cx) else {
+ panic!("should have received event");
+ };
+ assert_matches!(event, Event::FoundBroadcastSource{peer, ..} => {
+ assert_eq!(peer, broadcast_source_pid)
+ });
+
+ assert!(stream.poll_next_unpin(&mut noop_cx).is_pending());
+
+ // Scanned the same broadcast source's BASE data.
+ scan_result_stream.set_scanned_result(Ok(ScanResult {
+ id: broadcast_source_pid,
+ connectable: true,
+ name: PeerName::Unknown,
+ advertised: vec![AdvertisingDatum::ServiceData(
+ BASIC_AUDIO_ANNOUNCEMENT_SERVICE,
+ base_data.clone(),
+ )],
+ }));
+
+ // Shouldn't have gotten the event again since the information remained the
+ // same.
+ assert!(stream.poll_next_unpin(&mut noop_cx).is_pending());
+ }
+
+ #[test]
+ fn central_scan_stream_terminates() {
+ let (mut stream, mut scan_result_stream) = setup_stream();
+
+ // Mimick scan error.
+ scan_result_stream.set_scanned_result(Err(BtGattError::Gatt(GattError::InvalidPdu)));
+
+ let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
+ match stream.poll_next_unpin(&mut noop_cx) {
+ Poll::Ready(Some(Err(e))) => assert_matches!(e, Error::CentralScanTerminated),
+ _ => panic!("should have received central scan terminated error"),
+ }
+
+ // Entire stream should have terminated.
+ assert_matches!(stream.poll_next_unpin(&mut noop_cx), Poll::Ready(None));
+ assert_matches!(stream.is_terminated(), true);
+ }
+}
diff --git a/rust/bt-broadcast-assistant/src/assistant/peer.rs b/rust/bt-broadcast-assistant/src/assistant/peer.rs
new file mode 100644
index 0000000..3369a4d
--- /dev/null
+++ b/rust/bt-broadcast-assistant/src/assistant/peer.rs
@@ -0,0 +1,294 @@
+// Copyright 2023 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+use futures::stream::FusedStream;
+use futures::Stream;
+use std::sync::Arc;
+use thiserror::Error;
+
+use bt_bass::client::error::Error as BassClientError;
+use bt_bass::client::event::Event as BassEvent;
+use bt_bass::client::{BigToBisSync, BroadcastAudioScanServiceClient};
+use bt_bass::types::{BroadcastId, PaSync};
+use bt_common::core::PaInterval;
+use bt_common::packet_encoding::Error as PacketError;
+use bt_common::PeerId;
+
+use crate::assistant::DiscoveredBroadcastSources;
+
+#[derive(Debug, Error)]
+pub enum Error {
+ #[error("Failed to take event stream at Broadcast Audio Scan Service client")]
+ UnavailableBassEventStream,
+
+ #[error("Broadcast Audio Scan Service client error: {0:?}")]
+ BassClient(#[from] BassClientError),
+
+ #[error("Incomplete information for broadcast source with peer id ({0})")]
+ NotEnoughInfo(PeerId),
+
+ #[error("Broadcast source with peer id ({0}) does not exist")]
+ DoesNotExist(PeerId),
+
+ #[error("Packet error: {0}")]
+ PacketError(#[from] PacketError),
+}
+
+/// Connected scan delegator peer. Clients can use this
+/// object to perform Broadcast Audio Scan Service operations on the
+/// scan delegator peer.
+/// Not thread-safe and only one operation must be done at a time.
+pub struct Peer<T: bt_gatt::GattTypes> {
+ peer_id: PeerId,
+ // Keep for peer connection.
+ _client: T::Client,
+ bass: BroadcastAudioScanServiceClient<T>,
+ // TODO(b/309015071): add a field for pacs.
+ broadcast_sources: Arc<DiscoveredBroadcastSources>,
+}
+
+impl<T: bt_gatt::GattTypes> Peer<T> {
+ pub(crate) fn new(
+ peer_id: PeerId,
+ client: T::Client,
+ bass: BroadcastAudioScanServiceClient<T>,
+ broadcast_sources: Arc<DiscoveredBroadcastSources>,
+ ) -> Self {
+ Peer { peer_id, _client: client, bass, broadcast_sources }
+ }
+
+ pub fn peer_id(&self) -> PeerId {
+ self.peer_id
+ }
+
+ /// Takes event stream for BASS events from this scan delegator peer.
+ /// Clients can call this method to start subscribing to BASS events.
+ pub fn take_event_stream(
+ &mut self,
+ ) -> Result<impl Stream<Item = Result<BassEvent, BassClientError>> + FusedStream, Error> {
+ self.bass.take_event_stream().ok_or(Error::UnavailableBassEventStream)
+ }
+
+ /// Send broadcast code for a particular broadcast.
+ pub async fn send_broadcast_code(
+ &mut self,
+ broadcast_id: BroadcastId,
+ broadcast_code: [u8; 16],
+ ) -> Result<(), Error> {
+ self.bass.set_broadcast_code(broadcast_id, broadcast_code).await.map_err(Into::into)
+ }
+
+ /// Sends a command to add a particular broadcast source.
+ ///
+ /// # Arguments
+ ///
+ /// * `broadcast_source_pid` - peer id of the braodcast source that's to be
+ /// added to this scan delegator peer
+ /// * `pa_sync` - pa sync mode the peer should attempt to be in
+ /// * `bis_sync` - desired BIG to BIS synchronization information. If the
+ /// set is empty, no preference value is used for all the BIGs
+ pub async fn add_broadcast_source(
+ &mut self,
+ source_peer_id: PeerId,
+ pa_sync: PaSync,
+ bis_sync: BigToBisSync,
+ ) -> Result<(), Error> {
+ let broadcast_source = self
+ .broadcast_sources
+ .get_by_peer_id(&source_peer_id)
+ .ok_or(Error::DoesNotExist(source_peer_id))?;
+ if !broadcast_source.into_add_source() {
+ return Err(Error::NotEnoughInfo(source_peer_id));
+ }
+
+ self.bass
+ .add_broadcast_source(
+ broadcast_source.broadcast_id.unwrap(),
+ broadcast_source.address_type.unwrap(),
+ broadcast_source.address.unwrap(),
+ broadcast_source.advertising_sid.unwrap(),
+ pa_sync,
+ broadcast_source.pa_interval.unwrap_or(PaInterval::unknown()),
+ broadcast_source
+ .endpoint
+ .unwrap()
+ .get_bass_subgroups(bis_sync)
+ .map_err(Error::PacketError)?,
+ )
+ .await
+ .map_err(Into::into)
+ }
+
+ /// Sends a command to to update a particular broadcast source's PA sync.
+ ///
+ /// # Arguments
+ ///
+ /// * `broadcast_id` - broadcast id of the broadcast source that's to be
+ /// updated
+ /// * `pa_sync` - pa sync mode the scan delegator peer should attempt to be
+ /// in.
+ /// * `bis_sync` - desired BIG to BIS synchronization information
+ pub async fn update_broadcast_source_sync(
+ &mut self,
+ broadcast_id: BroadcastId,
+ pa_sync: PaSync,
+ bis_sync: BigToBisSync,
+ ) -> Result<(), Error> {
+ let pa_interval = self
+ .broadcast_sources
+ .get_by_broadcast_id(&broadcast_id)
+ .map(|bs| bs.pa_interval)
+ .unwrap_or(None);
+
+ self.bass
+ .modify_broadcast_source(broadcast_id, pa_sync, pa_interval, Some(bis_sync), None)
+ .await
+ .map_err(Into::into)
+ }
+
+ /// Sends a command to remove a particular broadcast source.
+ ///
+ /// # Arguments
+ ///
+ /// * `broadcast_id` - broadcast id of the braodcast source that's to be
+ /// removed from the scan delegator
+ pub async fn remove_broadcast_source(
+ &mut self,
+ broadcast_id: BroadcastId,
+ ) -> Result<(), Error> {
+ self.bass.remove_broadcast_source(broadcast_id).await.map_err(Into::into)
+ }
+
+ /// Sends a command to inform the scan delegator peer that we have
+ /// started scanning for broadcast sources on behalf of it.
+ pub async fn inform_remote_scan_started(&mut self) -> Result<(), Error> {
+ self.bass.remote_scan_started().await.map_err(Into::into)
+ }
+
+ /// Sends a command to inform the scan delegator peer that we have
+ /// stopped scanning for broadcast sources on behalf of it.
+ pub async fn inform_remote_scan_stopped(&mut self) -> Result<(), Error> {
+ self.bass.remote_scan_stopped().await.map_err(Into::into)
+ }
+}
+
+#[cfg(test)]
+pub(crate) mod tests {
+ use super::*;
+
+ use assert_matches::assert_matches;
+ use futures::{pin_mut, FutureExt};
+ use std::collections::HashSet;
+ use std::task::Poll;
+
+ use bt_common::core::{AddressType, AdvertisingSetId};
+ use bt_gatt::test_utils::{FakeClient, FakePeerService, FakeTypes};
+ use bt_gatt::types::{
+ AttributePermissions, CharacteristicProperties, CharacteristicProperty, Handle,
+ };
+ use bt_gatt::Characteristic;
+
+ use crate::types::BroadcastSource;
+
+ const RECEIVE_STATE_HANDLE: Handle = Handle(0x11);
+ const AUDIO_SCAN_CONTROL_POINT_HANDLE: Handle = Handle(0x12);
+
+ pub(crate) fn fake_bass_service() -> FakePeerService {
+ let mut peer_service = FakePeerService::new();
+ // One broadcast receive state and one broadcast audio scan control
+ // point characteristic handles.
+ peer_service.add_characteristic(
+ Characteristic {
+ handle: RECEIVE_STATE_HANDLE,
+ uuid: bt_bass::types::BROADCAST_RECEIVE_STATE_UUID,
+ properties: CharacteristicProperties(vec![
+ CharacteristicProperty::Broadcast,
+ CharacteristicProperty::Notify,
+ ]),
+ permissions: AttributePermissions::default(),
+ descriptors: vec![],
+ },
+ vec![],
+ );
+ peer_service.add_characteristic(
+ Characteristic {
+ handle: AUDIO_SCAN_CONTROL_POINT_HANDLE,
+ uuid: bt_bass::types::BROADCAST_AUDIO_SCAN_CONTROL_POINT_UUID,
+ properties: CharacteristicProperties(vec![CharacteristicProperty::Broadcast]),
+ permissions: AttributePermissions::default(),
+ descriptors: vec![],
+ },
+ vec![],
+ );
+ peer_service
+ }
+
+ fn setup() -> (Peer<FakeTypes>, FakePeerService, Arc<DiscoveredBroadcastSources>) {
+ let peer_service = fake_bass_service();
+
+ let broadcast_sources = DiscoveredBroadcastSources::new();
+ (
+ Peer {
+ peer_id: PeerId(0x1),
+ _client: FakeClient::new(),
+ bass: BroadcastAudioScanServiceClient::<FakeTypes>::create_for_test(
+ peer_service.clone(),
+ Handle(0x1),
+ ),
+ broadcast_sources: broadcast_sources.clone(),
+ },
+ peer_service,
+ broadcast_sources,
+ )
+ }
+
+ #[test]
+ fn take_event_stream() {
+ let (mut peer, _peer_service, _broadcast_source) = setup();
+ let _event_stream = peer.take_event_stream().expect("should succeed");
+
+ // If we try to take the event stream the second time, it should fail.
+ assert!(peer.take_event_stream().is_err());
+ }
+
+ #[test]
+ fn add_broadcast_source_fail() {
+ let (mut peer, _peer_service, broadcast_source) = setup();
+
+ let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
+
+ // Should fail because broadcast source doesn't exist.
+ {
+ let fut = peer.add_broadcast_source(
+ PeerId(1001),
+ PaSync::SyncPastUnavailable,
+ HashSet::new(),
+ );
+ pin_mut!(fut);
+ let polled = fut.poll_unpin(&mut noop_cx);
+ assert_matches!(polled, Poll::Ready(Err(_)));
+ }
+
+ let _ = broadcast_source.merge_broadcast_source_data(
+ &PeerId(1001),
+ &BroadcastSource::default()
+ .with_address([1, 2, 3, 4, 5, 6])
+ .with_address_type(AddressType::Public)
+ .with_advertising_sid(AdvertisingSetId(1))
+ .with_broadcast_id(BroadcastId(1001)),
+ );
+
+ // Should fail because not enough information.
+ {
+ let fut = peer.add_broadcast_source(
+ PeerId(1001),
+ PaSync::SyncPastUnavailable,
+ HashSet::new(),
+ );
+ pin_mut!(fut);
+ let polled = fut.poll_unpin(&mut noop_cx);
+ assert_matches!(polled, Poll::Ready(Err(_)));
+ }
+ }
+}
diff --git a/rust/bt-broadcast-assistant/src/lib.rs b/rust/bt-broadcast-assistant/src/lib.rs
new file mode 100644
index 0000000..7d7af50
--- /dev/null
+++ b/rust/bt-broadcast-assistant/src/lib.rs
@@ -0,0 +1,7 @@
+// Copyright 2023 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+pub mod assistant;
+pub use assistant::BroadcastAssistant;
+pub mod types;
diff --git a/rust/bt-broadcast-assistant/src/types.rs b/rust/bt-broadcast-assistant/src/types.rs
new file mode 100644
index 0000000..8c6074e
--- /dev/null
+++ b/rust/bt-broadcast-assistant/src/types.rs
@@ -0,0 +1,435 @@
+// Copyright 2023 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+use bt_bass::client::BigToBisSync;
+use bt_bass::types::{BigSubgroup, BisSync, BroadcastId};
+use bt_common::core::ltv::LtValue;
+use bt_common::core::{Address, AddressType, CodecId};
+use bt_common::core::{AdvertisingSetId, PaInterval};
+use bt_common::generic_audio::codec_configuration::CodecConfiguration;
+use bt_common::generic_audio::metadata_ltv::Metadata;
+use bt_common::packet_encoding::{Decodable, Error as PacketError};
+
+/// Broadcast source data as advertised through Basic Audio Announcement
+/// PA and Broadcast Audio Announcement.
+/// See BAP spec v1.0.1 Section 3.7.2.1 and Section 3.7.2.2 for details.
+// TODO(b/308481381): fill out endpoint from basic audio announcement from PA trains.
+#[derive(Clone, Default, Debug, PartialEq)]
+pub struct BroadcastSource {
+ pub(crate) address: Option<Address>,
+ pub(crate) address_type: Option<AddressType>,
+ pub(crate) advertising_sid: Option<AdvertisingSetId>,
+ pub(crate) broadcast_id: Option<BroadcastId>,
+ pub(crate) pa_interval: Option<PaInterval>,
+ pub(crate) endpoint: Option<BroadcastAudioSourceEndpoint>,
+}
+
+impl BroadcastSource {
+ /// Returns whether or not this BroadcastSource has enough information
+ /// to be added by the Broadcast Assistant.
+ pub(crate) fn into_add_source(&self) -> bool {
+ // PA interval is not necessary since default value can be used.
+ self.address.is_some()
+ && self.address_type.is_some()
+ && self.advertising_sid.is_some()
+ && self.broadcast_id.is_some()
+ && self.endpoint.is_some()
+ }
+
+ pub fn with_address(&mut self, address: [u8; 6]) -> &mut Self {
+ self.address = Some(address);
+ self
+ }
+
+ pub fn with_address_type(&mut self, type_: AddressType) -> &mut Self {
+ self.address_type = Some(type_);
+ self
+ }
+
+ pub fn with_broadcast_id(&mut self, bid: BroadcastId) -> &mut Self {
+ self.broadcast_id = Some(bid);
+ self
+ }
+
+ pub fn with_advertising_sid(&mut self, sid: AdvertisingSetId) -> &mut Self {
+ self.advertising_sid = Some(sid);
+ self
+ }
+
+ pub fn with_endpoint(&mut self, endpoint: BroadcastAudioSourceEndpoint) -> &mut Self {
+ self.endpoint = Some(endpoint);
+ self
+ }
+
+ /// Merge fields from other broadcast source into this broadcast source.
+ /// Set fields in other source take priority over this source.
+ /// If a field in the other broadcast source is none, it's ignored and
+ /// the existing values are kept.
+ pub(crate) fn merge(&mut self, other: &BroadcastSource) {
+ if let Some(address) = other.address {
+ self.address = Some(address);
+ }
+ if let Some(address_type) = other.address_type {
+ self.address_type = Some(address_type);
+ }
+ if let Some(advertising_sid) = other.advertising_sid {
+ self.advertising_sid = Some(advertising_sid);
+ }
+ if let Some(broadcast_id) = other.broadcast_id {
+ self.broadcast_id = Some(broadcast_id);
+ }
+ if let Some(pa_interval) = other.pa_interval {
+ self.pa_interval = Some(pa_interval);
+ }
+ if let Some(endpoint) = &other.endpoint {
+ self.endpoint = Some(endpoint.clone());
+ }
+ }
+}
+
+/// Parameters exposed as part of Basic Audio Announcement from Broadcast
+/// Sources. See BAP spec v1.0.1 Section 3.7.2.2 for more details.
+// TODO(b/308481381): Fill out the struct.
+#[derive(Clone, Debug, PartialEq)]
+pub struct BroadcastAudioSourceEndpoint {
+ // Delay is 3 bytes.
+ pub(crate) presentation_delay_ms: u32,
+ pub(crate) big: Vec<BroadcastIsochronousGroup>,
+}
+
+impl BroadcastAudioSourceEndpoint {
+ // Should contain presentation delay, num BIG, and at least one BIG praram.
+ const MIN_PACKET_SIZE: usize = 3 + 1 + BroadcastIsochronousGroup::MIN_PACKET_SIZE;
+
+ /// Returns the representation of this object's broadcast isochronous groups
+ /// that's usable with Broadcast Audio Scan Service operations.
+ ///
+ /// # Arguments
+ ///
+ /// * `bis_sync` - BIG to BIS sync information. If the set is empty, no
+ /// preference value is used for all the BIGs
+ pub(crate) fn get_bass_subgroups(
+ &self,
+ bis_sync: BigToBisSync,
+ ) -> Result<Vec<BigSubgroup>, PacketError> {
+ let mut subgroups = Vec::new();
+ let sync_map = bt_bass::client::big_to_bis_sync_indices(&bis_sync);
+
+ for (big_index, group) in self.big.iter().enumerate() {
+ let bis_sync = match sync_map.get(&(big_index as u8)) {
+ Some(bis_indices) => {
+ let mut bis_sync: BisSync = BisSync(0);
+ bis_sync.set_sync(bis_indices)?;
+ bis_sync
+ }
+ _ => BisSync::default(),
+ };
+ subgroups.push(BigSubgroup::new(Some(bis_sync)).with_metadata(group.metadata.clone()));
+ }
+ Ok(subgroups)
+ }
+}
+
+impl Decodable for BroadcastAudioSourceEndpoint {
+ type Error = PacketError;
+
+ fn decode(buf: &[u8]) -> core::result::Result<(Self, usize), Self::Error> {
+ if buf.len() < Self::MIN_PACKET_SIZE {
+ return Err(PacketError::UnexpectedDataLength);
+ }
+
+ let mut idx = 0 as usize;
+ let presentation_delay = u32::from_le_bytes([buf[idx], buf[idx + 1], buf[idx + 2], 0x00]);
+ idx += 3;
+
+ let num_big: usize = buf[idx] as usize;
+ idx += 1;
+ if num_big < 1 {
+ return Err(PacketError::InvalidParameter(format!(
+ "num of subgroups shall be at least 1 got {num_big}"
+ )));
+ }
+
+ let mut big = Vec::new();
+ while big.len() < num_big {
+ let (group, len) = BroadcastIsochronousGroup::decode(&buf[idx..])
+ .map_err(|e| PacketError::InvalidParameter(format!("{e}")))?;
+ big.push(group);
+ idx += len;
+ }
+
+ Ok((Self { presentation_delay_ms: presentation_delay, big }, idx))
+ }
+}
+
+#[derive(Clone, Debug, PartialEq)]
+pub struct BroadcastIsochronousGroup {
+ pub(crate) codec_id: CodecId,
+ pub(crate) codec_specific_config: Vec<CodecConfiguration>,
+ pub(crate) metadata: Vec<Metadata>,
+ pub(crate) bis: Vec<BroadcastIsochronousStream>,
+}
+
+impl BroadcastIsochronousGroup {
+ // Should contain num BIS, codec id, codec specific config len, metadata len,
+ // and at least one BIS praram.
+ const MIN_PACKET_SIZE: usize =
+ 1 + CodecId::BYTE_SIZE + 1 + 1 + BroadcastIsochronousStream::MIN_PACKET_SIZE;
+}
+
+impl Decodable for BroadcastIsochronousGroup {
+ type Error = PacketError;
+
+ fn decode(buf: &[u8]) -> core::result::Result<(Self, usize), Self::Error> {
+ if buf.len() < BroadcastIsochronousGroup::MIN_PACKET_SIZE {
+ return Err(PacketError::UnexpectedDataLength);
+ }
+
+ let mut idx = 0;
+ let num_bis = buf[idx] as usize;
+ idx += 1;
+ if num_bis < 1 {
+ return Err(PacketError::InvalidParameter(format!(
+ "num of BIS shall be at least 1 got {num_bis}"
+ )));
+ }
+
+ let (codec_id, read_bytes) = CodecId::decode(&buf[idx..])?;
+ idx += read_bytes;
+
+ let codec_config_len = buf[idx] as usize;
+ idx += 1;
+
+ let (results, consumed) = CodecConfiguration::decode_all(&buf[idx..idx + codec_config_len]);
+ if consumed != codec_config_len {
+ return Err(bt_common::packet_encoding::Error::UnexpectedDataLength);
+ }
+
+ let codec_specific_configs = results.into_iter().filter_map(Result::ok).collect();
+ idx += codec_config_len;
+
+ let metadata_len = buf[idx] as usize;
+ idx += 1;
+
+ let (results_metadata, consumed_len) = Metadata::decode_all(&buf[idx..idx + metadata_len]);
+ if consumed_len != metadata_len {
+ return Err(PacketError::UnexpectedDataLength);
+ }
+ // Ignore any undecodable metadata types
+ let metadata = results_metadata.into_iter().filter_map(Result::ok).collect();
+ idx += consumed_len;
+
+ let mut bis = Vec::new();
+ while bis.len() < num_bis {
+ let (stream, len) = BroadcastIsochronousStream::decode(&buf[idx..])
+ .map_err(|e| PacketError::InvalidParameter(e.to_string()))?;
+ bis.push(stream);
+ idx += len;
+ }
+
+ Ok((
+ BroadcastIsochronousGroup {
+ codec_id,
+ codec_specific_config: codec_specific_configs,
+ metadata,
+ bis,
+ },
+ idx,
+ ))
+ }
+}
+
+#[derive(Clone, Debug, PartialEq)]
+pub struct BroadcastIsochronousStream {
+ pub(crate) bis_index: u8,
+ pub(crate) codec_specific_config: Vec<CodecConfiguration>,
+}
+
+impl BroadcastIsochronousStream {
+ const MIN_PACKET_SIZE: usize = 1 + 1;
+}
+
+impl Decodable for BroadcastIsochronousStream {
+ type Error = PacketError;
+
+ fn decode(buf: &[u8]) -> core::result::Result<(Self, usize), Self::Error> {
+ if buf.len() < BroadcastIsochronousStream::MIN_PACKET_SIZE {
+ return Err(PacketError::UnexpectedDataLength);
+ }
+
+ let mut idx = 0;
+
+ let bis_index = buf[idx];
+ idx += 1;
+
+ let codec_config_len = buf[idx] as usize;
+ idx += 1;
+
+ let (results, consumed) = CodecConfiguration::decode_all(&buf[idx..idx + codec_config_len]);
+ if consumed != codec_config_len {
+ return Err(bt_common::packet_encoding::Error::UnexpectedDataLength);
+ }
+ let codec_specific_configs = results.into_iter().filter_map(Result::ok).collect();
+ idx += codec_config_len;
+
+ Ok((
+ BroadcastIsochronousStream { bis_index, codec_specific_config: codec_specific_configs },
+ idx,
+ ))
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use std::collections::HashSet;
+
+ use super::*;
+
+ use bt_common::generic_audio::codec_configuration::{FrameDuration, SamplingFrequency};
+ use bt_common::generic_audio::AudioLocation;
+
+ #[test]
+ fn decode_bis() {
+ #[rustfmt::skip]
+ let buf = [
+ 0x01, 0x09, // bis index and codec specific config len
+ 0x02, 0x01, 0x06, // sampling frequency LTV
+ 0x05, 0x03, 0x03, 0x00, 0x00, 0x0C, // audio location LTV
+ ];
+
+ let (bis, _read_bytes) =
+ BroadcastIsochronousStream::decode(&buf[..]).expect("should not fail");
+ assert_eq!(
+ bis,
+ BroadcastIsochronousStream {
+ bis_index: 0x01,
+ codec_specific_config: vec![
+ CodecConfiguration::SamplingFrequency(SamplingFrequency::F32000Hz),
+ CodecConfiguration::AudioChannelAllocation(HashSet::from([
+ AudioLocation::FrontLeft,
+ AudioLocation::FrontRight,
+ AudioLocation::LeftSurround,
+ AudioLocation::RightSurround
+ ])),
+ ],
+ }
+ );
+ }
+
+ #[test]
+ fn decode_big() {
+ #[rustfmt::skip]
+ let buf = [
+ 0x02, 0x03, 0x00, 0x00, 0x00, 0x00, // num of bis, codec id
+ 0x04, 0x03, 0x04, 0x04, 0x10, // codec specific config len, octets per codec frame LTV
+ 0x03, 0x02, 0x08, 0x01, // metadata len, audio active state LTV
+ 0x01, 0x00, // bis index, codec specific config len (bis #1)
+ 0x02, 0x03, 0x02, 0x02, 0x01, // bis index, codec specific config len, frame duration LTV (bis #2)
+ ];
+
+ let (big, _read_bytes) =
+ BroadcastIsochronousGroup::decode(&buf[..]).expect("should not fail");
+ assert_eq!(
+ big,
+ BroadcastIsochronousGroup {
+ codec_id: CodecId::Assigned(bt_common::core::CodingFormat::Transparent),
+ codec_specific_config: vec![CodecConfiguration::OctetsPerCodecFrame(0x1004),],
+ metadata: vec![Metadata::AudioActiveState(true)],
+ bis: vec![
+ BroadcastIsochronousStream { bis_index: 0x01, codec_specific_config: vec![] },
+ BroadcastIsochronousStream {
+ bis_index: 0x02,
+ codec_specific_config: vec![CodecConfiguration::FrameDuration(
+ FrameDuration::TenMs
+ )],
+ },
+ ],
+ }
+ );
+ }
+
+ #[test]
+ fn decode_base() {
+ #[rustfmt::skip]
+ let buf = [
+ 0x10, 0x20, 0x30, 0x02, // presentation delay, num of subgroups
+ 0x01, 0x03, 0x00, 0x00, 0x00, 0x00, // num of bis, codec id (big #1)
+ 0x00, // codec specific config len
+ 0x00, // metadata len,
+ 0x01, 0x00, // bis index, codec specific config len (big #1 / bis #1)
+ 0x01, 0x02, 0x00, 0x00, 0x00, 0x00, // num of bis, codec id (big #2)
+ 0x00, // codec specific config len
+ 0x00, // metadata len,
+ 0x01, 0x03, 0x02, 0x05, 0x08, // bis index, codec specific config len, codec frame blocks LTV (big #2 / bis #2)
+ ];
+
+ let (base, _read_bytes) =
+ BroadcastAudioSourceEndpoint::decode(&buf[..]).expect("should not fail");
+ assert_eq!(base.presentation_delay_ms, 0x00302010);
+ assert_eq!(base.big.len(), 2);
+ assert_eq!(
+ base.big[0],
+ BroadcastIsochronousGroup {
+ codec_id: CodecId::Assigned(bt_common::core::CodingFormat::Transparent),
+ codec_specific_config: vec![],
+ metadata: vec![],
+ bis: vec![BroadcastIsochronousStream {
+ bis_index: 0x01,
+ codec_specific_config: vec![],
+ },],
+ }
+ );
+ assert_eq!(
+ base.big[1],
+ BroadcastIsochronousGroup {
+ codec_id: CodecId::Assigned(bt_common::core::CodingFormat::Cvsd),
+ codec_specific_config: vec![],
+ metadata: vec![],
+ bis: vec![BroadcastIsochronousStream {
+ bis_index: 0x01,
+ codec_specific_config: vec![CodecConfiguration::CodecFramesPerSdu(0x08)],
+ },],
+ }
+ );
+ }
+
+ #[test]
+ fn decode_base_complex() {
+ // BroadcastAudioSourceEndpoint { presentation_delay_ms: 20000, big:
+ // [BroadcastIsochronousGroup { codec_id: Assigned(Lc3), codec_specific_config:
+ // [SamplingFrequency(F24000Hz), FrameDuration(TenMs),
+ // AudioChannelAllocation({FrontLeft, FrontRight}), OctetsPerCodecFrame(60)],
+ // metadata: [StreamingAudioContexts([Media])], bis:
+ // [BroadcastIsochronousStream { bis_index: 1, codec_specific_config:
+ // [AudioChannelAllocation({FrontLeft})] }, BroadcastIsochronousStream {
+ // bis_index: 2, codec_specific_con$
+ // ig: [AudioChannelAllocation({FrontRight})] }] }] }
+ #[rustfmt::skip]
+ let buf = [
+ 0x20, 0x4e, 0x00, 0x01, 0x02, 0x06, 0x00, 0x00, 0x00, 0x00, 0x10, 0x02, 0x01, 0x05, 0x02, 0x02, 0x01, 0x05,
+ 0x03, 0x03, 0x00, 0x00, 0x00, 0x03, 0x04, 0x3c, 0x00, 0x04, 0x03, 0x02, 0x04, 0x00, 0x01, 0x06, 0x05, 0x03, 0x01,
+ 0x00, 0x00, 0x00, 0x02, 0x06, 0x05, 0x03, 0x02, 0x00, 0x00, 0x00
+ ];
+ let (base, _read_bytes) =
+ BroadcastAudioSourceEndpoint::decode(&buf[..]).expect("should not fail");
+ println!("{base:?}");
+
+ // BroadcastAudioSourceEndpoint { presentation_delay_ms: 20000, big:
+ // [BroadcastIsochronousGroup { codec_id: Assigned(Lc3), codec_specific_config:
+ // [SamplingFrequency(F24000Hz), FrameDuration(TenMs),
+ // AudioChannelAllocation({FrontLeft}), OctetsPerCodecFrame(60)],
+ // metadata: [BroadcastAudioImmediateRenderingFlag], bis:
+ // [BroadcastIsochronousStream { bis_index: 1, codec_specific_config:
+ // [SamplingFrequency(F24000Hz)] }] }] }
+ #[rustfmt::skip]
+ let buf = [
+ 0x20, 0x4e, 0x00, 0x01, 0x01, 0x06, 0x00, 0x00, 0x00, 0x00, 0x10, 0x02, 0x01, 0x05,
+ 0x02, 0x02, 0x01, 0x05, 0x03, 0x01, 0x00, 0x00, 0x00, 0x03, 0x04, 0x3c, 0x00, 0x02,
+ 0x01, 0x09, 0x01, 0x03, 0x02, 0x01, 0x05,
+ ];
+ let (base, _read_bytes) =
+ BroadcastAudioSourceEndpoint::decode(&buf[..]).expect("should not fail");
+ println!("{base:?}");
+ }
+}
diff --git a/rust/bt-common/src/core.rs b/rust/bt-common/src/core.rs
index fd02299..9ee3fa0 100644
--- a/rust/bt-common/src/core.rs
+++ b/rust/bt-common/src/core.rs
@@ -7,8 +7,36 @@
use crate::packet_encoding::{Encodable, Error as PacketError};
-/// Represents the Advertising Set ID which is 1 byte long.
-/// Follows little endian encoding.
+/// Bluetooth Device Address that uniquely identifies the device
+/// to another Bluetooth device.
+/// See Core spec v5.3 Vol 2, Part B section 1.2.
+pub type Address = [u8; 6];
+
+/// See Core spec v5.3 Vol 3, Part C section 15.1.1.
+#[repr(u8)]
+#[derive(Clone, Copy, Debug, PartialEq)]
+pub enum AddressType {
+ Public = 0x00,
+ Random = 0x01,
+}
+
+impl AddressType {
+ pub const BYTE_SIZE: usize = 1;
+}
+
+impl TryFrom<u8> for AddressType {
+ type Error = PacketError;
+
+ fn try_from(value: u8) -> Result<Self, Self::Error> {
+ match value {
+ 0x00 => Ok(Self::Public),
+ 0x01 => Ok(Self::Random),
+ _ => Err(PacketError::OutOfRange),
+ }
+ }
+}
+
+/// Advertising Set ID which is 1 byte long.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct AdvertisingSetId(pub u8);
@@ -17,8 +45,7 @@
pub const BYTE_SIZE: usize = 1;
}
-/// Represents the SyncInfo Interval value which is 2 bytes long.
-/// Follows little endian encoding.
+/// SyncInfo Interval value which is 2 bytes long.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct PaInterval(pub u16);
diff --git a/rust/bt-gatt/src/test_utils.rs b/rust/bt-gatt/src/test_utils.rs
index 879a0fe..7a3578f 100644
--- a/rust/bt-gatt/src/test_utils.rs
+++ b/rust/bt-gatt/src/test_utils.rs
@@ -12,7 +12,7 @@
use bt_common::{PeerId, Uuid};
-use crate::central::{AdvertisingDatum, PeerName, ScanResult};
+use crate::central::ScanResult;
use crate::client::CharacteristicNotification;
use crate::{types::*, GattTypes};
@@ -142,47 +142,97 @@
}
}
-pub struct FakeServiceHandle {}
+#[derive(Clone)]
+pub struct FakeServiceHandle {
+ pub uuid: Uuid,
+ pub is_primary: bool,
+ pub fake_service: FakePeerService,
+}
impl crate::client::PeerServiceHandle<FakeTypes> for FakeServiceHandle {
fn uuid(&self) -> Uuid {
- todo!()
+ self.uuid
}
fn is_primary(&self) -> bool {
- todo!()
+ self.is_primary
}
fn connect(&self) -> <FakeTypes as GattTypes>::ServiceConnectFut {
- futures::future::ready(Ok(FakePeerService::new()))
+ futures::future::ready(Ok(self.fake_service.clone()))
}
}
-pub struct FakeClient {}
+#[derive(Default)]
+struct FakeClientInner {
+ fake_services: Vec<FakeServiceHandle>,
+}
+
+#[derive(Clone)]
+pub struct FakeClient {
+ inner: Arc<Mutex<FakeClientInner>>,
+}
+
+impl FakeClient {
+ pub fn new() -> Self {
+ FakeClient { inner: Arc::new(Mutex::new(FakeClientInner::default())) }
+ }
+
+ /// Add a fake peer service to this client.
+ pub fn add_service(&mut self, uuid: Uuid, is_primary: bool, fake_service: FakePeerService) {
+ self.inner.lock().fake_services.push(FakeServiceHandle { uuid, is_primary, fake_service });
+ }
+}
impl crate::Client<FakeTypes> for FakeClient {
fn peer_id(&self) -> PeerId {
todo!()
}
- fn find_service(&self, _uuid: Uuid) -> <FakeTypes as GattTypes>::FindServicesFut {
- futures::future::ready(Ok::<Vec<FakeServiceHandle>, Error>(vec![FakeServiceHandle {}]))
+ fn find_service(&self, uuid: Uuid) -> <FakeTypes as GattTypes>::FindServicesFut {
+ let fake_services = &self.inner.lock().fake_services;
+ let mut filtered_services = Vec::new();
+ for handle in fake_services {
+ if handle.uuid == uuid {
+ filtered_services.push(handle.clone());
+ }
+ }
+
+ futures::future::ready(Ok(filtered_services))
}
}
-pub struct SingleResultStream {
- result: Option<Result<crate::central::ScanResult>>,
+#[derive(Clone)]
+pub struct ScannedResultStream {
+ inner: Arc<Mutex<ScannedResultStreamInner>>,
}
-impl Stream for SingleResultStream {
+#[derive(Default)]
+pub struct ScannedResultStreamInner {
+ result: Option<Result<ScanResult>>,
+}
+
+impl ScannedResultStream {
+ pub fn new() -> Self {
+ Self { inner: Arc::new(Mutex::new(ScannedResultStreamInner::default())) }
+ }
+
+ /// Set scanned result item to output from the stream.
+ pub fn set_scanned_result(&mut self, item: Result<ScanResult>) {
+ self.inner.lock().result = Some(item);
+ }
+}
+
+impl Stream for ScannedResultStream {
type Item = Result<ScanResult>;
fn poll_next(
self: std::pin::Pin<&mut Self>,
_cx: &mut std::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
- if self.result.is_some() {
- Poll::Ready(self.get_mut().result.take())
+ let mut lock = self.inner.lock();
+ if lock.result.is_some() {
+ Poll::Ready(lock.result.take())
} else {
// Never wake up, as if we never find another result
Poll::Pending
@@ -194,7 +244,7 @@
impl GattTypes for FakeTypes {
type Central = FakeCentral;
- type ScanResultStream = SingleResultStream;
+ type ScanResultStream = ScannedResultStream;
type Client = FakeClient;
type ConnectFuture = Ready<Result<FakeClient>>;
type PeerServiceHandle = FakeServiceHandle;
@@ -208,21 +258,36 @@
}
#[derive(Default)]
-pub struct FakeCentral {}
+pub struct FakeCentralInner {
+ clients: HashMap<PeerId, FakeClient>,
+}
-impl crate::Central<FakeTypes> for FakeCentral {
- fn scan(&self, _filters: &[crate::central::ScanFilter]) -> SingleResultStream {
- SingleResultStream {
- result: Some(Ok(ScanResult {
- id: PeerId(1),
- connectable: true,
- name: PeerName::CompleteName("Marie's Pixel 7 Pro".to_owned()),
- advertised: vec![AdvertisingDatum::Services(vec![Uuid::from_u16(0x1844)])],
- })),
- }
+#[derive(Clone)]
+pub struct FakeCentral {
+ inner: Arc<Mutex<FakeCentralInner>>,
+}
+
+impl FakeCentral {
+ pub fn new() -> Self {
+ Self { inner: Arc::new(Mutex::new(FakeCentralInner::default())) }
}
- fn connect(&self, _peer_id: PeerId) -> <FakeTypes as GattTypes>::ConnectFuture {
- futures::future::ready(Ok(FakeClient {}))
+ pub fn add_client(&mut self, peer_id: PeerId, client: FakeClient) {
+ let _ = self.inner.lock().clients.insert(peer_id, client);
+ }
+}
+
+impl crate::Central<FakeTypes> for FakeCentral {
+ fn scan(&self, _filters: &[crate::central::ScanFilter]) -> ScannedResultStream {
+ ScannedResultStream::new()
+ }
+
+ fn connect(&self, peer_id: PeerId) -> <FakeTypes as GattTypes>::ConnectFuture {
+ let clients = &self.inner.lock().clients;
+ let res = match clients.get(&peer_id) {
+ Some(client) => Ok(client.clone()),
+ None => Err(Error::PeerDisconnected(peer_id)),
+ };
+ futures::future::ready(res)
}
}
diff --git a/rust/bt-gatt/src/tests.rs b/rust/bt-gatt/src/tests.rs
index d300f0f..e753706 100644
--- a/rust/bt-gatt/src/tests.rs
+++ b/rust/bt-gatt/src/tests.rs
@@ -9,6 +9,7 @@
use bt_common::{PeerId, Uuid};
+use crate::central::{AdvertisingDatum, PeerName, ScanResult};
use crate::test_utils::*;
use crate::types::*;
use crate::{central::Filter, client::PeerService, Central};
@@ -18,7 +19,7 @@
const TEST_UUID_3: Uuid = Uuid::from_u16(0x3456);
// Sets up a fake peer service with some characteristics.
-fn set_up() -> FakePeerService {
+fn setup_peer_service() -> FakePeerService {
let mut fake_peer_service = FakePeerService::new();
fake_peer_service.add_characteristic(
Characteristic {
@@ -86,7 +87,7 @@
fn peer_service_discover_characteristics_works() {
let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
- let fake_peer_service = set_up();
+ let fake_peer_service = setup_peer_service();
let mut discover_results = fake_peer_service.discover_characteristics(Some(TEST_UUID_1));
let polled = discover_results.poll_unpin(&mut noop_cx);
@@ -110,7 +111,7 @@
fn peer_service_read_characteristic() {
let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
- let mut fake_peer_service = set_up();
+ let mut fake_peer_service = setup_peer_service();
let mut buf = vec![0; 255];
// For characteristic that was added, value is returned
@@ -152,7 +153,7 @@
fn peer_service_write_characteristic() {
let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
- let mut fake_peer_service: FakePeerService = set_up();
+ let mut fake_peer_service: FakePeerService = setup_peer_service();
// Set expected characteristic value before calling `write_characteristic`.
fake_peer_service.expect_characteristic_value(&Handle(1), vec![0, 1, 2, 3]);
@@ -174,7 +175,7 @@
fn peer_service_write_characteristic_fail() {
let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
- let fake_peer_service = set_up();
+ let fake_peer_service = setup_peer_service();
// Write some random value without calling `expect_characteristic_value` first.
let mut write_result = fake_peer_service.write_characteristic(
@@ -194,7 +195,7 @@
fn peer_service_subsribe() {
let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
- let mut fake_peer_service = set_up();
+ let mut fake_peer_service = setup_peer_service();
let mut notification_stream = fake_peer_service.subscribe(&Handle(0x1));
// Stream is empty unless we add an item through the FakeNotificationStream
@@ -230,9 +231,19 @@
#[test]
fn central_search_works() {
let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
- let central = FakeCentral::default();
+ let central = FakeCentral::new();
let mut scan_results = central.scan(&[Filter::ServiceUuid(Uuid::from_u16(0x1844)).into()]);
+ let polled = scan_results.poll_next_unpin(&mut noop_cx);
+ assert_matches!(polled, Poll::Pending);
+
+ let scanned_result = ScanResult {
+ id: PeerId(1),
+ connectable: true,
+ name: PeerName::CompleteName("Marie's Pixel 7 Pro".to_owned()),
+ advertised: vec![AdvertisingDatum::Services(vec![Uuid::from_u16(0x1844)])],
+ };
+ let _ = scan_results.set_scanned_result(Ok(scanned_result));
let polled = scan_results.poll_next_unpin(&mut noop_cx);
assert_matches!(polled, Poll::Ready(Some(Ok(_))));
@@ -243,8 +254,8 @@
let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
let _stream = central.scan(&[]);
- let connect_fut = central.connect(PeerId(1));
+ let connect_fut = central.connect(PeerId(1));
futures::pin_mut!(connect_fut);
let Poll::Ready(Ok(client)) = connect_fut.poll(&mut noop_cx) else {
panic!("Connect should be ready Ok");
@@ -273,7 +284,12 @@
#[test]
fn central_dynamic_usage() {
- let central: Box<dyn crate::Central<FakeTypes>> = Box::new(FakeCentral::default());
+ let mut central = FakeCentral::new();
+ let mut fake_client = FakeClient::new();
+ fake_client.add_service(Uuid::from_u16(0), true, FakePeerService::new());
+ central.add_client(PeerId(1), fake_client);
- boxed_generic_usage(central);
+ let boxed: Box<dyn crate::Central<FakeTypes>> = Box::new(central);
+
+ boxed_generic_usage(boxed);
}