use crate::core::event::{self, Event};
use crate::core::Hasher;
use crate::subscription::Recipe;
use crate::{BoxFuture, MaybeSend};
use futures::channel::mpsc;
use futures::sink::{Sink, SinkExt};
use std::collections::HashMap;
use std::hash::Hasher as _;
/// A registry of subscription streams.
///
/// If you have an application that continuously returns a [`Subscription`],
/// you can use a [`Tracker`] to keep track of the different recipes and keep
/// its executions alive.
///
/// [`Subscription`]: crate::Subscription
#[derive(Debug, Default)]
pub struct Tracker {
    subscriptions: HashMap<u64, Execution>,
}
#[derive(Debug)]
pub struct Execution {
    _cancel: futures::channel::oneshot::Sender<()>,
    listener: Option<futures::channel::mpsc::Sender<(Event, event::Status)>>,
}
impl Tracker {
    /// Creates a new empty [`Tracker`].
    pub fn new() -> Self {
        Self {
            subscriptions: HashMap::new(),
        }
    }
    /// Updates the [`Tracker`] with the given [`Subscription`].
    ///
    /// A [`Subscription`] can cause new streams to be spawned or old streams
    /// to be closed.
    ///
    /// The [`Tracker`] keeps track of these streams between calls to this
    /// method:
    ///
    /// - If the provided [`Subscription`] contains a new [`Recipe`] that is
    /// currently not being run, it will spawn a new stream and keep it alive.
    /// - On the other hand, if a [`Recipe`] is currently in execution and the
    /// provided [`Subscription`] does not contain it anymore, then the
    /// [`Tracker`] will close and drop the relevant stream.
    ///
    /// It returns a list of futures that need to be spawned to materialize
    /// the [`Tracker`] changes.
    ///
    /// [`Recipe`]: crate::subscription::Recipe
    /// [`Subscription`]: crate::Subscription
    pub fn update<Message, Receiver>(
        &mut self,
        recipes: impl Iterator<Item = Box<dyn Recipe<Output = Message>>>,
        receiver: Receiver,
    ) -> Vec<BoxFuture<()>>
    where
        Message: 'static + MaybeSend,
        Receiver: 'static
            + Sink<Message, Error = mpsc::SendError>
            + Unpin
            + MaybeSend
            + Clone,
    {
        use futures::stream::StreamExt;
        let mut futures: Vec<BoxFuture<()>> = Vec::new();
        let mut alive = std::collections::HashSet::new();
        for recipe in recipes {
            let id = {
                let mut hasher = Hasher::default();
                recipe.hash(&mut hasher);
                hasher.finish()
            };
            let _ = alive.insert(id);
            if self.subscriptions.contains_key(&id) {
                continue;
            }
            let (cancel, mut canceled) = futures::channel::oneshot::channel();
            // TODO: Use bus if/when it supports async
            let (event_sender, event_receiver) =
                futures::channel::mpsc::channel(100);
            let mut receiver = receiver.clone();
            let mut stream = recipe.stream(event_receiver.boxed());
            let future = async move {
                loop {
                    let select =
                        futures::future::select(&mut canceled, stream.next());
                    match select.await {
                        futures::future::Either::Left(_)
                        | futures::future::Either::Right((None, _)) => break,
                        futures::future::Either::Right((Some(message), _)) => {
                            let _ = receiver.send(message).await;
                        }
                    }
                }
            };
            let _ = self.subscriptions.insert(
                id,
                Execution {
                    _cancel: cancel,
                    listener: if event_sender.is_closed() {
                        None
                    } else {
                        Some(event_sender)
                    },
                },
            );
            futures.push(Box::pin(future));
        }
        self.subscriptions.retain(|id, _| alive.contains(id));
        futures
    }
    /// Broadcasts an event to the subscriptions currently alive.
    ///
    /// A subscription's [`Recipe::stream`] always receives a stream of events
    /// as input. This stream can be used by some subscription to listen to
    /// shell events.
    ///
    /// This method publishes the given event to all the subscription streams
    /// currently open.
    ///
    /// [`Recipe::stream`]: crate::subscription::Recipe::stream
    pub fn broadcast(&mut self, event: Event, status: event::Status) {
        self.subscriptions
            .values_mut()
            .filter_map(|connection| connection.listener.as_mut())
            .for_each(|listener| {
                if let Err(error) = listener.try_send((event.clone(), status)) {
                    log::warn!(
                        "Error sending event to subscription: {error:?}"
                    );
                }
            });
    }
}