rust/bt-ascs: Implement ConfigureCodec Implement ConfigureCodec and the machinery to handle ServiceEvents and responses: - Add AseControlOperationFut and a queue of futures to poll - Update AseControlOperation to always produce an operation from a ControlPoint write (add Error operation) - Generate Events from Operations initiated by a peer - Update AudioStreamEndpoints based on responses - Send notifications of Control Point and Endpoints when operation is complete Bug: b/309015034 Test: updated / enabled, cargo test Change-Id: I90c974d52229f180ee6489d328787f468c762acb Reviewed-on: https://bluetooth-review.googlesource.com/c/bluetooth/+/1740
diff --git a/rust/bt-ascs/src/server.rs b/rust/bt-ascs/src/server.rs index 1e8974b..49b1225 100644 --- a/rust/bt-ascs/src/server.rs +++ b/rust/bt-ascs/src/server.rs
@@ -2,18 +2,22 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. +use bt_common::core::CodecId; +use bt_common::packet_encoding::Encodable; +use bt_common::{PeerId, Uuid}; +use bt_gatt::server::{LocalService, Server, ServiceDefinition, ServiceId}; use bt_gatt::server::{ReadResponder, WriteResponder}; use bt_gatt::types::{ AttributePermissions, CharacteristicProperty, GattError, Handle, SecurityLevels, }; use bt_gatt::Characteristic; -use futures::task::{Poll, Waker}; -use futures::{stream::FusedStream, Future, Stream}; -use pin_project::pin_project; -use std::collections::HashMap; -use bt_common::{PeerId, Uuid}; -use bt_gatt::server::{LocalService, Server, ServiceDefinition, ServiceId}; +use futures::channel::oneshot::{self, Canceled}; +use futures::stream::{FusedStream, FuturesUnordered}; +use futures::task::{Poll, Waker}; +use futures::{Future, Stream, StreamExt}; +use pin_project::pin_project; +use std::collections::{HashMap, VecDeque}; use crate::types::*; @@ -47,6 +51,14 @@ }; Some(service) } + + fn notify(&self, characteristic: &Handle, data: &[u8], peers: &[PeerId]) -> bool { + let Some(service) = self.service() else { + return false; + }; + service.notify(characteristic, data, peers); + true + } } impl<T: bt_gatt::ServerTypes> Stream for LocalServiceState<T> { @@ -117,6 +129,97 @@ } } +#[derive(Debug, Clone)] +struct AseControlOperationAction { + peer_id: PeerId, + opcode: Option<AseControlPointOpcode>, + response_codes: Vec<ResponseCode>, + new_endpoints: Vec<AudioStreamEndpoint>, +} + +impl AseControlOperationAction { + fn notify_control_point_value(&self) -> Option<Vec<u8>> { + if self.opcode.is_none() || self.response_codes.is_empty() { + return None; + } + let mut notification = Vec::with_capacity(2 + self.response_codes.len() * 3); + // Opcode and Number_of_ASEs + notification.push(self.opcode.unwrap().into()); + if self.response_codes[0].ase_id_value() == 0x00 { + // UnsupportedOpcode or InvalidLength. Number_of_ASEs shall be set to 0xFF + // See ASCS v1.0.1 Table 4.7. We only include the first response_code. + notification.push(0xFF); + notification.extend(self.response_codes[0].notify_value()); + return Some(notification); + } + notification.push(self.response_codes.len() as u8); + for response in &self.response_codes { + notification.extend(response.notify_value()); + } + Some(notification) + } +} + +struct AseControlOperationFut { + peer_id: PeerId, + /// The opcode of the operation that this future is for, if there is one. + opcode: Option<AseControlPointOpcode>, + /// The current set of response codes gathered for this operation. + /// Empty if there are no responses to send. + current_response_codes: Vec<ResponseCode>, + /// New Endpoint States after the operation is complete + endpoints: Vec<AudioStreamEndpoint>, + /// Queue of responses we are still waiting on from the operation. + waiting: FuturesUnordered< + futures::channel::oneshot::Receiver<Result<AudioStreamEndpoint, ResponseCode>>, + >, +} + +impl From<&AseControlOperationFut> for AseControlOperationAction { + fn from(value: &AseControlOperationFut) -> Self { + Self { + peer_id: value.peer_id, + opcode: value.opcode, + response_codes: value.current_response_codes.clone(), + new_endpoints: value.endpoints.clone(), + } + } +} + +impl Future for AseControlOperationFut { + type Output = AseControlOperationAction; + + fn poll( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll<Self::Output> { + loop { + if self.waiting.is_terminated() { + let this = std::pin::Pin::into_inner(self); + return Poll::Ready((&*this).into()); + } + let update = futures::ready!(self.waiting.poll_next_unpin(cx)); + match update { + None => continue, + Some(Err(Canceled)) => { + log::warn!("Detected dropped responder!"); + // TODO: maybe figure out how to determine which AseId got canceled here + self.current_response_codes + .push(ResponseCode::UnspecifiedError { ase_id: AseId(0x00) }); + // Bail on the rest of them. + self.waiting.clear(); + } + Some(Ok(Ok(endpoint))) => { + self.current_response_codes + .push(ResponseCode::Success { ase_id: endpoint.ase_id }); + self.endpoints.push(endpoint); + } + Some(Ok(Err(response_code))) => self.current_response_codes.push(response_code), + } + } + } +} + #[pin_project] pub struct AudioStreamControlServiceServer<T: bt_gatt::ServerTypes> { service_def: ServiceDefinition, @@ -124,6 +227,10 @@ local_service: LocalServiceState<T>, default_client_endpoints: ClientEndpoints, client_endpoints: HashMap<PeerId, ClientEndpoints>, + outgoing_events: VecDeque<ServiceEvent>, + // TODO: maybe these should be FuturesOrdered if the operations should be FIFOed + // Not pinned as AseControlOperationFut is Unpin. + responses: futures::stream::FuturesUnordered<AseControlOperationFut>, } const CONTROL_POINT_HANDLE: Handle = Handle(1); @@ -154,6 +261,8 @@ local_service: Default::default(), default_client_endpoints, client_endpoints: Default::default(), + outgoing_events: VecDeque::new(), + responses: FuturesUnordered::new(), } } @@ -189,6 +298,51 @@ pub fn release(&mut self, _id: AseId) -> Result<(), Error> { unimplemented!() } + + fn queue_operation(self: std::pin::Pin<&mut Self>, peer_id: PeerId, op: AseControlOperation) { + let this = self.project(); + let (events, fut) = + op.apply(peer_id, this.client_endpoints.get(&peer_id).unwrap().endpoints.clone()); + this.outgoing_events.extend(events); + this.responses.push(fut); + } + + /// Applies operations that are ready to complete (all outstanding events + /// have been responded to) + fn poll_operations(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) { + let this = self.project(); + loop { + let Poll::Ready(Some(operation)) = this.responses.poll_next_unpin(cx) else { + return; + }; + if let Some(notification) = operation.notify_control_point_value() { + this.local_service.notify( + &CONTROL_POINT_HANDLE, + ¬ification, + &[operation.peer_id], + ); + } + if operation.new_endpoints.is_empty() { + return; + } + // Apply the endpoint changes, and notify each endpoint + let Some(current_endpoints) = this.client_endpoints.get_mut(&operation.peer_id) else { + log::warn!( + "PeerId {peer_id} has disppeared while an operation was happening, ignoring..", + peer_id = operation.peer_id + ); + continue; + }; + for endpoint in operation.new_endpoints { + this.local_service.notify( + &endpoint.handle, + endpoint.into_char_value().as_slice(), + &[operation.peer_id], + ); + let _ = current_endpoints.endpoints.insert(endpoint.ase_id, endpoint); + } + } + } } #[derive(Debug, Clone, Copy, PartialEq)] @@ -207,21 +361,73 @@ } #[derive(Debug, Clone)] +enum AseAdditionalParameters { + /// When in states with no additional parameters: Idle, Releasing + None, + /// When CodecConfigured + CodecConfigured { + framing: Framing, + preferred_phys: Vec<Phy>, + preferred_retransmission_number: u8, + max_transport_latency: MaxTransportLatency, + presentation_delay_range: PresentationDelayRange, + codec_id: CodecId, + codec_config: Vec<u8>, + }, +} + +impl AseAdditionalParameters { + fn char_size(&self) -> usize { + match self { + AseAdditionalParameters::None => 0, + AseAdditionalParameters::CodecConfigured { codec_config, .. } => { + 23 + codec_config.len() + } + } + } + fn into_char_value(&self) -> Vec<u8> { + match self { + AseAdditionalParameters::None => Vec::new(), + AseAdditionalParameters::CodecConfigured { + framing, + preferred_phys, + preferred_retransmission_number, + max_transport_latency, + presentation_delay_range, + codec_id, + codec_config, + } => { + let mut value = Vec::with_capacity(self.char_size()); + value.resize(self.char_size() - codec_config.len(), 0); + value[0] = (*framing) as u8; + value[1] = Phy::to_bits(preferred_phys.iter()); + value[2] = *preferred_retransmission_number; + max_transport_latency.encode(&mut value[3..]).unwrap(); + presentation_delay_range.encode(&mut value[5..]).unwrap(); + codec_id.encode(&mut value[17..]).unwrap(); + value[22] = codec_config.len() as u8; + value.extend(codec_config.clone()); + value + } + } + } +} + +#[derive(Debug, Clone)] struct AudioStreamEndpoint { handle: Handle, direction: AudioDirection, ase_id: AseId, state: AseState, - // TODO(b/433287917): Add Additional Parameters for other states. - // Currently only works for Idle and Releasing states. + additional: AseAdditionalParameters, } impl AudioStreamEndpoint { fn into_char_value(&self) -> Vec<u8> { - let mut value = Vec::with_capacity(2); + let mut value = Vec::with_capacity(2 + self.additional.char_size()); value.push(self.ase_id.into()); value.push(self.state.into()); - // TODO: add the additional_ase_parameters for the other states + value.extend(self.additional.into_char_value()); value } } @@ -260,7 +466,13 @@ ( ( ase_id, - AudioStreamEndpoint { handle, ase_id, direction, state: AseState::Idle }, + AudioStreamEndpoint { + handle, + ase_id, + direction, + state: AseState::Idle, + additional: AseAdditionalParameters::None, + }, ), (handle, ase_id), ) @@ -281,29 +493,134 @@ } } -pub enum ServiceEvent {} +#[derive(Debug)] +pub struct CodecConfigureResponder { + endpoint: AudioStreamEndpoint, + codec_id: CodecId, + codec_config: Vec<u8>, + sender: futures::channel::oneshot::Sender<Result<AudioStreamEndpoint, ResponseCode>>, +} + +impl CodecConfigureResponder { + pub fn reject(self, err: ResponseCode) { + let _ = self.sender.send(Err(err)); + } + + pub fn accept( + mut self, + framing: Framing, + preferred_phys: Vec<Phy>, + preferred_retransmission_number: u8, + max_transport_latency: MaxTransportLatency, + presentation_delay_range: PresentationDelayRange, + ) { + self.endpoint.state = AseState::CodecConfigured; + self.endpoint.additional = AseAdditionalParameters::CodecConfigured { + framing, + preferred_phys, + preferred_retransmission_number, + max_transport_latency, + presentation_delay_range, + codec_id: self.codec_id, + codec_config: self.codec_config, + }; + let _ = self.sender.send(Ok(self.endpoint)); + } +} + +#[derive(Debug)] +pub enum ServiceEvent { + CodecConfigure { configuration: CodecConfiguration, responder: CodecConfigureResponder }, +} + +impl CodecConfiguration { + fn into_event( + self, + endpoint: AudioStreamEndpoint, + sender: oneshot::Sender<Result<AudioStreamEndpoint, ResponseCode>>, + ) -> crate::server::ServiceEvent { + ServiceEvent::CodecConfigure { + configuration: self.clone(), + responder: crate::server::CodecConfigureResponder { + endpoint, + codec_id: self.codec_id, + codec_config: self.codec_specific_configuration.clone(), + sender, + }, + } + } +} + +impl AseControlOperation { + fn apply( + self, + peer_id: PeerId, + endpoint_map: HashMap<AseId, AudioStreamEndpoint>, + ) -> (Vec<crate::server::ServiceEvent>, AseControlOperationFut) { + let mut current_response_codes = Vec::new(); + let mut waiting = Vec::new(); + let mut events: Vec<crate::server::ServiceEvent> = Vec::new(); + let opcode: Option<AseControlPointOpcode> = (&self).try_into().ok(); + match self { + Self::ConfigCodec { codec_configurations, mut responses } => { + current_response_codes.append(&mut responses); + for codec_configuration in codec_configurations { + let ase_id = codec_configuration.ase_id; + let Some(endpoint) = endpoint_map.get(&ase_id) else { + current_response_codes + .push(ResponseCode::InvalidAseId { value: ase_id.into() }); + continue; + }; + if !opcode.unwrap().allowed_in_state(&endpoint.state) { + current_response_codes + .push(ResponseCode::InvalidAseStateMachineTransition { ase_id }); + continue; + } + let (send, recv) = oneshot::channel(); + waiting.push(recv); + events.push(codec_configuration.into_event(endpoint.clone(), send)); + } + } + _ => todo!(), + } + ( + events, + AseControlOperationFut { + peer_id, + opcode, + current_response_codes, + endpoints: Vec::new(), + waiting: waiting.into_iter().collect(), + }, + ) + } +} impl<T: bt_gatt::ServerTypes> Stream for AudioStreamControlServiceServer<T> { type Item = Result<ServiceEvent, Error>; fn poll_next( - self: std::pin::Pin<&mut Self>, + mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll<Option<Self::Item>> { - let mut this = self.project(); loop { - let event = match futures::ready!(this.local_service.as_mut().poll_next(cx)) { + self.as_mut().poll_operations(cx); + let mut this = self.as_mut().project(); + if let Some(event) = this.outgoing_events.pop_front() { + return Poll::Ready(Some(Ok(event))); + } + let gatt_event = match futures::ready!(this.local_service.as_mut().poll_next(cx)) { None => return Poll::Ready(None), Some(Err(e)) => return Poll::Ready(Some(Err(e))), Some(Ok(event)) => event, }; use bt_gatt::server::ServiceEvent::*; - let peer_id = event.peer_id(); + let peer_id = gatt_event.peer_id(); let peer_entry = this .client_endpoints .entry(peer_id) .or_insert_with(|| this.default_client_endpoints.clone_for_peer(peer_id)); - match event { + match gatt_event { Read { handle, offset, responder, .. } => { let offset = offset as usize; if handle == CONTROL_POINT_HANDLE { @@ -338,21 +655,16 @@ continue; } responder.acknowledge(); - let _op = match AseControlOperation::try_from(value.to_owned()) { - Ok(op) => op, - Err(e) => { - let service_ref = this.local_service.as_ref(); - - let value = e.notify_value(); - service_ref.service().unwrap().notify( + match AseControlOperation::try_from(value.to_owned()) { + Ok(op) => self.as_mut().queue_operation(peer_id, op), + Err(response_code) => { + this.local_service.notify( &CONTROL_POINT_HANDLE, - &value[..], + &response_code.error_notify_value(), &[peer_id], ); - continue; } }; - // TODO: Do the operation here, possibly notifying things continue; } ClientConfiguration { peer_id, handle, notification_type } => {
diff --git a/rust/bt-ascs/src/tests.rs b/rust/bt-ascs/src/tests.rs index 3e88f87..7dd7a30 100644 --- a/rust/bt-ascs/src/tests.rs +++ b/rust/bt-ascs/src/tests.rs
@@ -68,19 +68,37 @@ assert!(poll_result.is_pending()); } +fn register_for_notification(fake_server: &mut FakeServer, peer_id: PeerId, handle: Handle) { + fake_server.incoming_client_configuration( + peer_id, + ASCS_SERVICE_ID, + handle, + bt_gatt::server::NotificationType::Notify, + ); +} + +// A published server with one sink and one source. fn published_server() -> ( Pin<Box<AudioStreamControlServiceServer<FakeTypes>>>, FakeServer, UnboundedReceiver<FakeServerEvent>, ) { let mut ascs_server = Box::pin(AudioStreamControlServiceServer::<FakeTypes>::new(1, 1)); - let (fake_server, events) = FakeServer::new(); + let (mut fake_server, mut events) = FakeServer::new(); let result = ascs_server.publish(&fake_server); assert!(result.is_ok()); assert!(ascs_server.poll_next_unpin(&mut futures_test::task::noop_context()).is_pending()); + assert!(matches!(expect_service_event(&mut events), FakeServerEvent::Published { .. })); + + for peer_id in [PeerId(1), PeerId(2)] { + for handle in [Handle(1), Handle(2), Handle(3)] { + register_for_notification(&mut fake_server, peer_id, handle); + } + } + (ascs_server, fake_server, events) } @@ -90,20 +108,17 @@ server.poll_next_unpin(&mut futures_test::task::noop_context()) } -// Ignored because we currently do nothing with operations. -#[ignore] #[test] fn peers_are_separated() { let (mut ascs_server, fake_server, mut server_events) = published_server(); // Read the sink uuid - fake_server.incoming_read(PeerId(1), ASCS_SERVICE_ID, Handle(2), 0); - + fake_server.incoming_read(PeerId(1), ASCS_SERVICE_ID, Handle(3), 0); // Poll the ascs server, should not result in an ASCS event assert!(poll_server(&mut ascs_server).is_pending()); // Should have the response - let peer_one_value; + let mut peer_one_value; match server_events.poll_next_unpin(&mut futures_test::task::noop_context()) { Poll::Ready(Some(FakeServerEvent::ReadResponded { service_id, handle: _, value })) => { assert_eq!(service_id, ASCS_SERVICE_ID); @@ -120,24 +135,174 @@ ASCS_SERVICE_ID, Handle(1), 0, - vec![0x01, ase_id, 0x01, 0x01, 0x06, 0x00], + vec![0x01, 0x01, ase_id, 0x01, 0x01, 0x06, 0x00, 0x00, 0x00, 0x00, 0x00], ); - // Still shouldn't have any event + use crate::server::ServiceEvent; + use crate::types::*; + // Should have a codec configure event to respond to + match poll_server(&mut ascs_server) { + Poll::Ready(Some(Ok(ServiceEvent::CodecConfigure { responder, .. }))) => { + responder.accept( + Framing::Unframed, + vec![Phy::Le1MPhy], + 5, + std::time::Duration::from_millis(20).try_into().unwrap(), + PresentationDelayRange::build(0, 500).unwrap(), + ); + } + x => panic!("Expected a CodecConfigure, got {x:?}"), + }; + + // Expect the write to be responded to / acknowledged and a notification from + // the CP handle and the Source ASE + match expect_service_event(&mut server_events) { + FakeServerEvent::WriteResponded { value, .. } => assert!(value.is_ok()), + x => panic!("Expected acknowledge of write, got {x:?}"), + }; assert!(poll_server(&mut ascs_server).is_pending()); + match expect_service_event(&mut server_events) { + FakeServerEvent::Notified { handle, .. } => assert_eq!(Handle(1), handle), + x => panic!("Expected acknowledge of write, got {x:?}"), + }; + match expect_service_event(&mut server_events) { + FakeServerEvent::Notified { peers, handle, value, .. } => { + assert!(peers.contains(&PeerId(1))); + assert_eq!(Handle(3), handle); + // Should be the same ASE_ID + assert_eq!(value[0], ase_id); + assert_eq!(value[1], 0x01); // State is CodecConfigured + peer_one_value = value; + } + x => panic!("Expected acknowledge of write, got {x:?}"), + }; // Read the sink id from another peer - fake_server.incoming_read(PeerId(2), ASCS_SERVICE_ID, Handle(2), 0); + fake_server.incoming_read(PeerId(2), ASCS_SERVICE_ID, Handle(3), 0); + + assert!(poll_server(&mut ascs_server).is_pending()); + assert!(poll_server(&mut ascs_server).is_pending()); let peer_two_value; - match server_events.poll_next_unpin(&mut futures_test::task::noop_context()) { - Poll::Ready(Some(FakeServerEvent::ReadResponded { service_id, handle: _, value })) => { + match expect_service_event(&mut server_events) { + FakeServerEvent::ReadResponded { service_id, handle: _, value } => { assert_eq!(service_id, ASCS_SERVICE_ID); peer_two_value = value.unwrap(); } x => panic!("Expected the read to be responded to got {x:?}"), }; - let _ase_id = peer_two_value[0]; assert!(peer_one_value[1] != peer_two_value[1]); } + +#[test] +fn invalid_operation() { + let (mut ascs_server, fake_server, mut server_events) = published_server(); + + // Read the sink uuid + fake_server.incoming_read(PeerId(1), ASCS_SERVICE_ID, Handle(3), 0); + // Poll the ascs server, should not result in an ASCS event + assert!(poll_server(&mut ascs_server).is_pending()); + + // Should have the response + let sink_value; + match server_events.poll_next_unpin(&mut futures_test::task::noop_context()) { + Poll::Ready(Some(FakeServerEvent::ReadResponded { service_id, handle: _, value })) => { + assert_eq!(service_id, ASCS_SERVICE_ID); + sink_value = value.unwrap(); + } + x => panic!("Expected the read to be responded to got {x:?}"), + }; + + let ase_id = sink_value[0]; + + // Write an operation that is unknown. + fake_server.incoming_write( + PeerId(1), + ASCS_SERVICE_ID, + Handle(1), + 0, + vec![0x1f, 0x01, ase_id, 0xC0, 0xDE], + ); + + // Should have nothing to respond to + match poll_server(&mut ascs_server) { + Poll::Pending => {} + x => panic!("Expected to still be pending, got {x:?}"), + }; + + // Expect the write to be responded to / acknowledged and a notification from + // the CP handle and the Source ASE + match expect_service_event(&mut server_events) { + FakeServerEvent::WriteResponded { value, .. } => assert!(value.is_ok()), + x => panic!("Expected acknowledge of write, got {x:?}"), + }; + assert!(poll_server(&mut ascs_server).is_pending()); + match expect_service_event(&mut server_events) { + FakeServerEvent::Notified { handle, value, peers, .. } => { + assert!(peers.contains(&PeerId(1))); + assert_eq!(Handle(1), handle); + // Opcode should match + // Number_of_ASEs should be 0xFF (Table 4.7) + // ASE_ID is 0x00, and Reason should be 0x00 + assert_eq!(value, &[0x1f, 0xFF, 0x00, 0x01, 0x00]); + } + x => panic!("Expected acknowledge of write, got {x:?}"), + }; +} + +#[test] +fn invalid_length() { + let (mut ascs_server, fake_server, mut server_events) = published_server(); + + // Read the sink uuid + fake_server.incoming_read(PeerId(1), ASCS_SERVICE_ID, Handle(3), 0); + // Poll the ascs server, should not result in an ASCS event + assert!(poll_server(&mut ascs_server).is_pending()); + + // Should have the response + let sink_value; + match server_events.poll_next_unpin(&mut futures_test::task::noop_context()) { + Poll::Ready(Some(FakeServerEvent::ReadResponded { service_id, handle: _, value })) => { + assert_eq!(service_id, ASCS_SERVICE_ID); + sink_value = value.unwrap(); + } + x => panic!("Expected the read to be responded to got {x:?}"), + }; + + let ase_id = sink_value[0]; + + // Write an operation that is the wrong length (too short) + fake_server.incoming_write( + PeerId(1), + ASCS_SERVICE_ID, + Handle(1), + 0, + vec![0x01, 0x01, ase_id, 0xC0, 0xDE], + ); + + // Should have nothing to respond to + match poll_server(&mut ascs_server) { + Poll::Pending => {} + x => panic!("Expected to still be pending, got {x:?}"), + }; + + // Expect the write to be responded to / acknowledged and a notification from + // the CP handle and the Source ASE + match expect_service_event(&mut server_events) { + FakeServerEvent::WriteResponded { value, .. } => assert!(value.is_ok()), + x => panic!("Expected acknowledge of write, got {x:?}"), + }; + assert!(poll_server(&mut ascs_server).is_pending()); + match expect_service_event(&mut server_events) { + FakeServerEvent::Notified { handle, value, peers, .. } => { + assert!(peers.contains(&PeerId(1))); + assert_eq!(Handle(1), handle); + // Opcode should match + // Number_of_ASEs should be 0xFF (Table 4.7) + // ASE_ID is 0x00, and Reason should be 0x00 + assert_eq!(value, &[0x01, 0xFF, 0x00, 0x02, 0x00]); + } + x => panic!("Expected acknowledge of write, got {x:?}"), + }; +}
diff --git a/rust/bt-ascs/src/types.rs b/rust/bt-ascs/src/types.rs index ce5a579..0681283 100644 --- a/rust/bt-ascs/src/types.rs +++ b/rust/bt-ascs/src/types.rs
@@ -2,7 +2,7 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. -use bt_common::packet_encoding::Decodable; +use bt_common::packet_encoding::{Decodable, Encodable}; use bt_common::{codable_as_bitmask, decodable_enum}; use thiserror::Error; @@ -28,8 +28,8 @@ #[derive(Debug, Clone, PartialEq)] pub enum ResponseCode { Success { ase_id: AseId }, - UnsupportedOpcode, - InvalidLength, + UnsupportedOpcode { opcode_byte: u8 }, + InvalidLength { opcode_byte: u8 }, InvalidAseId { value: u8 }, InvalidAseStateMachineTransition { ase_id: AseId }, InvalidAseDirection { ase_id: AseId }, @@ -44,8 +44,8 @@ fn to_code(&self) -> u8 { match self { ResponseCode::Success { .. } => 0x00, - ResponseCode::UnsupportedOpcode => 0x01, - ResponseCode::InvalidLength => 0x02, + ResponseCode::UnsupportedOpcode { .. } => 0x01, + ResponseCode::InvalidLength { .. } => 0x02, ResponseCode::InvalidAseId { .. } => 0x03, ResponseCode::InvalidAseStateMachineTransition { .. } => 0x04, ResponseCode::InvalidAseDirection { .. } => 0x05, @@ -65,6 +65,22 @@ } } + pub fn invalid_length() -> Self { + Self::invalid_length_opcode(0) + } + + pub fn invalid_length_opcode(opcode_byte: u8) -> Self { + Self::InvalidLength { opcode_byte } + } + + pub fn unsupported_opcode(opcode_byte: u8) -> Self { + Self::UnsupportedOpcode { opcode_byte } + } + + pub(crate) fn is_invalid_length(&self) -> bool { + matches!(self, Self::InvalidLength { .. }) + } + fn reason_byte(&self) -> u8 { match self { ResponseCode::ConfigurationParameterValue { reason, .. } => (*reason).into(), @@ -73,9 +89,13 @@ } } - fn ase_id_value(&self) -> u8 { + /// Get the ASE_ID value for this ResponseCode. This is included in the + /// control point notification in response to an operation. For + /// UnsupportedOpcode and InvaildLength, the ASE_ID is defined by the + /// spec to be 0x00. + pub(crate) fn ase_id_value(&self) -> u8 { match self { - ResponseCode::UnsupportedOpcode | ResponseCode::InvalidLength => 0x00, + ResponseCode::UnsupportedOpcode { .. } | ResponseCode::InvalidLength { .. } => 0x00, ResponseCode::InvalidAseId { value } => *value, ResponseCode::Success { ase_id } | ResponseCode::InvalidAseStateMachineTransition { ase_id } @@ -88,6 +108,15 @@ } } + pub(crate) fn error_notify_value(&self) -> Vec<u8> { + match self { + Self::UnsupportedOpcode { opcode_byte } | Self::InvalidLength { opcode_byte } => { + [*opcode_byte, 0xFFu8].into_iter().chain(self.notify_value()).collect() + } + _ => vec![], + } + } + pub(crate) fn notify_value(&self) -> Vec<u8> { [self.ase_id_value(), self.to_code(), self.reason_byte()].into() } @@ -162,14 +191,14 @@ fn decode(buf: &[u8]) -> (core::result::Result<Self, Self::Error>, usize) { if buf.len() < 1 { - return (Err(ResponseCode::InvalidLength), buf.len()); + return (Err(ResponseCode::invalid_length()), buf.len()); } (buf[0].try_into(), 1) } } decodable_enum! { - pub enum AseControlPointOpcode<u8, ResponseCode, UnsupportedOpcode> { + pub enum AseControlPointOpcode<u8, bt_common::packet_encoding::Error, OutOfRange> { ConfigCodec = 0x01, ConfigQos = 0x02, Enable = 0x03, @@ -181,14 +210,37 @@ } } +impl AseControlPointOpcode { + pub(crate) fn allowed_in_state(&self, state: &AseState) -> bool { + let allowed_states: &[AseState] = match self { + Self::ConfigCodec { .. } => { + &[AseState::Idle, AseState::CodecConfigured, AseState::QosConfigured] + } + Self::ConfigQos { .. } => &[AseState::CodecConfigured, AseState::QosConfigured], + Self::Enable { .. } => &[AseState::QosConfigured], + Self::ReceiverStartReady { .. } => &[AseState::Enabling], + Self::ReceiverStopReady { .. } => &[AseState::Disabling], + Self::Disable { .. } => &[AseState::Streaming], + Self::UpdateMetadata { .. } => &[AseState::Enabling, AseState::Streaming], + Self::Release { .. } => &[ + AseState::Enabling, + AseState::Streaming, + AseState::CodecConfigured, + AseState::QosConfigured, + AseState::Disabling, + ], + }; + allowed_states.contains(state) + } +} + /// ASE Control Operations. These can be initiated by a server or client. /// Defined in Table 4.6 of ASCS v1.0 -/// Marked non-exaustive as the remaining operations are RFU and new operations -/// could arrive but should be rejected if they are not recognized. +/// Unrecognized Operations result in Error Operations, which translate directly +/// into an Error response. /// Some variants already contain responses as decoding errors are detected, /// i.e. invalid parameters or metadata, which will be delivered after the /// operation is complete with the results from the rest of the operation. -#[non_exhaustive] #[derive(Debug, PartialEq, Clone)] pub enum AseControlOperation { ConfigCodec { codec_configurations: Vec<CodecConfiguration>, responses: Vec<ResponseCode> }, @@ -215,17 +267,17 @@ | Self::ConfigQos { responses, .. } | Self::Enable { responses, .. } | Self::UpdateMetadata { responses, .. } => { - responses.contains(&ResponseCode::InvalidLength) + responses.into_iter().find(|x| x.is_invalid_length()).is_some() } _ => false, } } } -impl TryFrom<AseControlOperation> for u8 { +impl TryFrom<&AseControlOperation> for u8 { type Error = Error; - fn try_from(value: AseControlOperation) -> Result<Self, Self::Error> { + fn try_from(value: &AseControlOperation) -> Result<Self, Self::Error> { match value { AseControlOperation::ConfigCodec { .. } => Ok(0x01), AseControlOperation::ConfigQos { .. } => Ok(0x02), @@ -252,17 +304,29 @@ (oks, errs) } +impl TryFrom<&AseControlOperation> for AseControlPointOpcode { + type Error = bt_common::packet_encoding::Error; + fn try_from(value: &AseControlOperation) -> Result<Self, Self::Error> { + u8::try_from(value).map_err(|_| Self::Error::OutOfRange)?.try_into() + } +} + impl TryFrom<Vec<u8>> for AseControlOperation { type Error = ResponseCode; fn try_from(value: Vec<u8>) -> Result<Self, Self::Error> { if value.len() < Self::MIN_BYTE_SIZE { - return Err(ResponseCode::InvalidLength); + return Err(ResponseCode::invalid_length()); } - let operation: AseControlPointOpcode = value[0].try_into()?; + let operation: AseControlPointOpcode = match value[0].try_into() { + Ok(opcode) => opcode, + Err(_e) => { + return Err(ResponseCode::unsupported_opcode(value[0])); + } + }; let number_of_ases = value[1] as usize; if number_of_ases < 1 { - return Err(ResponseCode::InvalidLength); + return Err(ResponseCode::invalid_length_opcode(value[0])); } let (op, consumed) = match operation { AseControlPointOpcode::ConfigCodec => { @@ -286,19 +350,25 @@ AseControlPointOpcode::ReceiverStartReady => { // Only InvalidLength is possible let (results, consumed) = AseId::decode_multiple(&value[2..], Some(number_of_ases)); - let ases = results.into_iter().collect::<Result<Vec<_>, ResponseCode>>()?; + let Ok(ases) = results.into_iter().collect::<Result<Vec<_>, ResponseCode>>() else { + return Err(ResponseCode::invalid_length_opcode(value[0])); + }; (Self::ReceiverStartReady { ases }, consumed) } AseControlPointOpcode::Disable => { // Only InvalidLength is possible let (results, consumed) = AseId::decode_multiple(&value[2..], Some(number_of_ases)); - let ases = results.into_iter().collect::<Result<Vec<_>, ResponseCode>>()?; + let Ok(ases) = results.into_iter().collect::<Result<Vec<_>, ResponseCode>>() else { + return Err(ResponseCode::invalid_length_opcode(value[0])); + }; (Self::Disable { ases }, consumed) } AseControlPointOpcode::ReceiverStopReady => { // Only InvalidLength is possible let (results, consumed) = AseId::decode_multiple(&value[2..], Some(number_of_ases)); - let ases = results.into_iter().collect::<Result<Vec<_>, ResponseCode>>()?; + let Ok(ases) = results.into_iter().collect::<Result<Vec<_>, ResponseCode>>() else { + return Err(ResponseCode::invalid_length_opcode(value[0])); + }; (Self::ReceiverStopReady { ases }, consumed) } AseControlPointOpcode::UpdateMetadata => { @@ -310,7 +380,9 @@ AseControlPointOpcode::Release => { // Only InvalidLength is possible let (results, consumed) = AseId::decode_multiple(&value[2..], Some(number_of_ases)); - let ases = results.into_iter().collect::<Result<Vec<_>, ResponseCode>>()?; + let Ok(ases) = results.into_iter().collect::<Result<Vec<_>, ResponseCode>>() else { + return Err(ResponseCode::invalid_length_opcode(value[0])); + }; (Self::Release { ases }, consumed) } }; @@ -320,10 +392,10 @@ // the length of any variable length parameters for that operation as // defined in Section 5.1 through Section 5.8. if (consumed + 2) != value.len() { - return Err(ResponseCode::InvalidLength); + return Err(ResponseCode::invalid_length_opcode(value[0])); } if op.contains_invalid_length() { - return Err(ResponseCode::InvalidLength); + return Err(ResponseCode::invalid_length_opcode(value[0])); } Ok(op) } @@ -361,6 +433,16 @@ } } +impl From<TargetPhy> for Phy { + fn from(value: TargetPhy) -> Self { + match value { + TargetPhy::Le1MPhy => Self::Le1MPhy, + TargetPhy::Le2MPhy => Self::Le2MPhy, + TargetPhy::LeCodedPhy => Self::LeCodedPhy, + } + } +} + codable_as_bitmask!(Phy, u8); impl Phy { @@ -388,12 +470,12 @@ fn decode(buf: &[u8]) -> (core::result::Result<Self, Self::Error>, usize) { if buf.len() < Self::MIN_BYTE_SIZE { - return (Err(ResponseCode::InvalidLength), buf.len()); + return (Err(ResponseCode::invalid_length()), buf.len()); } let codec_specific_configuration_len = buf[Self::MIN_BYTE_SIZE - 1] as usize; let total_len = codec_specific_configuration_len + Self::MIN_BYTE_SIZE; if buf.len() < total_len { - return (Err(ResponseCode::InvalidLength), buf.len()); + return (Err(ResponseCode::invalid_length()), buf.len()); } let try_decode_fn = |buf: &[u8]| { let ase_id = AseId::try_from(buf[0])?; @@ -462,7 +544,7 @@ fn decode(buf: &[u8]) -> (core::result::Result<Self, Self::Error>, usize) { if buf.len() < QosConfiguration::BYTE_SIZE { - return (Err(ResponseCode::InvalidLength), buf.len()); + return (Err(ResponseCode::invalid_length()), buf.len()); } let try_decode_fn = |buf: &[u8]| { let ase_id = AseId::try_from(buf[0])?; @@ -476,7 +558,7 @@ sdu_interval = interval; } (Err(bt_common::packet_encoding::Error::BufferTooSmall), _) => { - return Err(ResponseCode::InvalidLength); + return Err(ResponseCode::invalid_length()); } (Err(bt_common::packet_encoding::Error::OutOfRange), _) => { return Err(ResponseCode::ConfigurationParameterValue { @@ -506,7 +588,7 @@ let max_transport_latency = MaxTransportLatency::decode(&buf[11..]).0.map_err(|e| match e { bt_common::packet_encoding::Error::BufferTooSmall => { - ResponseCode::InvalidLength + ResponseCode::invalid_length() } bt_common::packet_encoding::Error::OutOfRange => { ResponseCode::ConfigurationParameterValue { @@ -647,7 +729,7 @@ /// Max Transport Latency /// Valid range is [0x0005, 0x0FA0]. /// Transmitted in little-endian, Stored in native-endian. -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Clone, Copy, PartialEq)] pub struct MaxTransportLatency(u16); impl Decodable for MaxTransportLatency { @@ -665,6 +747,20 @@ } } +impl Encodable for MaxTransportLatency { + type Error = bt_common::packet_encoding::Error; + fn encoded_len(&self) -> core::primitive::usize { + Self::BYTE_SIZE + } + fn encode(&self, buf: &mut [u8]) -> core::result::Result<(), Self::Error> { + if buf.len() < 2 { + return Err(Self::Error::BufferTooSmall); + } + [buf[0], buf[1]] = self.0.to_le_bytes(); + Ok(()) + } +} + impl TryFrom<std::time::Duration> for MaxTransportLatency { type Error = bt_common::packet_encoding::Error; @@ -686,7 +782,7 @@ /// Presentation delay parameter value being requested by the client for an ASE. /// This value is 24 bits long (0x00FFFFFF max) /// Transmitted in little-endian, Stored in native-endian. -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] pub struct PresentationDelay { pub microseconds: u32, } @@ -700,13 +796,107 @@ fn decode(buf: &[u8]) -> (core::result::Result<Self, Self::Error>, usize) { if buf.len() < Self::BYTE_SIZE { - return (Err(ResponseCode::InvalidLength), buf.len()); + return (Err(ResponseCode::invalid_length()), buf.len()); } let microseconds = u32::from_le_bytes([buf[0], buf[1], buf[2], 0]); (Ok(PresentationDelay { microseconds }), Self::BYTE_SIZE) } } +impl Encodable for PresentationDelay { + type Error = bt_common::packet_encoding::Error; + fn encoded_len(&self) -> core::primitive::usize { + Self::BYTE_SIZE + } + fn encode(&self, buf: &mut [u8]) -> core::result::Result<(), Self::Error> { + if buf.len() < Self::BYTE_SIZE { + return Err(Self::Error::BufferTooSmall); + } + let encoded_le = self.microseconds.to_le_bytes(); + [buf[0], buf[1], buf[2]] = [encoded_le[0], encoded_le[1], encoded_le[2]]; + Ok(()) + } +} + +/// Presentation Delay Range. Used to indicate the supported range and +/// preferred range of the Presentation Delay parameter to be requested by the +/// ASCS Client. Prefered Minimum must be above min, and preferred_max must be +/// below max. Either of these being None indicates no preference. +#[derive(Debug, Clone)] +pub struct PresentationDelayRange { + min: PresentationDelay, + max: PresentationDelay, + preferred_min: Option<PresentationDelay>, + preferred_max: Option<PresentationDelay>, +} + +impl PresentationDelayRange { + const BYTE_SIZE: usize = PresentationDelay::BYTE_SIZE * 4; + /// Make a new delay range with no preference. Returns + /// ResponseCode::InvalidLength if min > max or the value is out of the + /// acceptable range (PresentationDelay is 24 bits) + pub fn build(min_us: u32, max_us: u32) -> Result<Self, ResponseCode> { + if min_us > max_us || min_us > 0x00FFFFFF || max_us > 0x00FFFFFF { + return Err(ResponseCode::invalid_length()); + } + Ok(Self { + min: PresentationDelay { microseconds: min_us }, + max: PresentationDelay { microseconds: max_us }, + preferred_min: None, + preferred_max: None, + }) + } + /// Set the preferred range. If the range does not fit into the supported + /// range, returns Err(ResponseCode::InvalidLength). + pub fn with_preferred( + &mut self, + min_us: Option<u32>, + max_us: Option<u32>, + ) -> Result<&mut Self, ResponseCode> { + self.preferred_min = match min_us { + Some(min_us) + if min_us < self.min.microseconds || max_us.is_some_and(|max| max < min_us) => + { + return Err(ResponseCode::invalid_length()); + } + Some(min_us) => Some(PresentationDelay { microseconds: min_us }), + None => None, + }; + self.preferred_max = match max_us { + Some(max_us) + if max_us > self.max.microseconds || min_us.is_some_and(|min| min > max_us) => + { + return Err(ResponseCode::invalid_length()); + } + Some(max_us) => Some(PresentationDelay { microseconds: max_us }), + None => None, + }; + Ok(self) + } +} + +impl Encodable for PresentationDelayRange { + type Error = bt_common::packet_encoding::Error; + fn encoded_len(&self) -> core::primitive::usize { + Self::BYTE_SIZE + } + fn encode(&self, buf: &mut [u8]) -> core::result::Result<(), Self::Error> { + if buf.len() < Self::BYTE_SIZE { + return Err(Self::Error::BufferTooSmall); + } + buf[0..Self::BYTE_SIZE].fill(0); + self.min.encode(&mut buf[0..3])?; + self.max.encode(&mut buf[3..6])?; + if let Some(pref_min) = &self.preferred_min { + pref_min.encode(&mut buf[6..9])?; + } + if let Some(pref_max) = &self.preferred_max { + pref_max.encode(&mut buf[9..12])?; + } + Ok(()) + } +} + #[derive(Debug, Clone, PartialEq)] pub struct AseIdWithMetadata { pub ase_id: AseId, @@ -722,7 +912,7 @@ fn decode(buf: &[u8]) -> (core::result::Result<Self, Self::Error>, usize) { if buf.len() < Self::MIN_BYTE_SIZE { - return (Err(ResponseCode::InvalidLength), buf.len()); + return (Err(ResponseCode::invalid_length()), buf.len()); } let ase_id = match AseId::try_from(buf[0]) { Ok(ase_id) => ase_id, @@ -731,20 +921,22 @@ let metadata_length = buf[1] as usize; let total_length = Self::MIN_BYTE_SIZE + metadata_length; if buf.len() < total_length { - return (Err(ResponseCode::InvalidLength), buf.len()); + return (Err(ResponseCode::invalid_length()), buf.len()); } use bt_common::core::ltv::Error as LtvError; use bt_common::core::ltv::LtValue; let (metadata_results, consumed) = Metadata::decode_all(&buf[2..2 + metadata_length]); if consumed != metadata_length { - return (Err(ResponseCode::InvalidLength), buf.len()); + return (Err(ResponseCode::invalid_length()), buf.len()); } let metadata_result: Result<Vec<Metadata>, LtvError<<Metadata as LtValue>::Type>> = metadata_results.into_iter().collect(); let Ok(metadata) = metadata_result else { match metadata_result.unwrap_err() { - LtvError::MissingType => return (Err(ResponseCode::InvalidLength), buf.len()), - LtvError::MissingData(_) => return (Err(ResponseCode::InvalidLength), buf.len()), + LtvError::MissingType => return (Err(ResponseCode::invalid_length()), buf.len()), + LtvError::MissingData(_) => { + return (Err(ResponseCode::invalid_length()), buf.len()); + } LtvError::UnrecognizedType(_, type_value) => { return ( Err(ResponseCode::Metadata { @@ -838,16 +1030,12 @@ let encoded_vec: Vec<u8> = encoded.into_iter().copied().collect(); let operation = AseControlOperation::try_from(encoded_vec); - let Ok(operation) = operation else { - panic!("Expected decode to work correctly, got {operation:?}"); - }; - assert_eq!( operation, - AseControlOperation::ConfigCodec { + Ok(AseControlOperation::ConfigCodec { codec_configurations: vec![codec_config], responses: Vec::new() - } + }) ); } @@ -867,12 +1055,8 @@ let encoded_vec: Vec<u8> = encoded.into_iter().copied().collect(); let operation = AseControlOperation::try_from(encoded_vec); - let Ok(operation) = operation else { - panic!("Expected decode to work correctly, got {operation:?}"); - }; - match operation { - AseControlOperation::ConfigCodec { codec_configurations, responses } => { + Ok(AseControlOperation::ConfigCodec { codec_configurations, responses }) => { assert_eq!(codec_configurations, vec![codec_config]); assert_eq!(responses.len(), 1); assert!(matches!(responses[0], ResponseCode::ConfigurationParameterValue { .. })); @@ -896,7 +1080,7 @@ let encoded_vec: Vec<u8> = encoded.into_iter().copied().collect(); let operation = AseControlOperation::try_from(encoded_vec); - assert_eq!(operation, Err(ResponseCode::InvalidLength)); + assert_eq!(operation, Err(ResponseCode::invalid_length_opcode(0x01))); } #[test] @@ -958,9 +1142,7 @@ assert_eq!(consumed, encoded_qos.len() - 2); let encoded_qos: Vec<u8> = encoded_qos.into_iter().copied().collect(); let operation = AseControlOperation::try_from(encoded_qos); - let Ok(_operation) = operation else { - panic!("Expected decode to work correctly, for {operation:?}"); - }; + assert!(operation.is_ok()); #[rustfmt::skip] let encoded_qos_one_fails = &[ @@ -996,4 +1178,46 @@ x => panic!("Expected ConfigQos to succeed, got {x:?}"), }; } + + #[test] + fn presentation_delay_range() { + // Min must be < than max, and within the right range + assert!(PresentationDelayRange::build(10, 1).is_err()); + assert!(PresentationDelayRange::build(1000, 0xC0FFEE00).is_err()); + assert!(PresentationDelayRange::build(0xC0FFEE00, 0xC0FFEE01).is_err()); + + let mut delay_range = PresentationDelayRange::build(1000, 2000).unwrap(); + + assert_eq!(delay_range.min.microseconds, 1000); + assert_eq!(delay_range.max.microseconds, 2000); + + assert!(delay_range.with_preferred(None, None).is_ok()); + assert_eq!(delay_range.preferred_min, None); + assert!(delay_range.with_preferred(Some(900), None).is_err()); + assert_eq!(delay_range.preferred_min, None); + assert!(delay_range.with_preferred(Some(1500), Some(1001)).is_err()); + assert_eq!(delay_range.preferred_min, None); + assert!(delay_range.with_preferred(Some(1500), None).is_ok()); + assert_eq!(delay_range.preferred_min, Some(PresentationDelay { microseconds: 1500 })); + assert_eq!(delay_range.preferred_max, None); + + assert!(delay_range.with_preferred(None, Some(2100)).is_err()); + assert!(delay_range.with_preferred(Some(1801), Some(1800)).is_err()); + assert!(delay_range.with_preferred(Some(1500), Some(1800)).is_ok()); + assert_eq!(delay_range.preferred_min, Some(PresentationDelay { microseconds: 1500 })); + assert_eq!(delay_range.preferred_max, Some(PresentationDelay { microseconds: 1800 })); + + let mut encoded = [0; PresentationDelayRange::BYTE_SIZE]; + assert!(delay_range.encode(&mut encoded).is_ok()); + + #[rustfmt::skip] + let expected: &[u8; 12] = &[ + 0xE8, 0x03, 0x00, // 1000 + 0xD0, 0x07, 0x00, // 2000 + 0xDC, 0x05, 0x00, // 1500 + 0x08, 0x07, 0x00, // 1800 + ]; + + assert_eq!(&encoded, expected); + } }
diff --git a/rust/bt-common/src/core.rs b/rust/bt-common/src/core.rs index 9fab7e3..9549747 100644 --- a/rust/bt-common/src/core.rs +++ b/rust/bt-common/src/core.rs
@@ -190,8 +190,8 @@ } } -impl Encodable for CodecId { - type Error = PacketError; +impl crate::packet_encoding::Encodable for CodecId { + type Error = crate::packet_encoding::Error; fn encoded_len(&self) -> core::primitive::usize { Self::BYTE_SIZE