use crate::core::event::{self, Event}; use crate::core::Hasher; use crate::subscription::Recipe; use crate::{BoxFuture, MaybeSend}; use futures::channel::mpsc; use futures::sink::{Sink, SinkExt}; use std::collections::HashMap; use std::hash::Hasher as _; /// A registry of subscription streams. /// /// If you have an application that continuously returns a [`Subscription`], /// you can use a [`Tracker`] to keep track of the different recipes and keep /// its executions alive. /// /// [`Subscription`]: crate::Subscription #[derive(Debug, Default)] pub struct Tracker { subscriptions: HashMap, } #[derive(Debug)] pub struct Execution { _cancel: futures::channel::oneshot::Sender<()>, listener: Option>, } impl Tracker { /// Creates a new empty [`Tracker`]. pub fn new() -> Self { Self { subscriptions: HashMap::new(), } } /// Updates the [`Tracker`] with the given [`Subscription`]. /// /// A [`Subscription`] can cause new streams to be spawned or old streams /// to be closed. /// /// The [`Tracker`] keeps track of these streams between calls to this /// method: /// /// - If the provided [`Subscription`] contains a new [`Recipe`] that is /// currently not being run, it will spawn a new stream and keep it alive. /// - On the other hand, if a [`Recipe`] is currently in execution and the /// provided [`Subscription`] does not contain it anymore, then the /// [`Tracker`] will close and drop the relevant stream. /// /// It returns a list of futures that need to be spawned to materialize /// the [`Tracker`] changes. /// /// [`Recipe`]: crate::subscription::Recipe /// [`Subscription`]: crate::Subscription pub fn update( &mut self, recipes: impl Iterator>>, receiver: Receiver, ) -> Vec> where Message: 'static + MaybeSend, Receiver: 'static + Sink + Unpin + MaybeSend + Clone, { use futures::stream::StreamExt; let mut futures: Vec> = Vec::new(); let mut alive = std::collections::HashSet::new(); for recipe in recipes { let id = { let mut hasher = Hasher::default(); recipe.hash(&mut hasher); hasher.finish() }; let _ = alive.insert(id); if self.subscriptions.contains_key(&id) { continue; } let (cancel, mut canceled) = futures::channel::oneshot::channel(); // TODO: Use bus if/when it supports async let (event_sender, event_receiver) = futures::channel::mpsc::channel(100); let mut receiver = receiver.clone(); let mut stream = recipe.stream(event_receiver.boxed()); let future = async move { loop { let select = futures::future::select(&mut canceled, stream.next()); match select.await { futures::future::Either::Left(_) | futures::future::Either::Right((None, _)) => break, futures::future::Either::Right((Some(message), _)) => { let _ = receiver.send(message).await; } } } }; let _ = self.subscriptions.insert( id, Execution { _cancel: cancel, listener: if event_sender.is_closed() { None } else { Some(event_sender) }, }, ); futures.push(Box::pin(future)); } self.subscriptions.retain(|id, _| alive.contains(id)); futures } /// Broadcasts an event to the subscriptions currently alive. /// /// A subscription's [`Recipe::stream`] always receives a stream of events /// as input. This stream can be used by some subscription to listen to /// shell events. /// /// This method publishes the given event to all the subscription streams /// currently open. /// /// [`Recipe::stream`]: crate::subscription::Recipe::stream pub fn broadcast(&mut self, event: Event, status: event::Status) { self.subscriptions .values_mut() .filter_map(|connection| connection.listener.as_mut()) .for_each(|listener| { if let Err(error) = listener.try_send((event.clone(), status)) { log::warn!( "Error sending event to subscription: {:?}", error ); } }); } }