blob: 3fd08cb999f5ad372a73935520ea7b2e43a393b0 [file] [log] [blame]
// Copyright 2023 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use parking_lot::Mutex;
use std::collections::HashMap;
use std::sync::Arc;
use thiserror::Error;
use bt_bap::types::BroadcastId;
use bt_bass::client::error::Error as BassClientError;
use bt_bass::client::BroadcastAudioScanServiceClient;
use bt_common::{PeerId, Uuid};
use bt_gatt::central::*;
use bt_gatt::client::PeerServiceHandle;
use bt_gatt::types::Error as GattError;
use bt_gatt::Client;
pub mod event;
use event::*;
pub mod peer;
pub use peer::Peer;
use crate::types::*;
pub const BROADCAST_AUDIO_SCAN_SERVICE: Uuid = Uuid::from_u16(0x184F);
pub const BASIC_AUDIO_ANNOUNCEMENT_SERVICE: Uuid = Uuid::from_u16(0x1851);
pub const BROADCAST_AUDIO_ANNOUNCEMENT_SERVICE: Uuid = Uuid::from_u16(0x1852);
#[derive(Debug, Error)]
pub enum Error {
#[error("GATT operation error: {0:?}")]
Gatt(#[from] GattError),
#[error("Broadcast Audio Scan Service client error at peer ({0}): {1:?}")]
BassClient(PeerId, BassClientError),
#[error("Not connected to Broadcast Audio Scan Service at peer ({0})")]
NotConnectedToBass(PeerId),
#[error("Central scanning terminated unexpectedly")]
CentralScanTerminated,
#[error("Failed to connect to service ({1}) at peer ({0})")]
ConnectionFailure(PeerId, Uuid),
#[error("Broadcast Assistant was already started previously. It cannot be started twice")]
AlreadyStarted,
#[error("Failed due to error: {0}")]
Generic(String),
}
/// Contains information about the currently-known broadcast
/// sources and the peers they were found on
#[derive(Debug)]
pub(crate) struct DiscoveredBroadcastSources(Mutex<HashMap<PeerId, BroadcastSource>>);
impl DiscoveredBroadcastSources {
/// Creates a shareable instance of `DiscoveredBroadcastSources`.
pub fn new() -> Arc<Self> {
Arc::new(Self(Mutex::new(HashMap::new())))
}
/// Merges the broadcast source data with existing broadcast source data.
/// Returns the copy of the broadcast source data after the merge and
/// indicates whether it has changed from before or not.
pub(crate) fn merge_broadcast_source_data(
&self,
peer_id: &PeerId,
data: &BroadcastSource,
) -> (BroadcastSource, bool) {
let mut lock = self.0.lock();
let source = lock.entry(*peer_id).or_default();
let before = source.clone();
source.merge(data);
let after = source.clone();
let changed = before != after;
(after, changed)
}
/// Get a BroadcastSource from a peer id.
fn get_by_peer_id(&self, peer_id: &PeerId) -> Option<BroadcastSource> {
let lock = self.0.lock();
lock.get(&peer_id).clone().map(|source| source.clone())
}
/// Get a BroadcastSource from associated broadcast id.
fn get_by_broadcast_id(&self, broadcast_id: &BroadcastId) -> Option<BroadcastSource> {
let lock = self.0.lock();
let info = lock.iter().find(|(&_k, &ref v)| v.broadcast_id == Some(*broadcast_id));
match info {
Some((&_peer_id, &ref broadcast_source)) => Some(broadcast_source.clone()),
None => None,
}
}
}
pub struct BroadcastAssistant<T: bt_gatt::GattTypes> {
central: T::Central,
broadcast_sources: Arc<DiscoveredBroadcastSources>,
scan_stream: Option<T::ScanResultStream>,
}
impl<T: bt_gatt::GattTypes + 'static> BroadcastAssistant<T> {
// Creates a broadcast assistant and sets it up to be ready
// for broadcast source scanning. Clients must use the `start`
// method to poll the event stream for scan results.
pub fn new(central: T::Central) -> Self {
let scan_result_stream = central.scan(&Self::scan_filters());
Self {
central,
broadcast_sources: DiscoveredBroadcastSources::new(),
scan_stream: Some(scan_result_stream),
}
}
/// List of scan filters for advertisement data Broadcast Assistant should
/// look for, which are:
/// - Service data with Broadcast Audio Announcement Service UUID from
/// Broadcast Sources (see BAP spec v1.0.1 Section 3.7.2.1 for details)
// TODO(b/308481381): define filter for finding broadcast sink.
fn scan_filters() -> Vec<ScanFilter> {
vec![Filter::HasServiceData(BROADCAST_AUDIO_ANNOUNCEMENT_SERVICE).into()]
}
/// Start broadcast assistant. Returns EventStream that the upper layer can
/// poll. Upper layer can call methods on BroadcastAssistant based on the
/// events it sees.
pub fn start(&mut self) -> Result<EventStream<T>, Error> {
if self.scan_stream.is_none() {
return Err(Error::AlreadyStarted);
}
Ok(EventStream::<T>::new(self.scan_stream.take().unwrap(), self.broadcast_sources.clone()))
}
pub fn scan_for_scan_delegators(&mut self) -> T::ScanResultStream {
// Scan for service data with Broadcast Audio Scan Service UUID to look
// for Broadcast Sink collocated with the Scan Delegator (see BAP spec v1.0.1
// Section 3.9.2 for details).
self.central.scan(&vec![Filter::HasServiceData(BROADCAST_AUDIO_SCAN_SERVICE).into()])
}
pub async fn connect_to_scan_delegator(&self, peer_id: PeerId) -> Result<Peer<T>, Error>
where
<T as bt_gatt::GattTypes>::NotificationStream: std::marker::Send,
{
let client = self.central.connect(peer_id).await?;
let service_handles = client.find_service(BROADCAST_AUDIO_SCAN_SERVICE).await?;
for handle in service_handles {
if handle.uuid() != BROADCAST_AUDIO_SCAN_SERVICE || !handle.is_primary() {
continue;
}
let service = handle.connect().await?;
let bass = BroadcastAudioScanServiceClient::<T>::create(service)
.await
.map_err(|e| Error::BassClient(peer_id, e))?;
let connected_peer =
Peer::<T>::new(peer_id, client, bass, self.broadcast_sources.clone());
return Ok(connected_peer);
}
Err(Error::ConnectionFailure(peer_id, BROADCAST_AUDIO_SCAN_SERVICE))
}
// Manually adds broadcast source information for debugging purposes.
#[cfg(any(test, feature = "debug"))]
pub fn force_discover_broadcast_source(
&self,
peer_id: PeerId,
address: [u8; 6],
raw_address_type: u8,
raw_advertising_sid: u8,
) -> Result<BroadcastSource, Error> {
use bt_common::core::{AddressType, AdvertisingSetId};
let broadcast_source = BroadcastSource {
address: Some(address),
address_type: Some(
AddressType::try_from(raw_address_type)
.map_err(|e| Error::Generic(e.to_string()))?,
),
advertising_sid: Some(AdvertisingSetId(raw_advertising_sid)),
broadcast_id: None,
pa_interval: None,
endpoint: None,
};
Ok(self.broadcast_sources.merge_broadcast_source_data(&peer_id, &broadcast_source).0)
}
// Manually adds broadcast source information for debugging purposes.
#[cfg(any(test, feature = "debug"))]
pub fn force_discover_broadcast_source_metadata(
&self,
peer_id: PeerId,
raw_metadata: Vec<Vec<u8>>,
) -> Result<BroadcastSource, Error> {
use bt_bap::types::{BroadcastAudioSourceEndpoint, BroadcastIsochronousGroup};
use bt_common::core::ltv::LtValue;
use bt_common::core::CodecId;
use bt_common::generic_audio::metadata_ltv::Metadata;
let mut big = Vec::new();
for bytes in raw_metadata {
let metadata = {
if bytes.len() > 0 {
let (decoded_metadata, consumed_len) = Metadata::decode_all(bytes.as_slice());
if consumed_len != bytes.len() {
return Err(Error::Generic("Metadata length is not valid".to_string()));
}
decoded_metadata.into_iter().filter_map(Result::ok).collect()
} else {
vec![]
}
};
let group = BroadcastIsochronousGroup {
codec_id: CodecId::Assigned(bt_common::core::CodingFormat::ALawLog), // mock.
codec_specific_configs: vec![],
metadata,
bis: vec![],
};
big.push(group);
}
let endpoint = BroadcastAudioSourceEndpoint { presentation_delay_ms: 0, big };
let broadcast_source = BroadcastSource {
address: None,
address_type: None,
advertising_sid: None,
broadcast_id: None,
pa_interval: None,
endpoint: Some(endpoint),
};
Ok(self.broadcast_sources.merge_broadcast_source_data(&peer_id, &broadcast_source).0)
}
// Gets the broadcast sources currently known by the broadcast
// assistant.
pub fn known_broadcast_sources(&self) -> std::collections::HashMap<PeerId, BroadcastSource> {
let lock = self.broadcast_sources.0.lock();
let mut m = HashMap::new();
for (pid, source) in lock.iter() {
m.insert(*pid, source.clone());
}
m
}
}
#[cfg(test)]
mod tests {
use super::*;
use futures::{pin_mut, FutureExt};
use std::task::Poll;
use bt_bap::types::*;
use bt_common::core::{AddressType, AdvertisingSetId};
use bt_gatt::test_utils::{FakeCentral, FakeClient, FakeTypes};
use crate::assistant::peer::tests::fake_bass_service;
#[test]
fn merge_broadcast_source() {
let discovered = DiscoveredBroadcastSources::new();
let bid = BroadcastId::try_from(1001).unwrap();
let (bs, changed) = discovered.merge_broadcast_source_data(
&PeerId(1001),
&BroadcastSource::default()
.with_address([1, 2, 3, 4, 5, 6])
.with_address_type(AddressType::Public)
.with_advertising_sid(AdvertisingSetId(1))
.with_broadcast_id(bid),
);
assert!(changed);
assert_eq!(
bs,
BroadcastSource {
address: Some([1, 2, 3, 4, 5, 6]),
address_type: Some(AddressType::Public),
advertising_sid: Some(AdvertisingSetId(1)),
broadcast_id: Some(bid),
pa_interval: None,
endpoint: None,
}
);
let (bs, changed) = discovered.merge_broadcast_source_data(
&PeerId(1001),
&BroadcastSource::default().with_address_type(AddressType::Random).with_endpoint(
BroadcastAudioSourceEndpoint { presentation_delay_ms: 32, big: vec![] },
),
);
assert!(changed);
assert_eq!(
bs,
BroadcastSource {
address: Some([1, 2, 3, 4, 5, 6]),
address_type: Some(AddressType::Random),
advertising_sid: Some(AdvertisingSetId(1)),
broadcast_id: Some(bid),
pa_interval: None,
endpoint: Some(BroadcastAudioSourceEndpoint {
presentation_delay_ms: 32,
big: vec![]
}),
}
);
let (_, changed) = discovered.merge_broadcast_source_data(
&PeerId(1001),
&BroadcastSource::default().with_address_type(AddressType::Random).with_endpoint(
BroadcastAudioSourceEndpoint { presentation_delay_ms: 32, big: vec![] },
),
);
assert!(!changed);
}
#[test]
fn start_stream() {
let mut assistant = BroadcastAssistant::<FakeTypes>::new(FakeCentral::new());
let _ = assistant.start().expect("can start stream");
// Stream can only be started once.
assert!(assistant.start().is_err());
}
#[test]
fn connect_to_scan_delegator() {
// Set up fake GATT related objects.
let mut central = FakeCentral::new();
let mut client = FakeClient::new();
central.add_client(PeerId(1004), client.clone());
let service = fake_bass_service();
client.add_service(BROADCAST_AUDIO_SCAN_SERVICE, true, service.clone());
let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
let assistant = BroadcastAssistant::<FakeTypes>::new(central);
let conn_fut = assistant.connect_to_scan_delegator(PeerId(1004));
pin_mut!(conn_fut);
let polled = conn_fut.poll_unpin(&mut noop_cx);
let Poll::Ready(res) = polled else {
panic!("should be ready");
};
let _ = res.expect("should be ok");
}
}