blob: c3bfa1a45f2b40c5d60991dc269481cb4e46a6d5 [file] [log] [blame]
// Copyright 2024 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
//! Implements the Published Audio Capabilities Service server role.
//!
//! Use the `ServerBuilder` to define a new `Server` instance with the specified
//! characteristics. The server isn't published to GATT until
//! `Server::publish` method is called.
//! Once the `Server` is published, poll on it to receive events from the
//! `Server`, which are created as it processes incoming client requests.
//!
//! For example:
//!
//! // Set up a GATT Server which implements `bt_gatt::ServerTypes::Server`.
//! let gatt_server = ...;
//! // Define supported and available audio contexts for this PACS.
//! let supported = AudioContexts::new(...);
//! let available = AudioContexts::new(...);
//! let pacs_server = ServerBuilder::new()
//! .with_sources(...)
//! .with_sinks(...)
//! .build(supported, available)?;
//!
//! // Publish the server.
//! pacs_server.publish(gatt_server).expect("publishes fine");
//! // Process events from the PACS server.
//! while let Some(event) = pacs_server.next().await {
//! // Do something with `event`
//! }
use bt_common::generic_audio::ContextType;
use bt_gatt::server::LocalService;
use bt_gatt::server::{ReadResponder, ServiceDefinition, WriteResponder};
use bt_gatt::types::{GattError, Handle};
use bt_gatt::Server as _;
use futures::task::{Poll, Waker};
use futures::{Future, Stream};
use pin_project::pin_project;
use std::collections::HashMap;
use thiserror::Error;
use crate::{
AudioLocations, AvailableAudioContexts, PacRecord, SinkAudioLocations, SourceAudioLocations,
SupportedAudioContexts,
};
pub(crate) mod types;
use crate::server::types::*;
#[pin_project(project = LocalServiceProj)]
enum LocalServiceState<T: bt_gatt::ServerTypes> {
NotPublished {
waker: Option<Waker>,
},
Preparing {
#[pin]
fut: T::LocalServiceFut,
},
Published {
service: T::LocalService,
#[pin]
events: T::ServiceEventStream,
},
Terminated,
}
impl<T: bt_gatt::ServerTypes> Default for LocalServiceState<T> {
fn default() -> Self {
Self::NotPublished { waker: None }
}
}
#[derive(Debug, Error)]
pub enum Error {
#[error("Service is already published")]
AlreadyPublished,
#[error("Issue publishing service: {0}")]
PublishError(#[from] bt_gatt::types::Error),
#[error("Service should support at least one of Sink or Source PAC characteristics")]
MissingPac,
#[error("Available audio contexts are not supported: {0:?}")]
UnsupportedAudioContexts(Vec<ContextType>),
}
impl<T: bt_gatt::ServerTypes> Stream for LocalServiceState<T> {
type Item = Result<bt_gatt::server::ServiceEvent<T>, Error>;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
// SAFETY:
// - Wakers are Unpin
// - We re-pin the structurally pinned futures in Preparing and Published
// (service is untouched)
// - Terminated is empty
loop {
match self.as_mut().project() {
LocalServiceProj::Terminated => return Poll::Ready(None),
LocalServiceProj::NotPublished { .. } => {
self.as_mut()
.set(LocalServiceState::NotPublished { waker: Some(cx.waker().clone()) });
return Poll::Pending;
}
LocalServiceProj::Preparing { fut } => {
let service_result = futures::ready!(fut.poll(cx));
let Ok(service) = service_result else {
return Poll::Ready(Some(Err(Error::PublishError(
service_result.err().unwrap(),
))));
};
let events = service.publish();
self.as_mut().set(LocalServiceState::Published { service, events });
continue;
}
LocalServiceProj::Published { service: _, events } => {
let item = futures::ready!(events.poll_next(cx));
let Some(gatt_result) = item else {
self.as_mut().set(LocalServiceState::Terminated);
return Poll::Ready(None);
};
let Ok(event) = gatt_result else {
self.as_mut().set(LocalServiceState::Terminated);
return Poll::Ready(Some(Err(Error::PublishError(
gatt_result.err().unwrap(),
))));
};
return Poll::Ready(Some(Ok(event)));
}
}
}
}
}
impl<T: bt_gatt::ServerTypes> LocalServiceState<T> {
fn is_published(&self) -> bool {
if let LocalServiceState::NotPublished { .. } = self {
false
} else {
true
}
}
}
#[derive(Default)]
pub struct ServerBuilder {
source_pacs: Vec<Vec<PacRecord>>,
source_audio_locations: Option<AudioLocations>,
sink_pacs: Vec<Vec<PacRecord>>,
sink_audio_locations: Option<AudioLocations>,
}
impl ServerBuilder {
pub fn new() -> ServerBuilder {
ServerBuilder::default()
}
/// Adds a source PAC characteristic to the builder.
/// Each call adds a new characteristic.
/// `capabilities` represents the records for a single PAC characteristic.
/// If `capabilities` is empty, it will be ignored.
pub fn add_source(mut self, capabilities: Vec<PacRecord>) -> Self {
if !capabilities.is_empty() {
self.source_pacs.push(capabilities);
}
self
}
/// Sets the audio locations for the source.
/// This corresponds to a single Source Audio Locations characteristic.
pub fn set_source_locations(mut self, audio_locations: AudioLocations) -> Self {
self.source_audio_locations = Some(audio_locations);
self
}
/// Adds a sink PAC characteristic to the builder.
/// Each call adds a new characteristic.
/// `capabilities` represents the records for a single PAC characteristic.
/// If `capabilities` is empty, it will be ignored.
pub fn add_sink(mut self, capabilities: Vec<PacRecord>) -> Self {
if !capabilities.is_empty() {
self.sink_pacs.push(capabilities);
}
self
}
/// Sets the audio locations for the sink.
/// This corresponds to a single Sink Audio Locations characteristic.
pub fn set_sink_locations(mut self, audio_locations: AudioLocations) -> Self {
self.sink_audio_locations = Some(audio_locations);
self
}
fn verify_characteristics(
&self,
supported: &AudioContexts,
available: &AudioContexts,
) -> Result<(), Error> {
// If the corresponding bit in the supported audio contexts is
// not set to 0b1, we shall not set a bit to 0b1 in the
// available audio contexts. See PACS v1.0.1 section 3.5.1.
let diff: Vec<ContextType> = available.sink.difference(&supported.sink).cloned().collect();
if diff.len() != 0 {
return Err(Error::UnsupportedAudioContexts(diff));
}
let diff: Vec<ContextType> =
available.source.difference(&supported.source).cloned().collect();
if diff.len() != 0 {
return Err(Error::UnsupportedAudioContexts(diff));
}
// PACS server must have at least one Sink or Source PACS record.
if self.source_pacs.len() == 0 && self.sink_pacs.len() == 0 {
return Err(Error::MissingPac);
}
Ok(())
}
/// Builds a server after verifying all the defined characteristics
/// for this server (see PACS v1.0.1 section 3 for details).
pub fn build<T>(
mut self,
mut supported: AudioContexts,
available: AudioContexts,
) -> Result<Server<T>, Error>
where
T: bt_gatt::ServerTypes,
{
let _ = self.verify_characteristics(&supported, &available)?;
let mut service_def = ServiceDefinition::new(
bt_gatt::server::ServiceId::new(1),
crate::PACS_UUID,
bt_gatt::types::ServiceKind::Primary,
);
let supported = SupportedAudioContexts {
handle: SUPPORTED_AUDIO_CONTEXTS_HANDLE,
sink: supported.sink.drain().collect(),
source: supported.source.drain().collect(),
};
let _ = service_def.add_characteristic((&supported).into());
let available = AvailableAudioContexts {
handle: AVAILABLE_AUDIO_CONTEXTS_HANDLE,
sink: (&available.sink).into(),
source: (&available.source).into(),
};
let _ = service_def.add_characteristic((&available).into());
let mut next_handle_iter = (HANDLE_OFFSET..).map(|x| Handle(x));
let mut audio_capabilities = HashMap::new();
// Sink audio locations characteristic may exist iff it's defined
// and there are valid sink PAC characteristics.
let sink_audio_locations = match self.sink_audio_locations.take() {
Some(locations) if self.sink_pacs.len() > 0 => {
let sink =
SinkAudioLocations { handle: next_handle_iter.next().unwrap(), locations };
let _ = service_def.add_characteristic((&sink).into());
Some(sink)
}
_ => None,
};
for capabilities in self.sink_pacs.drain(..) {
let handle = next_handle_iter.next().unwrap();
let pac = PublishedAudioCapability::new_sink(handle, capabilities);
let _ = service_def.add_characteristic((&pac).into());
audio_capabilities.insert(handle, pac);
}
// Source audio locations characteristic may exist iff it's defined
// and there are valid source PAC characteristics.
let source_audio_locations = match self.source_audio_locations.take() {
Some(locations) if self.source_pacs.len() > 0 => {
let source =
SourceAudioLocations { handle: next_handle_iter.next().unwrap(), locations };
let _ = service_def.add_characteristic((&source).into());
Some(source)
}
_ => None,
};
for capabilities in self.source_pacs.drain(..) {
let handle = next_handle_iter.next().unwrap();
let pac = PublishedAudioCapability::new_source(handle, capabilities);
let _ = service_def.add_characteristic((&pac).into());
audio_capabilities.insert(handle, pac);
}
let server = Server {
service_def,
local_service: Default::default(),
published_audio_capabilities: audio_capabilities,
source_audio_locations,
sink_audio_locations,
available_audio_contexts: available,
supported_audio_contexts: supported,
};
Ok(server)
}
}
#[pin_project]
pub struct Server<T: bt_gatt::ServerTypes> {
service_def: ServiceDefinition,
#[pin]
local_service: LocalServiceState<T>,
published_audio_capabilities: HashMap<Handle, PublishedAudioCapability>,
source_audio_locations: Option<SourceAudioLocations>,
sink_audio_locations: Option<SinkAudioLocations>,
available_audio_contexts: AvailableAudioContexts,
supported_audio_contexts: SupportedAudioContexts,
}
impl<T: bt_gatt::ServerTypes> Server<T> {
pub fn publish(&mut self, server: T::Server) -> Result<(), Error> {
if self.local_service.is_published() {
return Err(Error::AlreadyPublished);
}
let LocalServiceState::NotPublished { waker } = std::mem::replace(
&mut self.local_service,
LocalServiceState::Preparing { fut: server.prepare(self.service_def.clone()) },
) else {
unreachable!();
};
waker.map(Waker::wake);
Ok(())
}
fn is_source_locations_handle(&self, handle: Handle) -> bool {
self.source_audio_locations.as_ref().map_or(false, |locations| locations.handle == handle)
}
fn is_sink_locations_handle(&self, handle: Handle) -> bool {
self.sink_audio_locations.as_ref().map_or(false, |locations| locations.handle == handle)
}
}
impl<T: bt_gatt::ServerTypes> Stream for Server<T> {
type Item = Result<(), Error>;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
loop {
let mut this = self.as_mut().project();
let gatt_event = match futures::ready!(this.local_service.as_mut().poll_next(cx)) {
None => return Poll::Ready(None),
Some(Err(e)) => return Poll::Ready(Some(Err(e))),
Some(Ok(event)) => event,
};
use bt_gatt::server::ServiceEvent::*;
match gatt_event {
Read { handle, offset, responder, .. } => {
let offset = offset as usize;
let value = match handle {
x if x == AVAILABLE_AUDIO_CONTEXTS_HANDLE => {
self.available_audio_contexts.into_char_value()
}
x if x == SUPPORTED_AUDIO_CONTEXTS_HANDLE => {
self.supported_audio_contexts.into_char_value()
}
x if self.is_source_locations_handle(x) => {
self.source_audio_locations.as_ref().unwrap().into_char_value()
}
x if self.is_sink_locations_handle(x) => {
self.sink_audio_locations.as_ref().unwrap().into_char_value()
}
pac_handle => {
let Some(ref pac) = self.published_audio_capabilities.get(&pac_handle)
else {
responder.error(GattError::InvalidHandle);
continue;
};
pac.encode()
}
};
responder.respond(&value[offset..]);
continue;
}
// TODO(b/309015071): support optional writes.
Write { responder, .. } => {
responder.error(GattError::WriteNotPermitted);
continue;
}
// TODO(b/309015071): implement notify since it's mandatory.
ClientConfiguration { .. } => {
unimplemented!();
}
_ => continue,
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use bt_common::core::{CodecId, CodingFormat};
use bt_common::generic_audio::codec_capabilities::*;
use bt_common::generic_audio::AudioLocation;
use bt_common::PeerId;
use bt_gatt::server;
use bt_gatt::test_utils::{FakeServer, FakeServerEvent, FakeTypes};
use bt_gatt::types::ServiceKind;
use futures::{FutureExt, StreamExt};
use std::collections::HashSet;
use crate::AvailableContexts;
// Builder for a server with:
// - 1 sink and 1 source PAC characteristics
// - sink audio locations
fn default_server_builder() -> ServerBuilder {
let builder = ServerBuilder::new()
.add_sink(vec![PacRecord {
codec_id: CodecId::Assigned(CodingFormat::ALawLog),
codec_specific_capabilities: vec![CodecCapability::SupportedFrameDurations(
FrameDurationSupport::BothNoPreference,
)],
metadata: vec![],
}])
.set_sink_locations(AudioLocations {
locations: HashSet::from([AudioLocation::FrontLeft, AudioLocation::FrontRight]),
})
.add_source(vec![
PacRecord {
codec_id: CodecId::Assigned(CodingFormat::ALawLog),
codec_specific_capabilities: vec![CodecCapability::SupportedFrameDurations(
FrameDurationSupport::BothNoPreference,
)],
metadata: vec![],
},
PacRecord {
codec_id: CodecId::Assigned(CodingFormat::MuLawLog),
codec_specific_capabilities: vec![CodecCapability::SupportedFrameDurations(
FrameDurationSupport::BothNoPreference,
)],
metadata: vec![],
},
])
.add_source(vec![]);
builder
}
#[test]
fn build_server() {
let server = default_server_builder()
.build::<FakeTypes>(
AudioContexts::new(
HashSet::from([ContextType::Conversational, ContextType::Media]),
HashSet::from([ContextType::Media]),
),
AudioContexts::new(HashSet::from([ContextType::Media]), HashSet::new()),
)
.expect("should succeed");
assert_eq!(server.published_audio_capabilities.len(), 2);
assert_eq!(server.supported_audio_contexts.handle.0, 1);
assert_eq!(
server.supported_audio_contexts.sink,
HashSet::from([ContextType::Conversational, ContextType::Media])
);
assert_eq!(server.supported_audio_contexts.source, HashSet::from([ContextType::Media]));
assert_eq!(server.available_audio_contexts.handle.0, 2);
assert_eq!(
server.available_audio_contexts.sink,
AvailableContexts::Available(HashSet::from([ContextType::Media]))
);
assert_eq!(server.available_audio_contexts.source, AvailableContexts::NotAvailable);
// Should have 1 sink PAC characteristic with audio locations.
let location_char = server.sink_audio_locations.as_ref().expect("should exist");
assert_eq!(location_char.handle.0, 3);
assert_eq!(
location_char.locations.locations,
HashSet::from([AudioLocation::FrontLeft, AudioLocation::FrontRight])
);
let mut sink_iter =
server.published_audio_capabilities.iter().filter(|(_handle, pac)| pac.is_sink());
let sink_char = sink_iter.next().expect("should exist");
assert_eq!(sink_char.0, &Handle(4));
assert_eq!(sink_char.1.pac_records().len(), 1);
assert!(sink_iter.next().is_none());
// Should have 1 source PAC characteristic w/o audio locations.
assert!(server.source_audio_locations.is_none());
let mut source_iter =
server.published_audio_capabilities.iter().filter(|(_handle, pac)| pac.is_source());
let source_char = source_iter.next().expect("should exist");
assert_eq!(source_char.0, &Handle(5));
assert_eq!(source_char.1.pac_records().len(), 2);
assert_eq!(source_iter.next(), None);
}
#[test]
fn build_server_error() {
// No sink or source PACs.
assert!(ServerBuilder::new()
.build::<FakeTypes>(
AudioContexts::new(
HashSet::from([ContextType::Conversational, ContextType::Media]),
HashSet::from([ContextType::Media]),
),
AudioContexts::new(HashSet::from([ContextType::Media]), HashSet::new()),
)
.is_err());
// Sink audio context in available not in supported.
assert!(default_server_builder()
.build::<FakeTypes>(
AudioContexts::new(
HashSet::from([ContextType::Conversational, ContextType::Media]),
HashSet::from([ContextType::Media]),
),
AudioContexts::new(HashSet::from([ContextType::Alerts]), HashSet::new()),
)
.is_err());
// Sink audio context in available not in supported.
assert!(default_server_builder()
.build::<FakeTypes>(
AudioContexts::new(
HashSet::from([ContextType::Conversational, ContextType::Media]),
HashSet::from([ContextType::Media]),
),
AudioContexts::new(HashSet::from([]), HashSet::from([ContextType::EmergencyAlarm])),
)
.is_err());
}
#[test]
fn publish_server() {
let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
let mut server = default_server_builder()
.build::<FakeTypes>(
AudioContexts::new(
HashSet::from([ContextType::Media]),
HashSet::from([ContextType::Media]),
),
AudioContexts::new(HashSet::new(), HashSet::new()),
)
.unwrap();
// Server should be pending still since GATT server not establihsed.
let Poll::Pending = server.next().poll_unpin(&mut noop_cx) else {
panic!("Should be pending");
};
let (fake_gatt_server, mut event_receiver) = FakeServer::new();
// Event stream should be pending still since service not published.
let mut event_stream = event_receiver.next();
let Poll::Pending = event_stream.poll_unpin(&mut noop_cx) else {
panic!("Should be pending");
};
let _ = server.publish(fake_gatt_server).expect("should succeed");
// Server should poll on local server state.
let Poll::Pending = server.next().poll_unpin(&mut noop_cx) else {
panic!("Should be pending");
};
// Should receive event that GATT service was published.
let Poll::Ready(Some(FakeServerEvent::Published { id, definition })) =
event_stream.poll_unpin(&mut noop_cx)
else {
panic!("Should be published");
};
assert_eq!(id, server::ServiceId::new(1));
assert_eq!(definition.characteristics().collect::<Vec<_>>().len(), 5);
assert_eq!(definition.kind(), ServiceKind::Primary);
assert_eq!(definition.uuid(), crate::PACS_UUID);
// Server can only be published once.
let (fake_gatt_server, _) = FakeServer::new();
assert!(server.publish(fake_gatt_server).is_err());
}
#[test]
fn read_from_server() {
let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
let mut server = default_server_builder()
.build::<FakeTypes>(
AudioContexts::new(
HashSet::from([ContextType::Media]),
HashSet::from([ContextType::Media]),
),
AudioContexts::new(HashSet::from([ContextType::Media]), HashSet::new()),
)
.unwrap();
let (fake_gatt_server, mut event_receiver) = FakeServer::new();
let _ = server.publish(fake_gatt_server.clone()).expect("should succeed");
// Server should poll on local server state.
let Poll::Pending = server.next().poll_unpin(&mut noop_cx) else {
panic!("Should be pending");
};
// Should receive event that GATT service was published.
let mut event_stream = event_receiver.next();
let Poll::Ready(Some(FakeServerEvent::Published { id, .. })) =
event_stream.poll_unpin(&mut noop_cx)
else {
panic!("Should be published");
};
// Fake an incoming read from a remote peer.
let available_char_handle = server.available_audio_contexts.handle;
fake_gatt_server.incoming_read(PeerId(0x01), id, available_char_handle, 0);
// Server should still be pending.
let Poll::Pending = server.next().poll_unpin(&mut noop_cx) else {
panic!("Should be pending");
};
// We should received read response.
let Poll::Ready(Some(FakeServerEvent::ReadResponded { handle, value, .. })) =
event_stream.poll_unpin(&mut noop_cx)
else {
panic!("Should be published");
};
assert_eq!(handle, available_char_handle);
assert_eq!(value.expect("should be ok"), vec![0x04, 0x00, 0x00, 0x00]);
}
}