summaryrefslogblamecommitdiffstats
path: root/futures/src/subscription/tracker.rs
blob: c2a0d0f1533fca158bad7f323228530c6fede1b8 (plain) (tree)
1
2
3
4
5
6
7
8
9
10
                                     
 
                                         
                                                     
 




                                                                             
















                                                            


                                        






                                          



















                                                                              
                                     

                                                           
                           
                           

                                
                         
                                                    





                                                            
                                                         



























                                                                           
                                                         














                                                           
                                           






                                                               







                                                                              
       
                                                             













                                                                      
use crate::{BoxFuture, Subscription};

use futures::{channel::mpsc, sink::Sink};
use std::{collections::HashMap, marker::PhantomData};

/// A registry of subscription streams.
///
/// If you have an application that continuously returns a [`Subscription`],
/// you can use a [`Tracker`] to keep track of the different recipes and keep
/// its executions alive.
#[derive(Debug)]
pub struct Tracker<Hasher, Event> {
    subscriptions: HashMap<u64, Execution<Event>>,
    _hasher: PhantomData<Hasher>,
}

#[derive(Debug)]
pub struct Execution<Event> {
    _cancel: futures::channel::oneshot::Sender<()>,
    listener: Option<futures::channel::mpsc::Sender<Event>>,
}

impl<Hasher, Event> Tracker<Hasher, Event>
where
    Hasher: std::hash::Hasher + Default,
    Event: 'static + Send + Clone,
{
    /// Creates a new empty [`Tracker`].
    ///
    /// [`Tracker`]: struct.Tracker.html
    pub fn new() -> Self {
        Self {
            subscriptions: HashMap::new(),
            _hasher: PhantomData,
        }
    }

    /// Updates the [`Tracker`] with the given [`Subscription`].
    ///
    /// A [`Subscription`] can cause new streams to be spawned or old streams
    /// to be closed.
    ///
    /// The [`Tracker`] keeps track of these streams between calls to this
    /// method:
    ///
    /// - If the provided [`Subscription`] contains a new [`Recipe`] that is
    /// currently not being run, it will spawn a new stream and keep it alive.
    /// - On the other hand, if a [`Recipe`] is currently in execution and the
    /// provided [`Subscription`] does not contain it anymore, then the
    /// [`Tracker`] will close and drop the relevant stream.
    ///
    /// It returns a list of futures that need to be spawned to materialize
    /// the [`Tracker`] changes.
    ///
    /// [`Tracker`]: struct.Tracker.html
    /// [`Subscription`]: struct.Subscription.html
    /// [`Recipe`]: trait.Recipe.html
    pub fn update<Message, Receiver>(
        &mut self,
        subscription: Subscription<Hasher, Event, Message>,
        receiver: Receiver,
    ) -> Vec<BoxFuture<()>>
    where
        Message: 'static + Send,
        Receiver: 'static
            + Sink<Message, Error = mpsc::SendError>
            + Unpin
            + Send
            + Clone,
    {
        use futures::{future::FutureExt, stream::StreamExt};

        let mut futures: Vec<BoxFuture<()>> = Vec::new();

        let recipes = subscription.recipes();
        let mut alive = std::collections::HashSet::new();

        for recipe in recipes {
            let id = {
                let mut hasher = Hasher::default();
                recipe.hash(&mut hasher);

                hasher.finish()
            };

            let _ = alive.insert(id);

            if self.subscriptions.contains_key(&id) {
                continue;
            }

            let (cancel, cancelled) = futures::channel::oneshot::channel();

            // TODO: Use bus if/when it supports async
            let (event_sender, event_receiver) =
                futures::channel::mpsc::channel(100);

            let stream = recipe.stream(event_receiver.boxed());

            let future = futures::future::select(
                cancelled,
                stream.map(Ok).forward(receiver.clone()),
            )
            .map(|_| ());

            let _ = self.subscriptions.insert(
                id,
                Execution {
                    _cancel: cancel,
                    listener: if event_sender.is_closed() {
                        None
                    } else {
                        Some(event_sender)
                    },
                },
            );

            futures.push(Box::pin(future));
        }

        self.subscriptions.retain(|id, _| alive.contains(&id));

        futures
    }

    /// Broadcasts an event to the subscriptions currently alive.
    ///
    /// A subscription's [`Recipe::stream`] always receives a stream of events
    /// as input. This stream can be used by some subscription to listen to
    /// shell events.
    ///
    /// This method publishes the given event to all the subscription streams
    /// currently open.
    ///
    /// [`Recipe::stream`]: trait.Recipe.html#tymethod.stream
    pub fn broadcast(&mut self, event: Event) {
        self.subscriptions
            .values_mut()
            .filter_map(|connection| connection.listener.as_mut())
            .for_each(|listener| {
                if let Err(error) = listener.try_send(event.clone()) {
                    log::error!(
                        "Error sending event to subscription: {:?}",
                        error
                    );
                }
            });
    }
}