// Copyright 2023 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use parking_lot::Mutex;
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use thiserror::Error;

use bt_bap::types::BroadcastId;
use bt_bass::client::error::Error as BassClientError;
use bt_bass::client::BroadcastAudioScanServiceClient;
use bt_common::{PeerId, Uuid};
use bt_gatt::central::*;
use bt_gatt::client::PeerServiceHandle;
use bt_gatt::types::Error as GattError;
use bt_gatt::Client;

pub mod event;
use event::*;
pub mod peer;
pub use peer::Peer;

use crate::types::*;

pub const BROADCAST_AUDIO_SCAN_SERVICE: Uuid = Uuid::from_u16(0x184F);
pub const BASIC_AUDIO_ANNOUNCEMENT_SERVICE: Uuid = Uuid::from_u16(0x1851);
pub const BROADCAST_AUDIO_ANNOUNCEMENT_SERVICE: Uuid = Uuid::from_u16(0x1852);

#[derive(Debug, Error)]
pub enum Error {
    #[error("GATT operation error: {0:?}")]
    Gatt(#[from] GattError),

    #[error("Broadcast Audio Scan Service client error at peer ({0}): {1:?}")]
    BassClient(PeerId, BassClientError),

    #[error("Not connected to Broadcast Audio Scan Service at peer ({0})")]
    NotConnectedToBass(PeerId),

    #[error("Central scanning terminated unexpectedly")]
    CentralScanTerminated,

    #[error("Failed to connect to service ({1}) at peer ({0})")]
    ConnectionFailure(PeerId, Uuid),

    #[error("Broadcast Assistant was already started previously. It cannot be started twice")]
    AlreadyStarted,

    #[error("Failed due to error: {0}")]
    Generic(String),
}

/// Contains information about the currently-known broadcast
/// sources and the peers they were found on
#[derive(Debug)]
pub(crate) struct DiscoveredBroadcastSources(Mutex<HashMap<PeerId, BroadcastSource>>);

impl DiscoveredBroadcastSources {
    /// Creates a shareable instance of `DiscoveredBroadcastSources`.
    pub fn new() -> Arc<Self> {
        Arc::new(Self(Mutex::new(HashMap::new())))
    }

    /// Merges the broadcast source data with existing broadcast source data.
    /// Returns the copy of the broadcast source data after the merge and
    /// indicates whether it has changed from before or not.
    pub(crate) fn merge_broadcast_source_data(
        &self,
        peer_id: &PeerId,
        data: &BroadcastSource,
    ) -> (BroadcastSource, bool) {
        let mut lock = self.0.lock();
        let source = lock.entry(*peer_id).or_default();
        let before = source.clone();

        source.merge(data);
        let after = source.clone();
        let changed = before != after;

        (after, changed)
    }

    /// Get a BroadcastSource from a peer id.
    fn get_by_peer_id(&self, peer_id: &PeerId) -> Option<BroadcastSource> {
        let lock = self.0.lock();
        lock.get(&peer_id).clone().map(|source| source.clone())
    }

    /// Get a BroadcastSource from associated broadcast id.
    fn get_by_broadcast_id(&self, broadcast_id: &BroadcastId) -> Option<BroadcastSource> {
        let lock = self.0.lock();
        let info = lock.iter().find(|(&_k, &ref v)| v.broadcast_id == Some(*broadcast_id));
        match info {
            Some((&_peer_id, &ref broadcast_source)) => Some(broadcast_source.clone()),
            None => None,
        }
    }
}

pub struct BroadcastAssistant<T: bt_gatt::GattTypes> {
    central: T::Central,
    broadcast_sources: Arc<DiscoveredBroadcastSources>,
    broadcast_source_scan_started: Arc<AtomicBool>,
}

impl<T: bt_gatt::GattTypes + 'static> BroadcastAssistant<T> {
    // Creates a broadcast assistant and sets it up to be ready
    // for broadcast source scanning. Clients must use the `start`
    // method to poll the event stream for scan results.
    pub fn new(central: T::Central) -> Self {
        Self {
            central,
            broadcast_sources: DiscoveredBroadcastSources::new(),
            broadcast_source_scan_started: Arc::new(AtomicBool::new(false)),
        }
    }

    /// List of scan filters for advertisement data Broadcast Assistant should
    /// look for, which are:
    /// - Service data with Broadcast Audio Announcement Service UUID from
    ///   Broadcast Sources (see BAP spec v1.0.1 Section 3.7.2.1 for details)
    // TODO(b/308481381): define filter for finding broadcast sink.
    fn scan_filters() -> Vec<ScanFilter> {
        vec![Filter::HasServiceData(BROADCAST_AUDIO_ANNOUNCEMENT_SERVICE).into()]
    }

    /// Start broadcast assistant. Returns EventStream that the upper layer can
    /// poll. Upper layer can call methods on BroadcastAssistant based on the
    /// events it sees.
    pub fn start(&mut self) -> Result<EventStream<T>, Error> {
        if self.is_started() {
            return Err(Error::AlreadyStarted);
        }
        let scan_result_stream = self.central.scan(&Self::scan_filters());
        self.broadcast_source_scan_started.store(true, Ordering::Relaxed);
        Ok(EventStream::<T>::new(
            scan_result_stream,
            self.broadcast_sources.clone(),
            self.broadcast_source_scan_started.clone(),
        ))
    }

    /// Returns whether or not Broadcast Assistant has started.
    fn is_started(&self) -> bool {
        self.broadcast_source_scan_started.load(Ordering::Relaxed)
    }

    pub fn scan_for_scan_delegators(&mut self) -> Result<T::ScanResultStream, Error> {
        if self.is_started() {
            return Err(Error::Generic(format!(
                "Cannot scan for scan delegators while scanning for broadcast sources"
            )));
        }
        // Scan for service data with Broadcast Audio Scan Service UUID to look
        // for Broadcast Sink collocated with the Scan Delegator (see BAP spec v1.0.1
        // Section 3.9.2 for details).
        Ok(self.central.scan(&vec![Filter::HasServiceData(BROADCAST_AUDIO_SCAN_SERVICE).into()]))
    }

    pub async fn connect_to_scan_delegator(&self, peer_id: PeerId) -> Result<Peer<T>, Error>
    where
        <T as bt_gatt::GattTypes>::NotificationStream: std::marker::Send,
    {
        let client = self.central.connect(peer_id).await?;
        let service_handles = client.find_service(BROADCAST_AUDIO_SCAN_SERVICE).await?;

        for handle in service_handles {
            if handle.uuid() != BROADCAST_AUDIO_SCAN_SERVICE || !handle.is_primary() {
                continue;
            }
            let service = handle.connect().await?;
            let bass = BroadcastAudioScanServiceClient::<T>::create(service)
                .await
                .map_err(|e| Error::BassClient(peer_id, e))?;

            let connected_peer =
                Peer::<T>::new(peer_id, client, bass, self.broadcast_sources.clone());
            return Ok(connected_peer);
        }
        Err(Error::ConnectionFailure(peer_id, BROADCAST_AUDIO_SCAN_SERVICE))
    }

    // Manually adds broadcast source information for debugging purposes.
    #[cfg(any(test, feature = "debug"))]
    pub fn force_discover_broadcast_source(
        &self,
        peer_id: PeerId,
        address: [u8; 6],
        address_type: bt_common::core::AddressType,
        advertising_sid: bt_common::core::AdvertisingSetId,
    ) -> Result<BroadcastSource, Error> {
        let broadcast_source = BroadcastSource {
            address: Some(address),
            address_type: Some(address_type),
            advertising_sid: Some(advertising_sid),
            broadcast_id: None,
            pa_interval: None,
            endpoint: None,
        };

        Ok(self.broadcast_sources.merge_broadcast_source_data(&peer_id, &broadcast_source).0)
    }

    // Manually adds broadcast source information for debugging purposes.
    #[cfg(any(test, feature = "debug"))]
    pub fn force_discover_broadcast_source_metadata(
        &self,
        peer_id: PeerId,
        big_metadata: Vec<Vec<bt_common::generic_audio::metadata_ltv::Metadata>>,
    ) -> Result<BroadcastSource, Error> {
        use bt_bap::types::{BroadcastAudioSourceEndpoint, BroadcastIsochronousGroup};
        use bt_common::core::CodecId;

        let mut big = Vec::new();
        for metadata in big_metadata {
            let group = BroadcastIsochronousGroup {
                codec_id: CodecId::Assigned(bt_common::core::CodingFormat::ALawLog), // mock.
                codec_specific_configs: vec![],
                metadata,
                bis: vec![],
            };
            big.push(group);
        }

        let endpoint = BroadcastAudioSourceEndpoint { presentation_delay_ms: 0, big };

        let broadcast_source = BroadcastSource {
            address: None,
            address_type: None,
            advertising_sid: None,
            broadcast_id: None,
            pa_interval: None,
            endpoint: Some(endpoint),
        };

        Ok(self.broadcast_sources.merge_broadcast_source_data(&peer_id, &broadcast_source).0)
    }

    // Gets the broadcast sources currently known by the broadcast
    // assistant.
    pub fn known_broadcast_sources(&self) -> std::collections::HashMap<PeerId, BroadcastSource> {
        let lock = self.broadcast_sources.0.lock();
        let mut m = HashMap::new();
        for (pid, source) in lock.iter() {
            m.insert(*pid, source.clone());
        }
        m
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    use futures::{pin_mut, FutureExt};
    use std::task::Poll;

    use bt_bap::types::*;
    use bt_common::core::{AddressType, AdvertisingSetId};
    use bt_common::generic_audio::metadata_ltv::Metadata;
    use bt_gatt::test_utils::{FakeCentral, FakeClient, FakeTypes};

    use crate::assistant::peer::tests::fake_bass_service;

    #[test]
    fn merge_broadcast_source() {
        let discovered = DiscoveredBroadcastSources::new();
        let bid = BroadcastId::try_from(1001).unwrap();
        let (bs, changed) = discovered.merge_broadcast_source_data(
            &PeerId(1001),
            &BroadcastSource::default()
                .with_address([1, 2, 3, 4, 5, 6])
                .with_address_type(AddressType::Public)
                .with_advertising_sid(AdvertisingSetId(1))
                .with_broadcast_id(bid),
        );
        assert!(changed);
        assert_eq!(
            bs,
            BroadcastSource {
                address: Some([1, 2, 3, 4, 5, 6]),
                address_type: Some(AddressType::Public),
                advertising_sid: Some(AdvertisingSetId(1)),
                broadcast_id: Some(bid),
                pa_interval: None,
                endpoint: None,
            }
        );

        let (bs, changed) = discovered.merge_broadcast_source_data(
            &PeerId(1001),
            &BroadcastSource::default().with_address_type(AddressType::Random).with_endpoint(
                BroadcastAudioSourceEndpoint { presentation_delay_ms: 32, big: vec![] },
            ),
        );
        assert!(changed);
        assert_eq!(
            bs,
            BroadcastSource {
                address: Some([1, 2, 3, 4, 5, 6]),
                address_type: Some(AddressType::Random),
                advertising_sid: Some(AdvertisingSetId(1)),
                broadcast_id: Some(bid),
                pa_interval: None,
                endpoint: Some(BroadcastAudioSourceEndpoint {
                    presentation_delay_ms: 32,
                    big: vec![]
                }),
            }
        );

        let (_, changed) = discovered.merge_broadcast_source_data(
            &PeerId(1001),
            &BroadcastSource::default().with_address_type(AddressType::Random).with_endpoint(
                BroadcastAudioSourceEndpoint { presentation_delay_ms: 32, big: vec![] },
            ),
        );
        assert!(!changed);
    }

    #[test]
    fn start_stream() {
        let mut assistant = BroadcastAssistant::<FakeTypes>::new(FakeCentral::new());
        let stream = assistant.start().expect("can start stream");

        // Stream can only be started once.
        assert!(assistant.is_started());
        assert!(assistant.start().is_err());

        // After the stream is dropped, it can be started again.
        drop(stream);
        assert!(!assistant.is_started());
        assert!(assistant.start().is_ok());
    }

    #[test]
    fn connect_to_scan_delegator() {
        // Set up fake GATT related objects.
        let mut central = FakeCentral::new();
        let mut client = FakeClient::new();
        central.add_client(PeerId(1004), client.clone());
        let service = fake_bass_service();
        client.add_service(BROADCAST_AUDIO_SCAN_SERVICE, true, service.clone());

        let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());
        let assistant = BroadcastAssistant::<FakeTypes>::new(central);
        let conn_fut = assistant.connect_to_scan_delegator(PeerId(1004));
        pin_mut!(conn_fut);
        let polled = conn_fut.poll_unpin(&mut noop_cx);
        let Poll::Ready(res) = polled else {
            panic!("should be ready");
        };
        let _ = res.expect("should be ok");
    }

    #[test]
    fn force_discover_broadcast_source_test() {
        let assistant = BroadcastAssistant::<FakeTypes>::new(FakeCentral::new());
        let peer_id = PeerId(1);
        let address = [1, 2, 3, 4, 5, 6];
        let address_type = AddressType::Public;
        let sid = AdvertisingSetId(1);

        let source =
            assistant.force_discover_broadcast_source(peer_id, address, address_type, sid).unwrap();

        assert_eq!(source.address, Some(address));
        assert_eq!(source.address_type, Some(address_type));
        assert_eq!(source.advertising_sid, Some(sid));
    }

    #[test]
    fn force_discover_broadcast_source_metadata_test() {
        let assistant = BroadcastAssistant::<FakeTypes>::new(FakeCentral::new());
        let peer_id = PeerId(1);
        let metadata = vec![vec![Metadata::BroadcastAudioImmediateRenderingFlag]];

        let source =
            assistant.force_discover_broadcast_source_metadata(peer_id, metadata.clone()).unwrap();

        let endpoint = source.endpoint.unwrap();
        assert_eq!(endpoint.big.len(), 1);
        assert_eq!(endpoint.big[0].metadata, metadata[0]);
    }
}
