rust/bt-{broadcast-assistant,bass}: Fix bugs from UPF
- Eagerly process scan results for Broadcast Sources in Broadcast
Assistant EventStream so that all available advertisements are processed
- Eagerly process characteristic notifications from BASS server and
improve robustness of BASS EventStream
- Fix passcode encoding to match spec
- Improve debug/test experience by printing Peer ID and Broadcast ID in
hex
- Remove unnecessary Option around BIG-BIS sync map
Test: cargo test
Fixes: b/443062360, b/443063033, b/443062386
Change-Id: I80b284eb90b034fefb6d5cdb160a2086b46be532
Reviewed-on: https://bluetooth-review.googlesource.com/c/bluetooth/+/2800
diff --git a/rust/bt-bap/src/types.rs b/rust/bt-bap/src/types.rs
index 65bb251..f3946d6 100644
--- a/rust/bt-bap/src/types.rs
+++ b/rust/bt-bap/src/types.rs
@@ -10,7 +10,7 @@
/// Broadcast_ID is a 3-byte data on the wire.
/// Defined in BAP spec v1.0.1 section 3.7.2.1.
-#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
+#[derive(Clone, Copy, Eq, Hash, PartialEq)]
pub struct BroadcastId(u32);
impl BroadcastId {
@@ -24,7 +24,13 @@
impl std::fmt::Display for BroadcastId {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
- write!(f, "{:#08x}", self.0)
+ write!(f, "{:08x}", self.0)
+ }
+}
+
+impl std::fmt::Debug for BroadcastId {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.debug_tuple("BroadcastId").field(&format_args!("0x{}", self)).finish()
}
}
diff --git a/rust/bt-bass/src/client.rs b/rust/bt-bass/src/client.rs
index 8ad2ef6..fc86482 100644
--- a/rust/bt-bass/src/client.rs
+++ b/rust/bt-bass/src/client.rs
@@ -296,8 +296,8 @@
/// in
/// * `pa_interval` - updated PA interval value. If none, unknown value is
/// used
- /// * `bis_sync` - desired BIG to BIS synchronization information. If empty,
- /// it's not updated
+ /// * `bis_map` - desired BIG to BIS synchronization update information. If
+ /// a BIG does not exist as a key, sync for that BIG is not updated.
/// * `metadata_map` - map of updated metadata for BIGs. If a mapping does
/// not exist for a BIG, that BIG's metadata is not updated
pub async fn modify_broadcast_source(
@@ -305,7 +305,7 @@
broadcast_id: BroadcastId,
pa_sync: PaSync,
pa_interval: Option<PeriodicAdvertisingInterval>,
- bis_sync: Option<HashMap<SubgroupIndex, BisSync>>,
+ bis_map: HashMap<SubgroupIndex, BisSync>,
metadata_map: Option<HashMap<SubgroupIndex, Vec<Metadata>>>,
) -> Result<(), Error> {
let op = {
@@ -314,13 +314,12 @@
.ok_or(Error::UnknownBroadcastSource(broadcast_id))?;
// Update BIS_Sync param for BIGs if applicable.
- if let Some(sync_map) = bis_sync {
- for (big_index, group) in state.subgroups.iter_mut().enumerate() {
- if let Some(bis_sync) = sync_map.get(&(big_index as u8)) {
- group.bis_sync = bis_sync.clone();
- }
+ for (big_index, group) in state.subgroups.iter_mut().enumerate() {
+ if let Some(bis_sync) = bis_map.get(&(big_index as u8)) {
+ group.bis_sync = bis_sync.clone();
}
}
+
// Update metadata for BIGs if applicable.
if let Some(mut m) = metadata_map {
for (big_index, group) in state.subgroups.iter_mut().enumerate() {
@@ -779,7 +778,7 @@
BroadcastId::try_from(0x11).unwrap(),
PaSync::DoNotSync,
Some(PeriodicAdvertisingInterval(0xAAAA)),
- None,
+ HashMap::new(),
None,
);
pin_mut!(op_fut);
@@ -827,7 +826,7 @@
BroadcastId::try_from(0x11).unwrap(),
PaSync::DoNotSync,
None,
- Some(HashMap::from([(0, BisSync::sync(vec![1, 3, 5]).unwrap())])),
+ HashMap::from([(0, BisSync::sync(vec![1, 3, 5]).unwrap())]),
Some(HashMap::from([
(0, vec![Metadata::BroadcastAudioImmediateRenderingFlag]),
(1, vec![Metadata::Language("eng".to_string())]),
@@ -835,7 +834,7 @@
])),
);
pin_mut!(op_fut);
- let polled = op_fut.poll_unpin(&mut noop_cx);
+ let polled: Poll<Result<(), Error>> = op_fut.poll_unpin(&mut noop_cx);
assert_matches!(polled, Poll::Ready(Ok(_)));
}
@@ -849,7 +848,7 @@
BroadcastId::try_from(0x11).unwrap(),
PaSync::DoNotSync,
None,
- None,
+ HashMap::new(),
None,
);
pin_mut!(op_fut);
diff --git a/rust/bt-bass/src/client/event.rs b/rust/bt-bass/src/client/event.rs
index bf4349e..f7ad10b 100644
--- a/rust/bt-bass/src/client/event.rs
+++ b/rust/bt-bass/src/client/event.rs
@@ -122,91 +122,75 @@
loop {
if let Some(item) = self.event_queue.pop_front() {
- match item {
- Ok(event) => return Poll::Ready(Some(Ok(event))),
- Err(e) => {
- // If an error was received, we terminate the event stream, but send an
- // error to indicate why it was terminated.
- self.terminated = true;
- return Poll::Ready(Some(Err(e)));
+ // An error from a single stream will be reported, but the main EventStream
+ // will continue for other sources.
+ return Poll::Ready(Some(item));
+ }
+
+ let Some(item) = futures::ready!(self.notification_streams.poll_next_unpin(cx)) else {
+ // All notification streams have been closed. Terminate the EventStream.
+ self.terminated = true;
+ let err = Error::EventStream(Box::new(Error::Service(
+ ServiceError::NotificationChannelClosed(format!(
+ "All BASS GATT notification streams closed"
+ )),
+ )));
+ return Poll::Ready(Some(Err(err)));
+ };
+
+ // One of the notification streams produced an error. Report it, but do not
+ // terminate. SelectAll will remove the faulty stream from its set.
+ let Ok(notification) = item else {
+ let err = Error::EventStream(Box::new(Error::Gatt(item.unwrap_err())));
+ return Poll::Ready(Some(Err(err)));
+ };
+
+ let char_handle = notification.handle;
+ let (Ok(new_state), _) = BroadcastReceiveState::decode(notification.value.as_slice())
+ else {
+ self.event_queue.push_back(Ok(Event::UnknownPacket));
+ continue;
+ };
+
+ let maybe_prev_state = {
+ let mut lock = self.broadcast_sources.lock();
+ lock.update_state(char_handle, new_state.clone())
+ };
+
+ // If the previous value was not empty, check if it was overwritten.
+ if let Some(ref prev_state) = maybe_prev_state {
+ if let BroadcastReceiveState::NonEmpty(prev_receive_state) = prev_state {
+ if new_state.is_empty() || !new_state.has_same_broadcast_id(&prev_state) {
+ self.event_queue.push_back(Ok(Event::RemovedBroadcastSource(
+ prev_receive_state.broadcast_id,
+ )));
}
}
}
- match self.notification_streams.poll_next_unpin(cx) {
- Poll::Pending => return Poll::Pending,
- Poll::Ready(None) => {
- let err = Error::EventStream(Box::new(Error::Service(
- ServiceError::NotificationChannelClosed(format!(
- "GATT notification stream for BRS characteristics closed"
- )),
+ // BRS characteristic value was updated with a new broadcast source
+ // information.
+ if let BroadcastReceiveState::NonEmpty(receive_state) = &new_state {
+ let is_new_source = match maybe_prev_state {
+ Some(prev_state) => !new_state.has_same_broadcast_id(&prev_state),
+ None => true,
+ };
+ if is_new_source {
+ self.event_queue.push_back(Ok(Event::AddedBroadcastSource(
+ receive_state.broadcast_id,
+ receive_state.pa_sync_state,
+ receive_state.big_encryption,
)));
- self.event_queue.push_back(Err(err));
- }
- Poll::Ready(Some(Err(error))) => {
- // Deem all errors as critical.
- let err = Error::EventStream(Box::new(Error::Gatt(error)));
- self.event_queue.push_back(Err(err));
- }
- Poll::Ready(Some(Ok(notification))) => {
- let char_handle = notification.handle;
- let (Ok(new_state), _) =
- BroadcastReceiveState::decode(notification.value.as_slice())
- else {
- self.event_queue.push_back(Ok(Event::UnknownPacket));
- continue;
- };
-
- let maybe_prev_state = {
- let mut lock = self.broadcast_sources.lock();
- lock.update_state(char_handle, new_state.clone())
- };
-
- let mut multi_events = VecDeque::new();
-
- // If the previous value was not empty, check if it was overwritten.
- if let Some(ref prev_state) = maybe_prev_state {
- if let BroadcastReceiveState::NonEmpty(prev_receive_state) = prev_state {
- if new_state.is_empty() || !new_state.has_same_broadcast_id(&prev_state)
- {
- multi_events.push_back(Ok(Event::RemovedBroadcastSource(
- prev_receive_state.broadcast_id,
- )));
- }
- }
+ } else {
+ let other_events = Event::from_broadcast_receive_state(&receive_state);
+ for e in other_events.into_iter() {
+ self.event_queue.push_back(Ok(e));
}
-
- // BRS characteristic value was updated with a new broadcast source
- // information.
- if let BroadcastReceiveState::NonEmpty(receive_state) = &new_state {
- let is_new_source = match maybe_prev_state {
- Some(prev_state) => !new_state.has_same_broadcast_id(&prev_state),
- None => true,
- };
- if is_new_source {
- multi_events.push_back(Ok(Event::AddedBroadcastSource(
- receive_state.broadcast_id,
- receive_state.pa_sync_state,
- receive_state.big_encryption,
- )));
- } else {
- let other_events = Event::from_broadcast_receive_state(&receive_state);
- for e in other_events.into_iter() {
- multi_events.push_back(Ok(e));
- }
- }
- }
- if multi_events.len() != 0 {
- self.event_queue.append(&mut multi_events);
- continue;
- }
- continue;
}
- };
-
- break;
+ }
+ // Continue to the top of the loop to start draining the event_queue.
+ continue;
}
- Poll::Pending
}
}
@@ -220,7 +204,7 @@
use futures::channel::mpsc::unbounded;
use bt_common::core::AddressType;
- use bt_gatt::types::Handle;
+ use bt_gatt::types::{Error as BtGattError, GattError, Handle};
#[test]
fn poll_event_stream() {
@@ -371,4 +355,57 @@
// Should be pending because no more events generated from notifications.
assert!(event_streams.poll_next_unpin(&mut noop_cx).is_pending());
}
+
+ #[test]
+ fn error_on_one_stream_does_not_terminate_event_stream() {
+ let mut streams = SelectAll::new();
+ let (sender1, receiver1) = unbounded();
+ let (sender2, receiver2) = unbounded();
+ streams.push(receiver1.boxed());
+ streams.push(receiver2.boxed());
+
+ let source_tracker = Arc::new(Mutex::new(KnownBroadcastSources::new(HashMap::from([
+ (Handle(0x1), BroadcastReceiveState::Empty),
+ (Handle(0x2), BroadcastReceiveState::Empty),
+ ]))));
+ let mut event_streams = EventStream::new(streams, source_tracker);
+ let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
+
+ // Send an error on one stream.
+ sender1.unbounded_send(Err(BtGattError::Gatt(GattError::InvalidPdu))).expect("should send");
+
+ // We should receive the error event.
+ let polled = event_streams.poll_next_unpin(&mut noop_cx);
+ assert_matches!(polled, Poll::Ready(Some(Err(Error::EventStream(_)))));
+
+ // The stream should NOT be terminated.
+ assert!(!event_streams.is_terminated());
+ assert!(event_streams.poll_next_unpin(&mut noop_cx).is_pending());
+
+ // Send a valid notification on the other stream.
+ #[rustfmt::skip]
+ sender2
+ .unbounded_send(Ok(CharacteristicNotification {
+ handle: Handle(0x2),
+ value: vec![
+ 0x02, AddressType::Public as u8, // source id and address type
+ 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, // address
+ 0x01, 0x02, 0x03, 0x04, // ad set id and broadcast id
+ PaSyncState::Synced as u8,
+ EncryptionStatus::NotEncrypted.raw_value(),
+ 0x00, // no subgroups
+ ],
+ maybe_truncated: false,
+ }))
+ .expect("should send");
+
+ // We should be able to receive the event from the second stream.
+ let polled = event_streams.poll_next_unpin(&mut noop_cx);
+ assert_matches!(polled, Poll::Ready(Some(Ok(event))) => {
+ assert_eq!(event, Event::AddedBroadcastSource(BroadcastId::try_from(0x040302).unwrap(), PaSyncState::Synced, EncryptionStatus::NotEncrypted));
+ });
+
+ // The stream should still not be terminated.
+ assert!(!event_streams.is_terminated());
+ }
}
diff --git a/rust/bt-broadcast-assistant/src/assistant/event.rs b/rust/bt-broadcast-assistant/src/assistant/event.rs
index d9a535a..18a97ae 100644
--- a/rust/bt-broadcast-assistant/src/assistant/event.rs
+++ b/rust/bt-broadcast-assistant/src/assistant/event.rs
@@ -106,40 +106,37 @@
// Poll scan result stream to check if there were any newly discovered peers
// that we're interested in.
- match futures::ready!(self.scan_result_stream.poll_next_unpin(cx)) {
- Some(Ok(scanned)) => {
- match Self::try_into_broadcast_source(&scanned) {
- Err(e) => {
- return Poll::Ready(Some(Ok(Event::CouldNotParseAdvertisingData {
- peer: scanned.id,
- error: e,
- })));
- }
- Ok(Some(found_source)) => {
- // If we found a broadcast source, we add its information in the
- // internal records.
- let (broadcast_source, changed) = self
- .broadcast_sources
- .merge_broadcast_source_data(&scanned.id, &found_source);
-
- // Broadcast found event is relayed to the client iff complete
- // information has been gathered.
- if broadcast_source.into_add_source() && changed {
- return Poll::Ready(Some(Ok(Event::FoundBroadcastSource {
- peer: scanned.id,
- source: broadcast_source,
- })));
- }
-
- Poll::Pending
- }
- Ok(None) => Poll::Pending,
- }
- }
- None | Some(Err(_)) => {
+ loop {
+ let Some(Ok(scanned)) = futures::ready!(self.scan_result_stream.poll_next_unpin(cx))
+ else {
self.terminated = true;
self.broadcast_source_scan_started.store(false, Ordering::Relaxed);
- Poll::Ready(Some(Err(Error::CentralScanTerminated)))
+ return Poll::Ready(Some(Err(Error::CentralScanTerminated)));
+ };
+
+ let found_source = match Self::try_into_broadcast_source(&scanned) {
+ Err(error) => {
+ return Poll::Ready(Some(Ok(Event::CouldNotParseAdvertisingData {
+ peer: scanned.id,
+ error,
+ })));
+ }
+ Ok(None) => continue,
+ Ok(Some(source)) => source,
+ };
+
+ // If we found a broadcast source, we add its information in the
+ // internal records.
+ let (broadcast_source, changed) =
+ self.broadcast_sources.merge_broadcast_source_data(&scanned.id, &found_source);
+
+ // Broadcast found event is relayed to the client iff complete
+ // information has been gathered.
+ if broadcast_source.into_add_source() && changed {
+ return Poll::Ready(Some(Ok(Event::FoundBroadcastSource {
+ peer: scanned.id,
+ source: broadcast_source,
+ })));
}
}
}
@@ -153,33 +150,34 @@
use bt_common::core::{AddressType, AdvertisingSetId};
use bt_gatt::central::{AdvertisingDatum, PeerName};
- use bt_gatt::test_utils::{FakeTypes, ScannedResultStream};
+ use bt_gatt::test_utils::{FakeTypes, ScannedResultStream, ScannedResultStreamController};
use bt_gatt::types::Error as BtGattError;
use bt_gatt::types::GattError;
- fn setup_stream() -> (EventStream<FakeTypes>, ScannedResultStream) {
+ fn setup_stream() -> (EventStream<FakeTypes>, ScannedResultStreamController) {
let fake_scan_result_stream = ScannedResultStream::new();
+ let controller = fake_scan_result_stream.controller();
let broadcast_sources = DiscoveredBroadcastSources::new();
let broadcast_source_scan_started = Arc::new(AtomicBool::new(false));
(
EventStream::<FakeTypes>::new(
- fake_scan_result_stream.clone(),
+ fake_scan_result_stream,
broadcast_sources,
broadcast_source_scan_started,
),
- fake_scan_result_stream,
+ controller,
)
}
#[test]
fn poll_found_broadcast_source_events() {
- let (mut stream, mut scan_result_stream) = setup_stream();
+ let (mut stream, scan_result_controller) = setup_stream();
// Scanned a broadcast source and its broadcast id.
let broadcast_source_pid = PeerId(1005);
- scan_result_stream.set_scanned_result(Ok(ScanResult {
+ scan_result_controller.add_scanned_result(Ok(ScanResult {
id: broadcast_source_pid,
connectable: true,
name: PeerName::Unknown,
@@ -225,7 +223,7 @@
* (big #2 / bis #2) */
];
- scan_result_stream.set_scanned_result(Ok(ScanResult {
+ scan_result_controller.add_scanned_result(Ok(ScanResult {
id: broadcast_source_pid,
connectable: true,
name: PeerName::Unknown,
@@ -251,7 +249,7 @@
assert!(stream.poll_next_unpin(&mut noop_cx).is_pending());
// Scanned the same broadcast source's BASE data.
- scan_result_stream.set_scanned_result(Ok(ScanResult {
+ scan_result_controller.add_scanned_result(Ok(ScanResult {
id: broadcast_source_pid,
connectable: true,
name: PeerName::Unknown,
@@ -270,10 +268,10 @@
#[test]
fn central_scan_stream_terminates() {
- let (mut stream, mut scan_result_stream) = setup_stream();
+ let (mut stream, scan_result_controller) = setup_stream();
// Mimick scan error.
- scan_result_stream.set_scanned_result(Err(BtGattError::Gatt(GattError::InvalidPdu)));
+ scan_result_controller.add_scanned_result(Err(BtGattError::Gatt(GattError::InvalidPdu)));
let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
match stream.poll_next_unpin(&mut noop_cx) {
@@ -285,4 +283,68 @@
assert_matches!(stream.poll_next_unpin(&mut noop_cx), Poll::Ready(None));
assert_matches!(stream.is_terminated(), true);
}
+
+ #[test]
+ fn poll_processes_multiple_results_eagerly() {
+ let (mut stream, scan_result_controller) = setup_stream();
+ let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
+
+ let broadcast_source_pid = PeerId(1005);
+
+ #[rustfmt::skip]
+ let base_data = vec![
+ 0x10, 0x20, 0x30, 0x01, // presentation delay, num of subgroups
+ 0x01, 0x03, 0x00, 0x00, 0x00, 0x00, // num of bis, codec id
+ 0x00, // codec specific config len
+ 0x00, // metadata len,
+ 0x01, 0x00, // bis index, codec specific config len
+ ];
+
+ // 1. An irrelevant peer that should be ignored.
+ scan_result_controller.add_scanned_result(Ok(ScanResult {
+ id: PeerId(1),
+ connectable: true,
+ name: PeerName::Unknown,
+ advertised: vec![],
+ advertising_sid: Some(0),
+ periodic_advertising_interval: None,
+ }));
+ // 2. A broadcast source, but incomplete data (only broadcast ID).
+ scan_result_controller.add_scanned_result(Ok(ScanResult {
+ id: broadcast_source_pid,
+ connectable: true,
+ name: PeerName::Unknown,
+ advertised: vec![AdvertisingDatum::ServiceData(
+ BROADCAST_AUDIO_ANNOUNCEMENT_SERVICE,
+ vec![0x01, 0x02, 0x03],
+ )],
+ advertising_sid: Some(1),
+ periodic_advertising_interval: None,
+ }));
+ // 3. The same broadcast source, with BASE data, which completes it.
+ scan_result_controller.add_scanned_result(Ok(ScanResult {
+ id: broadcast_source_pid,
+ connectable: true,
+ name: PeerName::Unknown,
+ advertised: vec![AdvertisingDatum::ServiceData(
+ BASIC_AUDIO_ANNOUNCEMENT_SERVICE,
+ base_data.clone(),
+ )],
+ advertising_sid: Some(1),
+ periodic_advertising_interval: None,
+ }));
+
+ // The stream should eagerly process all three items and emit the
+ // FoundBroadcastSource event.
+ let poll_result = stream.poll_next_unpin(&mut noop_cx);
+ let Poll::Ready(Some(Ok(event))) = poll_result else {
+ panic!("should have received event, but got {:?}", poll_result);
+ };
+ assert_matches!(event, Event::FoundBroadcastSource{peer, ..} => {
+ assert_eq!(peer, broadcast_source_pid);
+ });
+
+ // The underlying stream is now empty, so the next poll should be pending.
+ assert!(stream.poll_next_unpin(&mut noop_cx).is_pending());
+ }
}
diff --git a/rust/bt-broadcast-assistant/src/assistant/peer.rs b/rust/bt-broadcast-assistant/src/assistant/peer.rs
index 30be3f1..a901b21 100644
--- a/rust/bt-broadcast-assistant/src/assistant/peer.rs
+++ b/rust/bt-broadcast-assistant/src/assistant/peer.rs
@@ -160,7 +160,7 @@
.unwrap_or(None);
self.bass
- .modify_broadcast_source(broadcast_id, pa_sync, pa_interval, Some(bis_sync), None)
+ .modify_broadcast_source(broadcast_id, pa_sync, pa_interval, bis_sync, None)
.await
.map_err(Into::into)
}
diff --git a/rust/bt-broadcast-assistant/src/debug.rs b/rust/bt-broadcast-assistant/src/debug.rs
index 1fe4f9e..d8d26d8 100644
--- a/rust/bt-broadcast-assistant/src/debug.rs
+++ b/rust/bt-broadcast-assistant/src/debug.rs
@@ -18,7 +18,7 @@
use bt_common::generic_audio::metadata_ltv::Metadata;
use bt_common::PeerId;
use bt_gatt::pii::GetPeerAddr;
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
use futures::stream::FusedStream;
use futures::Future;
@@ -103,7 +103,7 @@
return;
};
if let Err(e) = f(peer).await {
- eprintln!("failed to perform oepration: {e:?}");
+ eprintln!("failed to perform operation: {e:?}");
}
}
}
@@ -164,18 +164,45 @@
eprintln!("Failed to parse big index from '{}', ignoring.", parts[0]);
continue;
};
- let Ok(bis_index) = parse_int::<u8>(parts[1]) else {
- eprintln!("Failed to parse bis index from '{}', ignoring.", parts[1]);
- continue;
- };
- let entry = map.entry(ith_big).or_insert(BisSync::no_sync());
- if let Err(e) = entry.synchronize_to_index(bis_index) {
- eprintln!("Invalid BIS index: {e:?}");
+ match parse_int::<u8>(parts[1]) {
+ Ok(bis_index) => {
+ let entry = map.entry(ith_big).or_insert(BisSync::no_sync());
+ if let Err(e) = entry.synchronize_to_index(bis_index) {
+ eprintln!("Failed to set sync to BIS index: {e:?}");
+ }
+ }
+ Err(_) if parts[1] == "OFF" => {
+ map.insert(ith_big, BisSync::no_sync());
+ }
+ Err(e) => {
+ eprintln!("{e:?} - BIS index should be a number from 1-31, ignoring {}", parts[1]);
+ }
}
}
map
}
+/// Converts a passcode string into a 16-byte broadcast code.
+/// The string is UTF-8 encoded and then padded with zeros on the right to a
+/// total length of 16 bytes. This result is a little-endian byte array
+/// equivalent to a 128-bit value.
+fn passcode_to_broadcast_code(passcode: &str) -> Result<[u8; 16], String> {
+ if passcode.is_empty() {
+ return Err("invalid broadcast code: passcode cannot be empty".to_string());
+ }
+ let code = passcode.as_bytes();
+ if code.len() > 16 {
+ return Err(format!(
+ "invalid broadcast code: '{}'. should be at max length 16, but was {}",
+ passcode,
+ code.len()
+ ));
+ }
+ let mut broadcast_code = [0u8; 16];
+ broadcast_code[..code.len()].copy_from_slice(code);
+ Ok(broadcast_code)
+}
+
impl<T: bt_gatt::GattTypes + 'static, R: GetPeerAddr> CommandRunner for AssistantDebug<T, R>
where
<T as bt_gatt::GattTypes>::NotificationStream: std::marker::Send,
@@ -187,13 +214,18 @@
cmd: Self::Set,
args: Vec<String>,
) -> impl futures::Future<Output = Result<(), impl std::error::Error>> {
+ let help_subcommands: HashSet<&str> = HashSet::from(["help", "-h", "--help"]);
async move {
+ if args.len() >= 1 && help_subcommands.contains(args[0].as_str()) {
+ eprintln!("usage: {}", cmd.help_simple());
+ return Ok(());
+ }
match cmd {
AssistantCmd::Info => {
let known = self.assistant.known_broadcast_sources();
- eprintln!("Known Broadcast Sources:");
+ println!("Known Broadcast Sources:");
for (id, s) in known {
- eprintln!("PeerId ({id}), source: {s:?}");
+ println!("PeerId ({id}), source: {s:?}");
}
}
AssistantCmd::Connect => {
@@ -240,19 +272,16 @@
return Ok(());
};
- let code = args[1].as_bytes();
- if code.len() > 16 {
- eprintln!(
- "invalid broadcast code: {}. should be at max length 16",
- args[1]
- );
- return Ok(());
- }
- let mut passcode_vec = vec![0; 16];
- passcode_vec[16 - code.len()..16].copy_from_slice(code);
+ let broadcast_code = match passcode_to_broadcast_code(&args[1]) {
+ Ok(code) => code,
+ Err(e) => {
+ eprintln!("{e:?}");
+ return Ok(());
+ }
+ };
+
self.with_peer(|peer| async move {
- peer.send_broadcast_code(broadcast_id, passcode_vec.try_into().unwrap())
- .await
+ peer.send_broadcast_code(broadcast_id, broadcast_code).await
})
.await;
}
@@ -383,7 +412,7 @@
advertising_sid,
) {
Ok(source) => {
- eprintln!("broadcast source after additional info: {source:?}")
+ println!("broadcast source after additional info: {source:?}")
}
Err(e) => {
eprintln!("failed to enter in broadcast source information: {e:?}")
@@ -401,7 +430,7 @@
}
let Ok(peer_id) = parse_peer_id(&args[0]) else {
- println!("invalid peer id: {}", args[0]);
+ eprintln!("invalid peer id: {}", args[0]);
return Ok(());
};
@@ -432,7 +461,7 @@
.assistant
.force_discover_broadcast_source_metadata(peer_id, all_big_metadata)
{
- Ok(source) => eprintln!("broadcast source with metadata: {source:?}"),
+ Ok(source) => println!("broadcast source with metadata: {source:?}"),
Err(e) => eprintln!("failed to enter in broadcast source metadata: {e:?}"),
}
}
@@ -465,7 +494,7 @@
.assistant
.force_discover_broadcast_source_metadata(peer_id, all_big_metadata)
{
- Ok(source) => eprintln!("broadcast source with metadata: {source:?}"),
+ Ok(source) => println!("broadcast source with metadata: {source:?}"),
Err(e) => {
eprintln!("failed to enter in empty broadcast source metadata: {e:?}")
}
@@ -524,11 +553,23 @@
#[test]
fn test_parse_bis_sync() {
+ // Basic case with multiple BIGs and BIS indices.
let bis_sync = parse_bis_sync("0-1,0-2,1-1");
assert_eq!(bis_sync.len(), 2);
assert_eq!(bis_sync.get(&0), Some(&BisSync::sync(vec![1, 2]).unwrap()));
assert_eq!(bis_sync.get(&1), Some(&BisSync::sync(vec![1]).unwrap()));
+ // Case with "OFF" to disable sync for a BIG.
+ let bis_sync = parse_bis_sync("0-1,1-OFF,0-2");
+ assert_eq!(bis_sync.len(), 2);
+ assert_eq!(bis_sync.get(&0), Some(&BisSync::sync(vec![1, 2]).unwrap()));
+ assert_eq!(bis_sync.get(&1), Some(&BisSync::no_sync()));
+
+ // Case where sync is set and then turned off for the same BIG.
+ let bis_sync = parse_bis_sync("0-5,0-OFF");
+ assert_eq!(bis_sync.len(), 1);
+ assert_eq!(bis_sync.get(&0), Some(&BisSync::no_sync()));
+
// Will ignore invalid values.
let bis_sync = parse_bis_sync("0-1,0-2,1:1,1-1-1,");
assert_eq!(bis_sync.len(), 1);
@@ -537,4 +578,34 @@
let bis_sync = parse_bis_sync("hellothisistoallynotvalid");
assert_eq!(bis_sync.len(), 0);
}
+
+ #[test]
+ fn test_passcode_to_broadcast_code() {
+ // UTF-8 string that is less than 16 bytes.
+ // Source of truth test case from Bluetooth Spec.
+ let code = "Børne House";
+ let expected = [
+ 0x42, 0xc3, 0xb8, 0x72, 0x6e, 0x65, 0x20, 0x48, 0x6f, 0x75, 0x73, 0x65, 0x00, 0x00,
+ 0x00, 0x00,
+ ];
+ let actual = passcode_to_broadcast_code(code).expect("should succeed");
+ assert_eq!(actual, expected);
+ assert_eq!(u128::from_le_bytes(actual), 0x00000000_6573756F_4820656E_72B8C342);
+
+ // Valid ASCII passcode, exactly 16 bytes.
+ let code = "1234567890123456";
+ let expected = [
+ 0x31, 0x32, 0x33, 0x34, 0x35, 0x36, 0x37, 0x38, 0x39, 0x30, 0x31, 0x32, 0x33, 0x34,
+ 0x35, 0x36,
+ ];
+ assert_eq!(passcode_to_broadcast_code(code).unwrap(), expected);
+
+ // Invalid passcode, over 16 bytes.
+ let code = "12345678901234567";
+ assert!(passcode_to_broadcast_code(code).is_err());
+
+ // Empty passcode should be an error.
+ let code = "";
+ assert!(passcode_to_broadcast_code(code).is_err());
+ }
}
diff --git a/rust/bt-common/src/lib.rs b/rust/bt-common/src/lib.rs
index 382567d..1cb2cf0 100644
--- a/rust/bt-common/src/lib.rs
+++ b/rust/bt-common/src/lib.rs
@@ -13,7 +13,7 @@
/// same peer as long as the PeerId was retrieved after the `Central` was
/// instantiated. PeerIds can be valid longer than that (often if the peer is
/// bonded)
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+#[derive(Clone, Copy, PartialEq, Eq, Hash)]
pub struct PeerId(pub u64);
impl rust_core::fmt::Display for PeerId {
@@ -21,7 +21,13 @@
&self,
f: &mut rust_core::fmt::Formatter<'_>,
) -> std::result::Result<(), std::fmt::Error> {
- write!(f, "{:x}", self.0)
+ write!(f, "{:016x}", self.0)
+ }
+}
+
+impl std::fmt::Debug for PeerId {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.debug_tuple("PeerId").field(&format_args!("0x{}", self)).finish()
}
}
diff --git a/rust/bt-gatt/src/test_utils.rs b/rust/bt-gatt/src/test_utils.rs
index d528b47..3397799 100644
--- a/rust/bt-gatt/src/test_utils.rs
+++ b/rust/bt-gatt/src/test_utils.rs
@@ -5,16 +5,17 @@
use bt_common::core::{Address, AddressType};
use futures::channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender};
use futures::future::{ready, Ready};
-use futures::Stream;
+use futures::stream::{FusedStream, Stream};
use parking_lot::Mutex;
-use std::collections::HashMap;
+use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
-use std::task::Poll;
+use std::task::{Poll, Waker};
use bt_common::{PeerId, Uuid};
use crate::central::ScanResult;
use crate::client::CharacteristicNotification;
+
use crate::periodic_advertising::{PeriodicAdvertising, SyncReport};
use crate::pii::GetPeerAddr;
use crate::server::{self, LocalService, ReadResponder, ServiceDefinition, WriteResponder};
@@ -221,24 +222,47 @@
}
}
-#[derive(Clone)]
+#[derive(Default, Debug)]
+struct ScannedResultStreamInner {
+ results: VecDeque<Result<ScanResult>>,
+ waker: Option<Waker>,
+}
+
+#[derive(Clone, Debug, Default)]
+pub struct ScannedResultStreamController(Arc<Mutex<ScannedResultStreamInner>>);
+
+impl ScannedResultStreamController {
+ /// Add a single scanned result item to output from the stream.
+ pub fn add_scanned_result(&self, item: Result<ScanResult>) {
+ let mut lock = self.0.lock();
+ lock.results.push_back(item);
+ if let Some(waker) = lock.waker.take() {
+ waker.wake();
+ }
+ }
+}
+
+#[derive(Debug, Default)]
pub struct ScannedResultStream {
inner: Arc<Mutex<ScannedResultStreamInner>>,
}
-#[derive(Default)]
-pub struct ScannedResultStreamInner {
- result: Option<Result<ScanResult>>,
-}
-
impl ScannedResultStream {
+ /// Creates a new ScannedResultStream.
+ /// Client can get a ScannedResultStreamController using the `controller`
+ /// method.
pub fn new() -> Self {
- Self { inner: Arc::new(Mutex::new(ScannedResultStreamInner::default())) }
+ Self::default()
}
- /// Set scanned result item to output from the stream.
- pub fn set_scanned_result(&mut self, item: Result<ScanResult>) {
- self.inner.lock().result = Some(item);
+ pub fn controller(&self) -> ScannedResultStreamController {
+ ScannedResultStreamController(self.inner.clone())
+ }
+}
+
+impl FusedStream for ScannedResultStream {
+ fn is_terminated(&self) -> bool {
+ self.inner.lock().results.is_empty()
}
}
@@ -247,14 +271,15 @@
fn poll_next(
self: std::pin::Pin<&mut Self>,
- _cx: &mut std::task::Context<'_>,
+ cx: &mut std::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
let mut lock = self.inner.lock();
- if lock.result.is_some() {
- Poll::Ready(lock.result.take())
- } else {
- // Never wake up, as if we never find another result
- Poll::Pending
+ match lock.results.pop_front() {
+ Some(result) => Poll::Ready(Some(result)),
+ None => {
+ lock.waker = Some(cx.waker().clone());
+ Poll::Pending
+ }
}
}
}
@@ -345,7 +370,7 @@
impl crate::Central<FakeTypes> for FakeCentral {
fn scan(&self, _filters: &[crate::central::ScanFilter]) -> ScannedResultStream {
- ScannedResultStream::new()
+ ScannedResultStream::default()
}
fn connect(&self, peer_id: PeerId) -> <FakeTypes as GattTypes>::ConnectFuture {
diff --git a/rust/bt-gatt/src/tests.rs b/rust/bt-gatt/src/tests.rs
index 6cab78b..d8188e4 100644
--- a/rust/bt-gatt/src/tests.rs
+++ b/rust/bt-gatt/src/tests.rs
@@ -246,7 +246,8 @@
advertising_sid: Some(0),
periodic_advertising_interval: None,
};
- let _ = scan_results.set_scanned_result(Ok(scanned_result));
+ let controller = scan_results.controller();
+ let _ = controller.add_scanned_result(Ok(scanned_result));
let polled = scan_results.poll_next_unpin(&mut noop_cx);
assert_matches!(polled, Poll::Ready(Some(Ok(_))));