use crate::{BoxFuture, Subscription}; use futures::{channel::mpsc, sink::Sink}; use std::{collections::HashMap, marker::PhantomData}; /// A registry of subscription streams. /// /// If you have an application that continuously returns a [`Subscription`], /// you can use a [`Tracker`] to keep track of the different recipes and keep /// its executions alive. #[derive(Debug)] pub struct Tracker { subscriptions: HashMap>, _hasher: PhantomData, } #[derive(Debug)] pub struct Execution { _cancel: futures::channel::oneshot::Sender<()>, listener: Option>, } impl Tracker where Hasher: std::hash::Hasher + Default, Event: 'static + Send + Clone, { /// Creates a new empty [`Tracker`]. /// /// [`Tracker`]: struct.Tracker.html pub fn new() -> Self { Self { subscriptions: HashMap::new(), _hasher: PhantomData, } } /// Updates the [`Tracker`] with the given [`Subscription`]. /// /// A [`Subscription`] can cause new streams to be spawned or old streams /// to be closed. /// /// The [`Tracker`] keeps track of these streams between calls to this /// method: /// /// - If the provided [`Subscription`] contains a new [`Recipe`] that is /// currently not being run, it will spawn a new stream and keep it alive. /// - On the other hand, if a [`Recipe`] is currently in execution and the /// provided [`Subscription`] does not contain it anymore, then the /// [`Tracker`] will close and drop the relevant stream. /// /// It returns a list of futures that need to be spawned to materialize /// the [`Tracker`] changes. /// /// [`Tracker`]: struct.Tracker.html /// [`Subscription`]: struct.Subscription.html /// [`Recipe`]: trait.Recipe.html pub fn update( &mut self, subscription: Subscription, receiver: Receiver, ) -> Vec> where Message: 'static + Send, Receiver: 'static + Sink + Unpin + Send + Clone, { use futures::{future::FutureExt, stream::StreamExt}; let mut futures: Vec> = Vec::new(); let recipes = subscription.recipes(); let mut alive = std::collections::HashSet::new(); for recipe in recipes { let id = { let mut hasher = Hasher::default(); recipe.hash(&mut hasher); hasher.finish() }; let _ = alive.insert(id); if self.subscriptions.contains_key(&id) { continue; } let (cancel, cancelled) = futures::channel::oneshot::channel(); // TODO: Use bus if/when it supports async let (event_sender, event_receiver) = futures::channel::mpsc::channel(100); let stream = recipe.stream(event_receiver.boxed()); let future = futures::future::select( cancelled, stream.map(Ok).forward(receiver.clone()), ) .map(|_| ()); let _ = self.subscriptions.insert( id, Execution { _cancel: cancel, listener: if event_sender.is_closed() { None } else { Some(event_sender) }, }, ); futures.push(Box::pin(future)); } self.subscriptions.retain(|id, _| alive.contains(&id)); futures } /// Broadcasts an event to the subscriptions currently alive. /// /// A subscription's [`Recipe::stream`] always receives a stream of events /// as input. This stream can be used by some subscription to listen to /// shell events. /// /// This method publishes the given event to all the subscription streams /// currently open. /// /// [`Recipe::stream`]: trait.Recipe.html#tymethod.stream pub fn broadcast(&mut self, event: Event) { self.subscriptions .values_mut() .filter_map(|connection| connection.listener.as_mut()) .for_each(|listener| { if let Err(error) = listener.try_send(event.clone()) { log::error!( "Error sending event to subscription: {:?}", error ); } }); } }