// Copyright 2024 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

//! Implements the Published Audio Capabilities Service server role.
//!
//! Use the `ServerBuilder` to define a new `Server` instance with the specified
//! characteristics. The server isn't published to GATT until
//! `Server::publish` method is called.
//! Once the `Server` is published, poll on it to receive events from the
//! `Server`, which are created as it processes incoming client requests.
//!
//! For example:
//!
//! // Set up a GATT Server which implements `bt_gatt::ServerTypes::Server`.
//! let gatt_server = ...;
//! // Define supported and available audio contexts for this PACS.
//! let supported = AudioContexts::new(...);
//! let available = AudioContexts::new(...);
//! let pacs_server = ServerBuilder::new()
//!         .with_sources(...)
//!         .with_sinks(...)
//!         .build(supported, available)?;
//!
//! // Publish the server.
//! pacs_server.publish(gatt_server).expect("publishes fine");
//! // Process events from the PACS server.
//! while let Some(event) = pacs_server.next().await {
//!     // Do something with `event`
//! }

use bt_common::generic_audio::ContextType;
use bt_gatt::Server as _;
use bt_gatt::server::LocalService;
use bt_gatt::server::{ReadResponder, ServiceDefinition, WriteResponder};
use bt_gatt::types::{GattError, Handle};
use futures::task::{Poll, Waker};
use futures::{Future, Stream};
use pin_project::pin_project;
use std::collections::HashMap;
use thiserror::Error;

use crate::{
    AudioLocations, AvailableAudioContexts, PacRecord, SinkAudioLocations, SourceAudioLocations,
    SupportedAudioContexts,
};

pub(crate) mod types;
use crate::server::types::*;

#[pin_project(project = LocalServiceProj)]
enum LocalServiceState<T: bt_gatt::ServerTypes> {
    NotPublished {
        waker: Option<Waker>,
    },
    Preparing {
        #[pin]
        fut: T::LocalServiceFut,
    },
    Published {
        service: T::LocalService,
        #[pin]
        events: T::ServiceEventStream,
    },
    Terminated,
}

impl<T: bt_gatt::ServerTypes> Default for LocalServiceState<T> {
    fn default() -> Self {
        Self::NotPublished { waker: None }
    }
}

#[derive(Debug, Error)]
pub enum Error {
    #[error("Service is already published")]
    AlreadyPublished,
    #[error("Issue publishing service: {0}")]
    PublishError(#[from] bt_gatt::types::Error),
    #[error("Service should support at least one of Sink or Source PAC characteristics")]
    MissingPac,
    #[error("Available audio contexts are not supported: {0:?}")]
    UnsupportedAudioContexts(Vec<ContextType>),
}

impl<T: bt_gatt::ServerTypes> Stream for LocalServiceState<T> {
    type Item = Result<bt_gatt::server::ServiceEvent<T>, Error>;

    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        // SAFETY:
        //  - Wakers are Unpin
        //  - We re-pin the structurally pinned futures in Preparing and Published
        //    (service is untouched)
        //  - Terminated is empty
        loop {
            match self.as_mut().project() {
                LocalServiceProj::Terminated => return Poll::Ready(None),
                LocalServiceProj::NotPublished { .. } => {
                    self.as_mut()
                        .set(LocalServiceState::NotPublished { waker: Some(cx.waker().clone()) });
                    return Poll::Pending;
                }
                LocalServiceProj::Preparing { fut } => {
                    let service_result = futures::ready!(fut.poll(cx));
                    let Ok(service) = service_result else {
                        return Poll::Ready(Some(Err(Error::PublishError(
                            service_result.err().unwrap(),
                        ))));
                    };
                    let events = service.publish();
                    self.as_mut().set(LocalServiceState::Published { service, events });
                    continue;
                }
                LocalServiceProj::Published { service: _, events } => {
                    let item = futures::ready!(events.poll_next(cx));
                    let Some(gatt_result) = item else {
                        self.as_mut().set(LocalServiceState::Terminated);
                        return Poll::Ready(None);
                    };
                    let Ok(event) = gatt_result else {
                        self.as_mut().set(LocalServiceState::Terminated);
                        return Poll::Ready(Some(Err(Error::PublishError(
                            gatt_result.err().unwrap(),
                        ))));
                    };
                    return Poll::Ready(Some(Ok(event)));
                }
            }
        }
    }
}

impl<T: bt_gatt::ServerTypes> LocalServiceState<T> {
    fn is_published(&self) -> bool {
        if let LocalServiceState::NotPublished { .. } = self { false } else { true }
    }
}

#[derive(Default)]
pub struct ServerBuilder {
    source_pacs: Vec<Vec<PacRecord>>,
    source_audio_locations: Option<AudioLocations>,
    sink_pacs: Vec<Vec<PacRecord>>,
    sink_audio_locations: Option<AudioLocations>,
}

impl ServerBuilder {
    pub fn new() -> ServerBuilder {
        ServerBuilder::default()
    }

    /// Adds a source PAC characteristic to the builder.
    /// Each call adds a new characteristic.
    /// `capabilities` represents the records for a single PAC characteristic.
    /// If `capabilities` is empty, it will be ignored.
    pub fn add_source(mut self, capabilities: Vec<PacRecord>) -> Self {
        if !capabilities.is_empty() {
            self.source_pacs.push(capabilities);
        }
        self
    }

    /// Sets the audio locations for the source.
    /// This corresponds to a single Source Audio Locations characteristic.
    pub fn set_source_locations(mut self, audio_locations: AudioLocations) -> Self {
        self.source_audio_locations = Some(audio_locations);
        self
    }

    /// Adds a sink PAC characteristic to the builder.
    /// Each call adds a new characteristic.
    /// `capabilities` represents the records for a single PAC characteristic.
    /// If `capabilities` is empty, it will be ignored.
    pub fn add_sink(mut self, capabilities: Vec<PacRecord>) -> Self {
        if !capabilities.is_empty() {
            self.sink_pacs.push(capabilities);
        }
        self
    }

    /// Sets the audio locations for the sink.
    /// This corresponds to a single Sink Audio Locations characteristic.
    pub fn set_sink_locations(mut self, audio_locations: AudioLocations) -> Self {
        self.sink_audio_locations = Some(audio_locations);
        self
    }

    fn verify_characteristics(
        &self,
        supported: &AudioContexts,
        available: &AudioContexts,
    ) -> Result<(), Error> {
        // If the corresponding bit in the supported audio contexts is
        // not set to 0b1, we shall not set a bit to 0b1 in the
        // available audio contexts. See PACS v1.0.1 section 3.5.1.
        let diff: Vec<ContextType> = available.sink.difference(&supported.sink).cloned().collect();
        if diff.len() != 0 {
            return Err(Error::UnsupportedAudioContexts(diff));
        }
        let diff: Vec<ContextType> =
            available.source.difference(&supported.source).cloned().collect();
        if diff.len() != 0 {
            return Err(Error::UnsupportedAudioContexts(diff));
        }

        // PACS server must have at least one Sink or Source PACS record.
        if self.source_pacs.len() == 0 && self.sink_pacs.len() == 0 {
            return Err(Error::MissingPac);
        }
        Ok(())
    }

    /// Builds a server after verifying all the defined characteristics
    /// for this server (see PACS v1.0.1 section 3 for details).
    pub fn build<T>(
        mut self,
        mut supported: AudioContexts,
        available: AudioContexts,
    ) -> Result<Server<T>, Error>
    where
        T: bt_gatt::ServerTypes,
    {
        let _ = self.verify_characteristics(&supported, &available)?;

        let mut service_def = ServiceDefinition::new(
            bt_gatt::server::ServiceId::new(1),
            crate::PACS_UUID,
            bt_gatt::types::ServiceKind::Primary,
        );

        let supported = SupportedAudioContexts {
            handle: SUPPORTED_AUDIO_CONTEXTS_HANDLE,
            sink: supported.sink.drain().collect(),
            source: supported.source.drain().collect(),
        };
        let _ = service_def.add_characteristic((&supported).into());

        let available = AvailableAudioContexts {
            handle: AVAILABLE_AUDIO_CONTEXTS_HANDLE,
            sink: (&available.sink).into(),
            source: (&available.source).into(),
        };
        let _ = service_def.add_characteristic((&available).into());

        let mut next_handle_iter = (HANDLE_OFFSET..).map(|x| Handle(x));
        let mut audio_capabilities = HashMap::new();

        // Sink audio locations characteristic may exist iff it's defined
        // and there are valid sink PAC characteristics.
        let sink_audio_locations = match self.sink_audio_locations.take() {
            Some(locations) if self.sink_pacs.len() > 0 => {
                let sink =
                    SinkAudioLocations { handle: next_handle_iter.next().unwrap(), locations };
                let _ = service_def.add_characteristic((&sink).into());
                Some(sink)
            }
            _ => None,
        };
        for capabilities in self.sink_pacs.drain(..) {
            let handle = next_handle_iter.next().unwrap();
            let pac = PublishedAudioCapability::new_sink(handle, capabilities);
            let _ = service_def.add_characteristic((&pac).into());
            audio_capabilities.insert(handle, pac);
        }

        // Source audio locations characteristic may exist iff it's defined
        // and there are valid source PAC characteristics.
        let source_audio_locations = match self.source_audio_locations.take() {
            Some(locations) if self.source_pacs.len() > 0 => {
                let source =
                    SourceAudioLocations { handle: next_handle_iter.next().unwrap(), locations };
                let _ = service_def.add_characteristic((&source).into());
                Some(source)
            }
            _ => None,
        };
        for capabilities in self.source_pacs.drain(..) {
            let handle = next_handle_iter.next().unwrap();
            let pac = PublishedAudioCapability::new_source(handle, capabilities);
            let _ = service_def.add_characteristic((&pac).into());
            audio_capabilities.insert(handle, pac);
        }

        let server = Server {
            service_def,
            local_service: Default::default(),
            published_audio_capabilities: audio_capabilities,
            source_audio_locations,
            sink_audio_locations,
            available_audio_contexts: available,
            supported_audio_contexts: supported,
        };
        Ok(server)
    }
}

#[pin_project]
pub struct Server<T: bt_gatt::ServerTypes> {
    service_def: ServiceDefinition,
    #[pin]
    local_service: LocalServiceState<T>,
    published_audio_capabilities: HashMap<Handle, PublishedAudioCapability>,
    source_audio_locations: Option<SourceAudioLocations>,
    sink_audio_locations: Option<SinkAudioLocations>,
    available_audio_contexts: AvailableAudioContexts,
    supported_audio_contexts: SupportedAudioContexts,
}

impl<T: bt_gatt::ServerTypes> Server<T> {
    pub fn publish(&mut self, server: T::Server) -> Result<(), Error> {
        if self.local_service.is_published() {
            return Err(Error::AlreadyPublished);
        }

        let LocalServiceState::NotPublished { waker } = std::mem::replace(
            &mut self.local_service,
            LocalServiceState::Preparing { fut: server.prepare(self.service_def.clone()) },
        ) else {
            unreachable!();
        };
        waker.map(Waker::wake);
        Ok(())
    }

    fn is_source_locations_handle(&self, handle: Handle) -> bool {
        self.source_audio_locations.as_ref().map_or(false, |locations| locations.handle == handle)
    }

    fn is_sink_locations_handle(&self, handle: Handle) -> bool {
        self.sink_audio_locations.as_ref().map_or(false, |locations| locations.handle == handle)
    }
}

impl<T: bt_gatt::ServerTypes> Stream for Server<T> {
    type Item = Result<(), Error>;

    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        loop {
            let mut this = self.as_mut().project();
            let gatt_event = match futures::ready!(this.local_service.as_mut().poll_next(cx)) {
                None => return Poll::Ready(None),
                Some(Err(e)) => return Poll::Ready(Some(Err(e))),
                Some(Ok(event)) => event,
            };
            use bt_gatt::server::ServiceEvent::*;
            match gatt_event {
                Read { handle, offset, responder, .. } => {
                    let offset = offset as usize;
                    let value = match handle {
                        x if x == AVAILABLE_AUDIO_CONTEXTS_HANDLE => {
                            self.available_audio_contexts.into_char_value()
                        }
                        x if x == SUPPORTED_AUDIO_CONTEXTS_HANDLE => {
                            self.supported_audio_contexts.into_char_value()
                        }
                        x if self.is_source_locations_handle(x) => {
                            self.source_audio_locations.as_ref().unwrap().into_char_value()
                        }
                        x if self.is_sink_locations_handle(x) => {
                            self.sink_audio_locations.as_ref().unwrap().into_char_value()
                        }
                        pac_handle => {
                            let Some(ref pac) = self.published_audio_capabilities.get(&pac_handle)
                            else {
                                responder.error(GattError::InvalidHandle);
                                continue;
                            };
                            pac.encode()
                        }
                    };
                    responder.respond(&value[offset..]);
                    continue;
                }
                // TODO(b/309015071): support optional writes.
                Write { responder, .. } => {
                    responder.error(GattError::WriteNotPermitted);
                    continue;
                }
                // TODO(b/309015071): implement notify since it's mandatory.
                ClientConfiguration { .. } => {
                    unimplemented!();
                }
                _ => continue,
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    use bt_common::PeerId;
    use bt_common::core::{CodecId, CodingFormat};
    use bt_common::generic_audio::AudioLocation;
    use bt_common::generic_audio::codec_capabilities::*;
    use bt_gatt::server;
    use bt_gatt::test_utils::{FakeServer, FakeServerEvent, FakeTypes};
    use bt_gatt::types::ServiceKind;
    use futures::{FutureExt, StreamExt};

    use std::collections::HashSet;

    use crate::AvailableContexts;

    // Builder for a server with:
    // - 1 sink and 1 source PAC characteristics
    // - sink audio locations
    fn default_server_builder() -> ServerBuilder {
        let builder = ServerBuilder::new()
            .add_sink(vec![PacRecord {
                codec_id: CodecId::Assigned(CodingFormat::ALawLog),
                codec_specific_capabilities: vec![CodecCapability::SupportedFrameDurations(
                    FrameDurationSupport::BothNoPreference,
                )],
                metadata: vec![],
            }])
            .set_sink_locations(AudioLocations {
                locations: HashSet::from([AudioLocation::FrontLeft, AudioLocation::FrontRight]),
            })
            .add_source(vec![
                PacRecord {
                    codec_id: CodecId::Assigned(CodingFormat::ALawLog),
                    codec_specific_capabilities: vec![CodecCapability::SupportedFrameDurations(
                        FrameDurationSupport::BothNoPreference,
                    )],
                    metadata: vec![],
                },
                PacRecord {
                    codec_id: CodecId::Assigned(CodingFormat::MuLawLog),
                    codec_specific_capabilities: vec![CodecCapability::SupportedFrameDurations(
                        FrameDurationSupport::BothNoPreference,
                    )],
                    metadata: vec![],
                },
            ])
            .add_source(vec![]);
        builder
    }

    #[test]
    fn build_server() {
        let server = default_server_builder()
            .build::<FakeTypes>(
                AudioContexts::new(
                    HashSet::from([ContextType::Conversational, ContextType::Media]),
                    HashSet::from([ContextType::Media]),
                ),
                AudioContexts::new(HashSet::from([ContextType::Media]), HashSet::new()),
            )
            .expect("should succeed");
        assert_eq!(server.published_audio_capabilities.len(), 2);

        assert_eq!(server.supported_audio_contexts.handle.0, 1);
        assert_eq!(
            server.supported_audio_contexts.sink,
            HashSet::from([ContextType::Conversational, ContextType::Media])
        );
        assert_eq!(server.supported_audio_contexts.source, HashSet::from([ContextType::Media]));

        assert_eq!(server.available_audio_contexts.handle.0, 2);
        assert_eq!(
            server.available_audio_contexts.sink,
            AvailableContexts::Available(HashSet::from([ContextType::Media]))
        );
        assert_eq!(server.available_audio_contexts.source, AvailableContexts::NotAvailable);

        // Should have 1 sink PAC characteristic with audio locations.
        let location_char = server.sink_audio_locations.as_ref().expect("should exist");
        assert_eq!(location_char.handle.0, 3);
        assert_eq!(
            location_char.locations.locations,
            HashSet::from([AudioLocation::FrontLeft, AudioLocation::FrontRight])
        );

        let mut sink_iter =
            server.published_audio_capabilities.iter().filter(|(_handle, pac)| pac.is_sink());
        let sink_char = sink_iter.next().expect("should exist");
        assert_eq!(sink_char.0, &Handle(4));
        assert_eq!(sink_char.1.pac_records().len(), 1);
        assert!(sink_iter.next().is_none());

        // Should have 1 source PAC characteristic w/o audio locations.
        assert!(server.source_audio_locations.is_none());
        let mut source_iter =
            server.published_audio_capabilities.iter().filter(|(_handle, pac)| pac.is_source());
        let source_char = source_iter.next().expect("should exist");
        assert_eq!(source_char.0, &Handle(5));
        assert_eq!(source_char.1.pac_records().len(), 2);
        assert_eq!(source_iter.next(), None);
    }

    #[test]
    fn build_server_error() {
        // No sink or source PACs.
        assert!(
            ServerBuilder::new()
                .build::<FakeTypes>(
                    AudioContexts::new(
                        HashSet::from([ContextType::Conversational, ContextType::Media]),
                        HashSet::from([ContextType::Media]),
                    ),
                    AudioContexts::new(HashSet::from([ContextType::Media]), HashSet::new()),
                )
                .is_err()
        );

        // Sink audio context in available not in supported.
        assert!(
            default_server_builder()
                .build::<FakeTypes>(
                    AudioContexts::new(
                        HashSet::from([ContextType::Conversational, ContextType::Media]),
                        HashSet::from([ContextType::Media]),
                    ),
                    AudioContexts::new(HashSet::from([ContextType::Alerts]), HashSet::new()),
                )
                .is_err()
        );

        // Sink audio context in available not in supported.
        assert!(
            default_server_builder()
                .build::<FakeTypes>(
                    AudioContexts::new(
                        HashSet::from([ContextType::Conversational, ContextType::Media]),
                        HashSet::from([ContextType::Media]),
                    ),
                    AudioContexts::new(
                        HashSet::from([]),
                        HashSet::from([ContextType::EmergencyAlarm])
                    ),
                )
                .is_err()
        );
    }

    #[test]
    fn publish_server() {
        let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());

        let mut server = default_server_builder()
            .build::<FakeTypes>(
                AudioContexts::new(
                    HashSet::from([ContextType::Media]),
                    HashSet::from([ContextType::Media]),
                ),
                AudioContexts::new(HashSet::new(), HashSet::new()),
            )
            .unwrap();

        // Server should be pending still since GATT server not establihsed.
        let Poll::Pending = server.next().poll_unpin(&mut noop_cx) else {
            panic!("Should be pending");
        };

        let (fake_gatt_server, mut event_receiver) = FakeServer::new();

        // Event stream should be pending still since service not published.
        let mut event_stream = event_receiver.next();
        let Poll::Pending = event_stream.poll_unpin(&mut noop_cx) else {
            panic!("Should be pending");
        };

        let _ = server.publish(fake_gatt_server).expect("should succeed");

        // Server should poll on local server state.
        let Poll::Pending = server.next().poll_unpin(&mut noop_cx) else {
            panic!("Should be pending");
        };

        // Should receive event that GATT service was published.
        let Poll::Ready(Some(FakeServerEvent::Published { id, definition })) =
            event_stream.poll_unpin(&mut noop_cx)
        else {
            panic!("Should be published");
        };
        assert_eq!(id, server::ServiceId::new(1));
        assert_eq!(definition.characteristics().collect::<Vec<_>>().len(), 5);
        assert_eq!(definition.kind(), ServiceKind::Primary);
        assert_eq!(definition.uuid(), crate::PACS_UUID);

        // Server can only be published once.
        let (fake_gatt_server, _) = FakeServer::new();
        assert!(server.publish(fake_gatt_server).is_err());
    }

    #[test]
    fn read_from_server() {
        let mut noop_cx = futures::task::Context::from_waker(futures::task::noop_waker_ref());

        let mut server = default_server_builder()
            .build::<FakeTypes>(
                AudioContexts::new(
                    HashSet::from([ContextType::Media]),
                    HashSet::from([ContextType::Media]),
                ),
                AudioContexts::new(HashSet::from([ContextType::Media]), HashSet::new()),
            )
            .unwrap();

        let (fake_gatt_server, mut event_receiver) = FakeServer::new();
        let _ = server.publish(fake_gatt_server.clone()).expect("should succeed");

        // Server should poll on local server state.
        let Poll::Pending = server.next().poll_unpin(&mut noop_cx) else {
            panic!("Should be pending");
        };

        // Should receive event that GATT service was published.
        let mut event_stream = event_receiver.next();
        let Poll::Ready(Some(FakeServerEvent::Published { id, .. })) =
            event_stream.poll_unpin(&mut noop_cx)
        else {
            panic!("Should be published");
        };

        // Fake an incoming read from a remote peer.
        let available_char_handle = server.available_audio_contexts.handle;
        fake_gatt_server.incoming_read(PeerId(0x01), id, available_char_handle, 0);

        // Server should still be pending.
        let Poll::Pending = server.next().poll_unpin(&mut noop_cx) else {
            panic!("Should be pending");
        };

        // We should received read response.
        let Poll::Ready(Some(FakeServerEvent::ReadResponded { handle, value, .. })) =
            event_stream.poll_unpin(&mut noop_cx)
        else {
            panic!("Should be published");
        };
        assert_eq!(handle, available_char_handle);
        assert_eq!(value.expect("should be ok"), vec![0x04, 0x00, 0x00, 0x00]);
    }
}
