rust/bt-{broadcast-assistant,bass}: Fix bugs from UPF

- Eagerly process scan results for Broadcast Sources in Broadcast
  Assistant EventStream so that all available advertisements are processed
- Eagerly process characteristic notifications from BASS server and
  improve robustness of BASS EventStream
- Fix passcode encoding to match spec
- Improve debug/test experience by printing Peer ID and Broadcast ID in
  hex
- Remove unnecessary Option around BIG-BIS sync map

Test: cargo test
Fixes: b/443062360, b/443063033, b/443062386
Change-Id: I80b284eb90b034fefb6d5cdb160a2086b46be532
Reviewed-on: https://bluetooth-review.googlesource.com/c/bluetooth/+/2800
diff --git a/rust/bt-bap/src/types.rs b/rust/bt-bap/src/types.rs
index 65bb251..f3946d6 100644
--- a/rust/bt-bap/src/types.rs
+++ b/rust/bt-bap/src/types.rs
@@ -10,7 +10,7 @@
 
 /// Broadcast_ID is a 3-byte data on the wire.
 /// Defined in BAP spec v1.0.1 section 3.7.2.1.
-#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
+#[derive(Clone, Copy, Eq, Hash, PartialEq)]
 pub struct BroadcastId(u32);
 
 impl BroadcastId {
@@ -24,7 +24,13 @@
 
 impl std::fmt::Display for BroadcastId {
     fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
-        write!(f, "{:#08x}", self.0)
+        write!(f, "{:08x}", self.0)
+    }
+}
+
+impl std::fmt::Debug for BroadcastId {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_tuple("BroadcastId").field(&format_args!("0x{}", self)).finish()
     }
 }
 
diff --git a/rust/bt-bass/src/client.rs b/rust/bt-bass/src/client.rs
index 8ad2ef6..fc86482 100644
--- a/rust/bt-bass/src/client.rs
+++ b/rust/bt-bass/src/client.rs
@@ -296,8 +296,8 @@
     ///   in
     /// * `pa_interval` - updated PA interval value. If none, unknown value is
     ///   used
-    /// * `bis_sync` - desired BIG to BIS synchronization information. If empty,
-    ///   it's not updated
+    /// * `bis_map` - desired BIG to BIS synchronization update information. If
+    ///   a BIG does not exist as a key, sync for that BIG is not updated.
     /// * `metadata_map` - map of updated metadata for BIGs. If a mapping does
     ///   not exist for a BIG, that BIG's metadata is not updated
     pub async fn modify_broadcast_source(
@@ -305,7 +305,7 @@
         broadcast_id: BroadcastId,
         pa_sync: PaSync,
         pa_interval: Option<PeriodicAdvertisingInterval>,
-        bis_sync: Option<HashMap<SubgroupIndex, BisSync>>,
+        bis_map: HashMap<SubgroupIndex, BisSync>,
         metadata_map: Option<HashMap<SubgroupIndex, Vec<Metadata>>>,
     ) -> Result<(), Error> {
         let op = {
@@ -314,13 +314,12 @@
                 .ok_or(Error::UnknownBroadcastSource(broadcast_id))?;
 
             // Update BIS_Sync param for BIGs if applicable.
-            if let Some(sync_map) = bis_sync {
-                for (big_index, group) in state.subgroups.iter_mut().enumerate() {
-                    if let Some(bis_sync) = sync_map.get(&(big_index as u8)) {
-                        group.bis_sync = bis_sync.clone();
-                    }
+            for (big_index, group) in state.subgroups.iter_mut().enumerate() {
+                if let Some(bis_sync) = bis_map.get(&(big_index as u8)) {
+                    group.bis_sync = bis_sync.clone();
                 }
             }
+
             // Update metadata for BIGs if applicable.
             if let Some(mut m) = metadata_map {
                 for (big_index, group) in state.subgroups.iter_mut().enumerate() {
@@ -779,7 +778,7 @@
             BroadcastId::try_from(0x11).unwrap(),
             PaSync::DoNotSync,
             Some(PeriodicAdvertisingInterval(0xAAAA)),
-            None,
+            HashMap::new(),
             None,
         );
         pin_mut!(op_fut);
@@ -827,7 +826,7 @@
             BroadcastId::try_from(0x11).unwrap(),
             PaSync::DoNotSync,
             None,
-            Some(HashMap::from([(0, BisSync::sync(vec![1, 3, 5]).unwrap())])),
+            HashMap::from([(0, BisSync::sync(vec![1, 3, 5]).unwrap())]),
             Some(HashMap::from([
                 (0, vec![Metadata::BroadcastAudioImmediateRenderingFlag]),
                 (1, vec![Metadata::Language("eng".to_string())]),
@@ -835,7 +834,7 @@
             ])),
         );
         pin_mut!(op_fut);
-        let polled = op_fut.poll_unpin(&mut noop_cx);
+        let polled: Poll<Result<(), Error>> = op_fut.poll_unpin(&mut noop_cx);
         assert_matches!(polled, Poll::Ready(Ok(_)));
     }
 
@@ -849,7 +848,7 @@
             BroadcastId::try_from(0x11).unwrap(),
             PaSync::DoNotSync,
             None,
-            None,
+            HashMap::new(),
             None,
         );
         pin_mut!(op_fut);
diff --git a/rust/bt-bass/src/client/event.rs b/rust/bt-bass/src/client/event.rs
index bf4349e..f7ad10b 100644
--- a/rust/bt-bass/src/client/event.rs
+++ b/rust/bt-bass/src/client/event.rs
@@ -122,91 +122,75 @@
 
         loop {
             if let Some(item) = self.event_queue.pop_front() {
-                match item {
-                    Ok(event) => return Poll::Ready(Some(Ok(event))),
-                    Err(e) => {
-                        // If an error was received, we terminate the event stream, but send an
-                        // error to indicate why it was terminated.
-                        self.terminated = true;
-                        return Poll::Ready(Some(Err(e)));
+                // An error from a single stream will be reported, but the main EventStream
+                // will continue for other sources.
+                return Poll::Ready(Some(item));
+            }
+
+            let Some(item) = futures::ready!(self.notification_streams.poll_next_unpin(cx)) else {
+                // All notification streams have been closed. Terminate the EventStream.
+                self.terminated = true;
+                let err = Error::EventStream(Box::new(Error::Service(
+                    ServiceError::NotificationChannelClosed(format!(
+                        "All BASS GATT notification streams closed"
+                    )),
+                )));
+                return Poll::Ready(Some(Err(err)));
+            };
+
+            // One of the notification streams produced an error. Report it, but do not
+            // terminate. SelectAll will remove the faulty stream from its set.
+            let Ok(notification) = item else {
+                let err = Error::EventStream(Box::new(Error::Gatt(item.unwrap_err())));
+                return Poll::Ready(Some(Err(err)));
+            };
+
+            let char_handle = notification.handle;
+            let (Ok(new_state), _) = BroadcastReceiveState::decode(notification.value.as_slice())
+            else {
+                self.event_queue.push_back(Ok(Event::UnknownPacket));
+                continue;
+            };
+
+            let maybe_prev_state = {
+                let mut lock = self.broadcast_sources.lock();
+                lock.update_state(char_handle, new_state.clone())
+            };
+
+            // If the previous value was not empty, check if it was overwritten.
+            if let Some(ref prev_state) = maybe_prev_state {
+                if let BroadcastReceiveState::NonEmpty(prev_receive_state) = prev_state {
+                    if new_state.is_empty() || !new_state.has_same_broadcast_id(&prev_state) {
+                        self.event_queue.push_back(Ok(Event::RemovedBroadcastSource(
+                            prev_receive_state.broadcast_id,
+                        )));
                     }
                 }
             }
 
-            match self.notification_streams.poll_next_unpin(cx) {
-                Poll::Pending => return Poll::Pending,
-                Poll::Ready(None) => {
-                    let err = Error::EventStream(Box::new(Error::Service(
-                        ServiceError::NotificationChannelClosed(format!(
-                            "GATT notification stream for BRS characteristics closed"
-                        )),
+            // BRS characteristic value was updated with a new broadcast source
+            // information.
+            if let BroadcastReceiveState::NonEmpty(receive_state) = &new_state {
+                let is_new_source = match maybe_prev_state {
+                    Some(prev_state) => !new_state.has_same_broadcast_id(&prev_state),
+                    None => true,
+                };
+                if is_new_source {
+                    self.event_queue.push_back(Ok(Event::AddedBroadcastSource(
+                        receive_state.broadcast_id,
+                        receive_state.pa_sync_state,
+                        receive_state.big_encryption,
                     )));
-                    self.event_queue.push_back(Err(err));
-                }
-                Poll::Ready(Some(Err(error))) => {
-                    // Deem all errors as critical.
-                    let err = Error::EventStream(Box::new(Error::Gatt(error)));
-                    self.event_queue.push_back(Err(err));
-                }
-                Poll::Ready(Some(Ok(notification))) => {
-                    let char_handle = notification.handle;
-                    let (Ok(new_state), _) =
-                        BroadcastReceiveState::decode(notification.value.as_slice())
-                    else {
-                        self.event_queue.push_back(Ok(Event::UnknownPacket));
-                        continue;
-                    };
-
-                    let maybe_prev_state = {
-                        let mut lock = self.broadcast_sources.lock();
-                        lock.update_state(char_handle, new_state.clone())
-                    };
-
-                    let mut multi_events = VecDeque::new();
-
-                    // If the previous value was not empty, check if it was overwritten.
-                    if let Some(ref prev_state) = maybe_prev_state {
-                        if let BroadcastReceiveState::NonEmpty(prev_receive_state) = prev_state {
-                            if new_state.is_empty() || !new_state.has_same_broadcast_id(&prev_state)
-                            {
-                                multi_events.push_back(Ok(Event::RemovedBroadcastSource(
-                                    prev_receive_state.broadcast_id,
-                                )));
-                            }
-                        }
+                } else {
+                    let other_events = Event::from_broadcast_receive_state(&receive_state);
+                    for e in other_events.into_iter() {
+                        self.event_queue.push_back(Ok(e));
                     }
-
-                    // BRS characteristic value was updated with a new broadcast source
-                    // information.
-                    if let BroadcastReceiveState::NonEmpty(receive_state) = &new_state {
-                        let is_new_source = match maybe_prev_state {
-                            Some(prev_state) => !new_state.has_same_broadcast_id(&prev_state),
-                            None => true,
-                        };
-                        if is_new_source {
-                            multi_events.push_back(Ok(Event::AddedBroadcastSource(
-                                receive_state.broadcast_id,
-                                receive_state.pa_sync_state,
-                                receive_state.big_encryption,
-                            )));
-                        } else {
-                            let other_events = Event::from_broadcast_receive_state(&receive_state);
-                            for e in other_events.into_iter() {
-                                multi_events.push_back(Ok(e));
-                            }
-                        }
-                    }
-                    if multi_events.len() != 0 {
-                        self.event_queue.append(&mut multi_events);
-                        continue;
-                    }
-                    continue;
                 }
-            };
-
-            break;
+            }
+            // Continue to the top of the loop to start draining the event_queue.
+            continue;
         }
-        Poll::Pending
     }
 }
 
@@ -220,7 +204,7 @@
     use futures::channel::mpsc::unbounded;
 
     use bt_common::core::AddressType;
-    use bt_gatt::types::Handle;
+    use bt_gatt::types::{Error as BtGattError, GattError, Handle};
 
     #[test]
     fn poll_event_stream() {
@@ -371,4 +355,57 @@
         // Should be pending because no more events generated from notifications.
         assert!(event_streams.poll_next_unpin(&mut noop_cx).is_pending());
     }
+
+    #[test]
+    fn error_on_one_stream_does_not_terminate_event_stream() {
+        let mut streams = SelectAll::new();
+        let (sender1, receiver1) = unbounded();
+        let (sender2, receiver2) = unbounded();
+        streams.push(receiver1.boxed());
+        streams.push(receiver2.boxed());
+
+        let source_tracker = Arc::new(Mutex::new(KnownBroadcastSources::new(HashMap::from([
+            (Handle(0x1), BroadcastReceiveState::Empty),
+            (Handle(0x2), BroadcastReceiveState::Empty),
+        ]))));
+        let mut event_streams = EventStream::new(streams, source_tracker);
+        let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
+
+        // Send an error on one stream.
+        sender1.unbounded_send(Err(BtGattError::Gatt(GattError::InvalidPdu))).expect("should send");
+
+        // We should receive the error event.
+        let polled = event_streams.poll_next_unpin(&mut noop_cx);
+        assert_matches!(polled, Poll::Ready(Some(Err(Error::EventStream(_)))));
+
+        // The stream should NOT be terminated.
+        assert!(!event_streams.is_terminated());
+        assert!(event_streams.poll_next_unpin(&mut noop_cx).is_pending());
+
+        // Send a valid notification on the other stream.
+        #[rustfmt::skip]
+        sender2
+            .unbounded_send(Ok(CharacteristicNotification {
+                handle: Handle(0x2),
+                value: vec![
+                    0x02, AddressType::Public as u8,             // source id and address type
+                    0x03, 0x04, 0x05, 0x06, 0x07, 0x08,          // address
+                    0x01, 0x02, 0x03, 0x04,                      // ad set id and broadcast id
+                    PaSyncState::Synced as u8,
+                    EncryptionStatus::NotEncrypted.raw_value(),
+                    0x00,                                        // no subgroups
+                ],
+                maybe_truncated: false,
+            }))
+            .expect("should send");
+
+        // We should be able to receive the event from the second stream.
+        let polled = event_streams.poll_next_unpin(&mut noop_cx);
+        assert_matches!(polled, Poll::Ready(Some(Ok(event))) => {
+            assert_eq!(event, Event::AddedBroadcastSource(BroadcastId::try_from(0x040302).unwrap(), PaSyncState::Synced, EncryptionStatus::NotEncrypted));
+        });
+
+        // The stream should still not be terminated.
+        assert!(!event_streams.is_terminated());
+    }
 }
diff --git a/rust/bt-broadcast-assistant/src/assistant/event.rs b/rust/bt-broadcast-assistant/src/assistant/event.rs
index d9a535a..18a97ae 100644
--- a/rust/bt-broadcast-assistant/src/assistant/event.rs
+++ b/rust/bt-broadcast-assistant/src/assistant/event.rs
@@ -106,40 +106,37 @@
 
         // Poll scan result stream to check if there were any newly discovered peers
         // that we're interested in.
-        match futures::ready!(self.scan_result_stream.poll_next_unpin(cx)) {
-            Some(Ok(scanned)) => {
-                match Self::try_into_broadcast_source(&scanned) {
-                    Err(e) => {
-                        return Poll::Ready(Some(Ok(Event::CouldNotParseAdvertisingData {
-                            peer: scanned.id,
-                            error: e,
-                        })));
-                    }
-                    Ok(Some(found_source)) => {
-                        // If we found a broadcast source, we add its information in the
-                        // internal records.
-                        let (broadcast_source, changed) = self
-                            .broadcast_sources
-                            .merge_broadcast_source_data(&scanned.id, &found_source);
-
-                        // Broadcast found event is relayed to the client iff complete
-                        // information has been gathered.
-                        if broadcast_source.into_add_source() && changed {
-                            return Poll::Ready(Some(Ok(Event::FoundBroadcastSource {
-                                peer: scanned.id,
-                                source: broadcast_source,
-                            })));
-                        }
-
-                        Poll::Pending
-                    }
-                    Ok(None) => Poll::Pending,
-                }
-            }
-            None | Some(Err(_)) => {
+        loop {
+            let Some(Ok(scanned)) = futures::ready!(self.scan_result_stream.poll_next_unpin(cx))
+            else {
                 self.terminated = true;
                 self.broadcast_source_scan_started.store(false, Ordering::Relaxed);
-                Poll::Ready(Some(Err(Error::CentralScanTerminated)))
+                return Poll::Ready(Some(Err(Error::CentralScanTerminated)));
+            };
+
+            let found_source = match Self::try_into_broadcast_source(&scanned) {
+                Err(error) => {
+                    return Poll::Ready(Some(Ok(Event::CouldNotParseAdvertisingData {
+                        peer: scanned.id,
+                        error,
+                    })));
+                }
+                Ok(None) => continue,
+                Ok(Some(source)) => source,
+            };
+
+            // If we found a broadcast source, we add its information in the
+            // internal records.
+            let (broadcast_source, changed) =
+                self.broadcast_sources.merge_broadcast_source_data(&scanned.id, &found_source);
+
+            // Broadcast found event is relayed to the client iff complete
+            // information has been gathered.
+            if broadcast_source.into_add_source() && changed {
+                return Poll::Ready(Some(Ok(Event::FoundBroadcastSource {
+                    peer: scanned.id,
+                    source: broadcast_source,
+                })));
             }
         }
     }
@@ -153,33 +150,34 @@
 
     use bt_common::core::{AddressType, AdvertisingSetId};
     use bt_gatt::central::{AdvertisingDatum, PeerName};
-    use bt_gatt::test_utils::{FakeTypes, ScannedResultStream};
+    use bt_gatt::test_utils::{FakeTypes, ScannedResultStream, ScannedResultStreamController};
     use bt_gatt::types::Error as BtGattError;
     use bt_gatt::types::GattError;
 
-    fn setup_stream() -> (EventStream<FakeTypes>, ScannedResultStream) {
+    fn setup_stream() -> (EventStream<FakeTypes>, ScannedResultStreamController) {
         let fake_scan_result_stream = ScannedResultStream::new();
+        let controller = fake_scan_result_stream.controller();
         let broadcast_sources = DiscoveredBroadcastSources::new();
         let broadcast_source_scan_started = Arc::new(AtomicBool::new(false));
 
         (
             EventStream::<FakeTypes>::new(
-                fake_scan_result_stream.clone(),
+                fake_scan_result_stream,
                 broadcast_sources,
                 broadcast_source_scan_started,
             ),
-            fake_scan_result_stream,
+            controller,
         )
     }
 
     #[test]
     fn poll_found_broadcast_source_events() {
-        let (mut stream, mut scan_result_stream) = setup_stream();
+        let (mut stream, scan_result_controller) = setup_stream();
 
         // Scanned a broadcast source and its broadcast id.
         let broadcast_source_pid = PeerId(1005);
 
-        scan_result_stream.set_scanned_result(Ok(ScanResult {
+        scan_result_controller.add_scanned_result(Ok(ScanResult {
             id: broadcast_source_pid,
             connectable: true,
             name: PeerName::Unknown,
@@ -225,7 +223,7 @@
                     * (big #2 / bis #2) */
         ];
 
-        scan_result_stream.set_scanned_result(Ok(ScanResult {
+        scan_result_controller.add_scanned_result(Ok(ScanResult {
             id: broadcast_source_pid,
             connectable: true,
             name: PeerName::Unknown,
@@ -251,7 +249,7 @@
         assert!(stream.poll_next_unpin(&mut noop_cx).is_pending());
 
         // Scanned the same broadcast source's BASE data.
-        scan_result_stream.set_scanned_result(Ok(ScanResult {
+        scan_result_controller.add_scanned_result(Ok(ScanResult {
             id: broadcast_source_pid,
             connectable: true,
             name: PeerName::Unknown,
@@ -270,10 +268,10 @@
 
     #[test]
     fn central_scan_stream_terminates() {
-        let (mut stream, mut scan_result_stream) = setup_stream();
+        let (mut stream, scan_result_controller) = setup_stream();
 
         // Mimick scan error.
-        scan_result_stream.set_scanned_result(Err(BtGattError::Gatt(GattError::InvalidPdu)));
+        scan_result_controller.add_scanned_result(Err(BtGattError::Gatt(GattError::InvalidPdu)));
 
         let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
         match stream.poll_next_unpin(&mut noop_cx) {
@@ -285,4 +283,68 @@
         assert_matches!(stream.poll_next_unpin(&mut noop_cx), Poll::Ready(None));
         assert_matches!(stream.is_terminated(), true);
     }
+
+    #[test]
+    fn poll_processes_multiple_results_eagerly() {
+        let (mut stream, scan_result_controller) = setup_stream();
+        let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
+
+        let broadcast_source_pid = PeerId(1005);
+
+        #[rustfmt::skip]
+        let base_data = vec![
+            0x10, 0x20, 0x30, 0x01, // presentation delay, num of subgroups
+            0x01, 0x03, 0x00, 0x00, 0x00, 0x00, // num of bis, codec id
+            0x00, // codec specific config len
+            0x00, // metadata len,
+            0x01, 0x00, // bis index, codec specific config len
+        ];
+
+        // 1. An irrelevant peer that should be ignored.
+        scan_result_controller.add_scanned_result(Ok(ScanResult {
+            id: PeerId(1),
+            connectable: true,
+            name: PeerName::Unknown,
+            advertised: vec![],
+            advertising_sid: Some(0),
+            periodic_advertising_interval: None,
+        }));
+        // 2. A broadcast source, but incomplete data (only broadcast ID).
+        scan_result_controller.add_scanned_result(Ok(ScanResult {
+            id: broadcast_source_pid,
+            connectable: true,
+            name: PeerName::Unknown,
+            advertised: vec![AdvertisingDatum::ServiceData(
+                BROADCAST_AUDIO_ANNOUNCEMENT_SERVICE,
+                vec![0x01, 0x02, 0x03],
+            )],
+            advertising_sid: Some(1),
+            periodic_advertising_interval: None,
+        }));
+        // 3. The same broadcast source, with BASE data, which completes it.
+        scan_result_controller.add_scanned_result(Ok(ScanResult {
+            id: broadcast_source_pid,
+            connectable: true,
+            name: PeerName::Unknown,
+            advertised: vec![AdvertisingDatum::ServiceData(
+                BASIC_AUDIO_ANNOUNCEMENT_SERVICE,
+                base_data.clone(),
+            )],
+            advertising_sid: Some(1),
+            periodic_advertising_interval: None,
+        }));
+
+        // The stream should eagerly process all three items and emit the
+        // FoundBroadcastSource event.
+        let poll_result = stream.poll_next_unpin(&mut noop_cx);
+        let Poll::Ready(Some(Ok(event))) = poll_result else {
+            panic!("should have received event, but got {:?}", poll_result);
+        };
+        assert_matches!(event, Event::FoundBroadcastSource{peer, ..} => {
+            assert_eq!(peer, broadcast_source_pid);
+        });
+
+        // The underlying stream is now empty, so the next poll should be pending.
+        assert!(stream.poll_next_unpin(&mut noop_cx).is_pending());
+    }
 }
diff --git a/rust/bt-broadcast-assistant/src/assistant/peer.rs b/rust/bt-broadcast-assistant/src/assistant/peer.rs
index 30be3f1..a901b21 100644
--- a/rust/bt-broadcast-assistant/src/assistant/peer.rs
+++ b/rust/bt-broadcast-assistant/src/assistant/peer.rs
@@ -160,7 +160,7 @@
             .unwrap_or(None);
 
         self.bass
-            .modify_broadcast_source(broadcast_id, pa_sync, pa_interval, Some(bis_sync), None)
+            .modify_broadcast_source(broadcast_id, pa_sync, pa_interval, bis_sync, None)
             .await
             .map_err(Into::into)
     }
diff --git a/rust/bt-broadcast-assistant/src/debug.rs b/rust/bt-broadcast-assistant/src/debug.rs
index 1fe4f9e..d8d26d8 100644
--- a/rust/bt-broadcast-assistant/src/debug.rs
+++ b/rust/bt-broadcast-assistant/src/debug.rs
@@ -18,7 +18,7 @@
 use bt_common::generic_audio::metadata_ltv::Metadata;
 use bt_common::PeerId;
 use bt_gatt::pii::GetPeerAddr;
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
 
 use futures::stream::FusedStream;
 use futures::Future;
@@ -103,7 +103,7 @@
             return;
         };
         if let Err(e) = f(peer).await {
-            eprintln!("failed to perform oepration: {e:?}");
+            eprintln!("failed to perform operation: {e:?}");
         }
     }
 }
@@ -164,18 +164,45 @@
             eprintln!("Failed to parse big index from '{}', ignoring.", parts[0]);
             continue;
         };
-        let Ok(bis_index) = parse_int::<u8>(parts[1]) else {
-            eprintln!("Failed to parse bis index from '{}', ignoring.", parts[1]);
-            continue;
-        };
-        let entry = map.entry(ith_big).or_insert(BisSync::no_sync());
-        if let Err(e) = entry.synchronize_to_index(bis_index) {
-            eprintln!("Invalid BIS index: {e:?}");
+        match parse_int::<u8>(parts[1]) {
+            Ok(bis_index) => {
+                let entry = map.entry(ith_big).or_insert(BisSync::no_sync());
+                if let Err(e) = entry.synchronize_to_index(bis_index) {
+                    eprintln!("Failed to set sync to BIS index: {e:?}");
+                }
+            }
+            Err(_) if parts[1] == "OFF" => {
+                map.insert(ith_big, BisSync::no_sync());
+            }
+            Err(e) => {
+                eprintln!("{e:?} - BIS index should be a number from 1-31, ignoring {}", parts[1]);
+            }
         }
     }
     map
 }
 
+/// Converts a passcode string into a 16-byte broadcast code.
+/// The string is UTF-8 encoded and then padded with zeros on the right to a
+/// total length of 16 bytes. This result is a little-endian byte array
+/// equivalent to a 128-bit value.
+fn passcode_to_broadcast_code(passcode: &str) -> Result<[u8; 16], String> {
+    if passcode.is_empty() {
+        return Err("invalid broadcast code: passcode cannot be empty".to_string());
+    }
+    let code = passcode.as_bytes();
+    if code.len() > 16 {
+        return Err(format!(
+            "invalid broadcast code: '{}'. should be at max length 16, but was {}",
+            passcode,
+            code.len()
+        ));
+    }
+    let mut broadcast_code = [0u8; 16];
+    broadcast_code[..code.len()].copy_from_slice(code);
+    Ok(broadcast_code)
+}
+
 impl<T: bt_gatt::GattTypes + 'static, R: GetPeerAddr> CommandRunner for AssistantDebug<T, R>
 where
     <T as bt_gatt::GattTypes>::NotificationStream: std::marker::Send,
@@ -187,13 +214,18 @@
         cmd: Self::Set,
         args: Vec<String>,
     ) -> impl futures::Future<Output = Result<(), impl std::error::Error>> {
+        let help_subcommands: HashSet<&str> = HashSet::from(["help", "-h", "--help"]);
         async move {
+            if args.len() >= 1 && help_subcommands.contains(args[0].as_str()) {
+                eprintln!("usage: {}", cmd.help_simple());
+                return Ok(());
+            }
             match cmd {
                 AssistantCmd::Info => {
                     let known = self.assistant.known_broadcast_sources();
-                    eprintln!("Known Broadcast Sources:");
+                    println!("Known Broadcast Sources:");
                     for (id, s) in known {
-                        eprintln!("PeerId ({id}), source: {s:?}");
+                        println!("PeerId ({id}), source: {s:?}");
                     }
                 }
                 AssistantCmd::Connect => {
@@ -240,19 +272,16 @@
                         return Ok(());
                     };
 
-                    let code = args[1].as_bytes();
-                    if code.len() > 16 {
-                        eprintln!(
-                            "invalid broadcast code: {}. should be at max length 16",
-                            args[1]
-                        );
-                        return Ok(());
-                    }
-                    let mut passcode_vec = vec![0; 16];
-                    passcode_vec[16 - code.len()..16].copy_from_slice(code);
+                    let broadcast_code = match passcode_to_broadcast_code(&args[1]) {
+                        Ok(code) => code,
+                        Err(e) => {
+                            eprintln!("{e:?}");
+                            return Ok(());
+                        }
+                    };
+
                     self.with_peer(|peer| async move {
-                        peer.send_broadcast_code(broadcast_id, passcode_vec.try_into().unwrap())
-                            .await
+                        peer.send_broadcast_code(broadcast_id, broadcast_code).await
                     })
                     .await;
                 }
@@ -383,7 +412,7 @@
                         advertising_sid,
                     ) {
                         Ok(source) => {
-                            eprintln!("broadcast source after additional info: {source:?}")
+                            println!("broadcast source after additional info: {source:?}")
                         }
                         Err(e) => {
                             eprintln!("failed to enter in broadcast source information: {e:?}")
@@ -401,7 +430,7 @@
                     }
 
                     let Ok(peer_id) = parse_peer_id(&args[0]) else {
-                        println!("invalid peer id: {}", args[0]);
+                        eprintln!("invalid peer id: {}", args[0]);
                         return Ok(());
                     };
 
@@ -432,7 +461,7 @@
                         .assistant
                         .force_discover_broadcast_source_metadata(peer_id, all_big_metadata)
                     {
-                        Ok(source) => eprintln!("broadcast source with metadata: {source:?}"),
+                        Ok(source) => println!("broadcast source with metadata: {source:?}"),
                         Err(e) => eprintln!("failed to enter in broadcast source metadata: {e:?}"),
                     }
                 }
@@ -465,7 +494,7 @@
                         .assistant
                         .force_discover_broadcast_source_metadata(peer_id, all_big_metadata)
                     {
-                        Ok(source) => eprintln!("broadcast source with metadata: {source:?}"),
+                        Ok(source) => println!("broadcast source with metadata: {source:?}"),
                         Err(e) => {
                             eprintln!("failed to enter in empty broadcast source metadata: {e:?}")
                         }
@@ -524,11 +553,23 @@
 
     #[test]
     fn test_parse_bis_sync() {
+        // Basic case with multiple BIGs and BIS indices.
         let bis_sync = parse_bis_sync("0-1,0-2,1-1");
         assert_eq!(bis_sync.len(), 2);
         assert_eq!(bis_sync.get(&0), Some(&BisSync::sync(vec![1, 2]).unwrap()));
         assert_eq!(bis_sync.get(&1), Some(&BisSync::sync(vec![1]).unwrap()));
 
+        // Case with "OFF" to disable sync for a BIG.
+        let bis_sync = parse_bis_sync("0-1,1-OFF,0-2");
+        assert_eq!(bis_sync.len(), 2);
+        assert_eq!(bis_sync.get(&0), Some(&BisSync::sync(vec![1, 2]).unwrap()));
+        assert_eq!(bis_sync.get(&1), Some(&BisSync::no_sync()));
+
+        // Case where sync is set and then turned off for the same BIG.
+        let bis_sync = parse_bis_sync("0-5,0-OFF");
+        assert_eq!(bis_sync.len(), 1);
+        assert_eq!(bis_sync.get(&0), Some(&BisSync::no_sync()));
+
         // Will ignore invalid values.
         let bis_sync = parse_bis_sync("0-1,0-2,1:1,1-1-1,");
         assert_eq!(bis_sync.len(), 1);
@@ -537,4 +578,34 @@
         let bis_sync = parse_bis_sync("hellothisistoallynotvalid");
         assert_eq!(bis_sync.len(), 0);
     }
+
+    #[test]
+    fn test_passcode_to_broadcast_code() {
+        // UTF-8 string that is less than 16 bytes.
+        // Source of truth test case from Bluetooth Spec.
+        let code = "Børne House";
+        let expected = [
+            0x42, 0xc3, 0xb8, 0x72, 0x6e, 0x65, 0x20, 0x48, 0x6f, 0x75, 0x73, 0x65, 0x00, 0x00,
+            0x00, 0x00,
+        ];
+        let actual = passcode_to_broadcast_code(code).expect("should succeed");
+        assert_eq!(actual, expected);
+        assert_eq!(u128::from_le_bytes(actual), 0x00000000_6573756F_4820656E_72B8C342);
+
+        // Valid ASCII passcode, exactly 16 bytes.
+        let code = "1234567890123456";
+        let expected = [
+            0x31, 0x32, 0x33, 0x34, 0x35, 0x36, 0x37, 0x38, 0x39, 0x30, 0x31, 0x32, 0x33, 0x34,
+            0x35, 0x36,
+        ];
+        assert_eq!(passcode_to_broadcast_code(code).unwrap(), expected);
+
+        // Invalid passcode, over 16 bytes.
+        let code = "12345678901234567";
+        assert!(passcode_to_broadcast_code(code).is_err());
+
+        // Empty passcode should be an error.
+        let code = "";
+        assert!(passcode_to_broadcast_code(code).is_err());
+    }
 }
diff --git a/rust/bt-common/src/lib.rs b/rust/bt-common/src/lib.rs
index 382567d..1cb2cf0 100644
--- a/rust/bt-common/src/lib.rs
+++ b/rust/bt-common/src/lib.rs
@@ -13,7 +13,7 @@
 /// same peer as long as the PeerId was retrieved after the `Central` was
 /// instantiated. PeerIds can be valid longer than that (often if the peer is
 /// bonded)
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+#[derive(Clone, Copy, PartialEq, Eq, Hash)]
 pub struct PeerId(pub u64);
 
 impl rust_core::fmt::Display for PeerId {
@@ -21,7 +21,13 @@
         &self,
         f: &mut rust_core::fmt::Formatter<'_>,
     ) -> std::result::Result<(), std::fmt::Error> {
-        write!(f, "{:x}", self.0)
+        write!(f, "{:016x}", self.0)
+    }
+}
+
+impl std::fmt::Debug for PeerId {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_tuple("PeerId").field(&format_args!("0x{}", self)).finish()
     }
 }
 
diff --git a/rust/bt-gatt/src/test_utils.rs b/rust/bt-gatt/src/test_utils.rs
index d528b47..3397799 100644
--- a/rust/bt-gatt/src/test_utils.rs
+++ b/rust/bt-gatt/src/test_utils.rs
@@ -5,16 +5,17 @@
 use bt_common::core::{Address, AddressType};
 use futures::channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender};
 use futures::future::{ready, Ready};
-use futures::Stream;
+use futures::stream::{FusedStream, Stream};
 use parking_lot::Mutex;
-use std::collections::HashMap;
+use std::collections::{HashMap, VecDeque};
 use std::sync::Arc;
-use std::task::Poll;
+use std::task::{Poll, Waker};
 
 use bt_common::{PeerId, Uuid};
 
 use crate::central::ScanResult;
 use crate::client::CharacteristicNotification;
+
 use crate::periodic_advertising::{PeriodicAdvertising, SyncReport};
 use crate::pii::GetPeerAddr;
 use crate::server::{self, LocalService, ReadResponder, ServiceDefinition, WriteResponder};
@@ -221,24 +222,47 @@
     }
 }
 
-#[derive(Clone)]
+#[derive(Default, Debug)]
+struct ScannedResultStreamInner {
+    results: VecDeque<Result<ScanResult>>,
+    waker: Option<Waker>,
+}
+
+#[derive(Clone, Debug, Default)]
+pub struct ScannedResultStreamController(Arc<Mutex<ScannedResultStreamInner>>);
+
+impl ScannedResultStreamController {
+    /// Add a single scanned result item to output from the stream.
+    pub fn add_scanned_result(&self, item: Result<ScanResult>) {
+        let mut lock = self.0.lock();
+        lock.results.push_back(item);
+        if let Some(waker) = lock.waker.take() {
+            waker.wake();
+        }
+    }
+}
+
+#[derive(Debug, Default)]
 pub struct ScannedResultStream {
     inner: Arc<Mutex<ScannedResultStreamInner>>,
 }
 
-#[derive(Default)]
-pub struct ScannedResultStreamInner {
-    result: Option<Result<ScanResult>>,
-}
-
 impl ScannedResultStream {
+    /// Creates a new ScannedResultStream.
+    /// Client can get a ScannedResultStreamController using the `controller`
+    /// method.
     pub fn new() -> Self {
-        Self { inner: Arc::new(Mutex::new(ScannedResultStreamInner::default())) }
+        Self::default()
     }
 
-    /// Set scanned result item to output from the stream.
-    pub fn set_scanned_result(&mut self, item: Result<ScanResult>) {
-        self.inner.lock().result = Some(item);
+    pub fn controller(&self) -> ScannedResultStreamController {
+        ScannedResultStreamController(self.inner.clone())
+    }
+}
+
+impl FusedStream for ScannedResultStream {
+    fn is_terminated(&self) -> bool {
+        self.inner.lock().results.is_empty()
     }
 }
 
@@ -247,14 +271,15 @@
 
     fn poll_next(
         self: std::pin::Pin<&mut Self>,
-        _cx: &mut std::task::Context<'_>,
+        cx: &mut std::task::Context<'_>,
     ) -> Poll<Option<Self::Item>> {
         let mut lock = self.inner.lock();
-        if lock.result.is_some() {
-            Poll::Ready(lock.result.take())
-        } else {
-            // Never wake up, as if we never find another result
-            Poll::Pending
+        match lock.results.pop_front() {
+            Some(result) => Poll::Ready(Some(result)),
+            None => {
+                lock.waker = Some(cx.waker().clone());
+                Poll::Pending
+            }
         }
     }
 }
@@ -345,7 +370,7 @@
 
 impl crate::Central<FakeTypes> for FakeCentral {
     fn scan(&self, _filters: &[crate::central::ScanFilter]) -> ScannedResultStream {
-        ScannedResultStream::new()
+        ScannedResultStream::default()
     }
 
     fn connect(&self, peer_id: PeerId) -> <FakeTypes as GattTypes>::ConnectFuture {
diff --git a/rust/bt-gatt/src/tests.rs b/rust/bt-gatt/src/tests.rs
index 6cab78b..d8188e4 100644
--- a/rust/bt-gatt/src/tests.rs
+++ b/rust/bt-gatt/src/tests.rs
@@ -246,7 +246,8 @@
         advertising_sid: Some(0),
         periodic_advertising_interval: None,
     };
-    let _ = scan_results.set_scanned_result(Ok(scanned_result));
+    let controller = scan_results.controller();
+    let _ = controller.add_scanned_result(Ok(scanned_result));
 
     let polled = scan_results.poll_next_unpin(&mut noop_cx);
     assert_matches!(polled, Poll::Ready(Some(Ok(_))));