summaryrefslogblamecommitdiffstats
path: root/futures/src/subscription/tracker.rs
blob: a942b6199dd587a6081ec9429523c3c9b7247d10 (plain) (tree)





























                                                            
                                     

                                                           
                           


                                    
                         



































                                                                           
                                                         




































                                                                      
use crate::Subscription;

use futures::{future::BoxFuture, sink::Sink};
use std::collections::HashMap;
use std::marker::PhantomData;

#[derive(Debug)]
pub struct Tracker<Hasher, Event> {
    subscriptions: HashMap<u64, Execution<Event>>,
    _hasher: PhantomData<Hasher>,
}

#[derive(Debug)]
pub struct Execution<Event> {
    _cancel: futures::channel::oneshot::Sender<()>,
    listener: Option<futures::channel::mpsc::Sender<Event>>,
}

impl<Hasher, Event> Tracker<Hasher, Event>
where
    Hasher: std::hash::Hasher + Default,
    Event: 'static + Send + Clone,
{
    pub fn new() -> Self {
        Self {
            subscriptions: HashMap::new(),
            _hasher: PhantomData,
        }
    }

    pub fn update<Message, Receiver>(
        &mut self,
        subscription: Subscription<Hasher, Event, Message>,
        receiver: Receiver,
    ) -> Vec<BoxFuture<'static, ()>>
    where
        Message: 'static + Send,
        Receiver: 'static
            + Sink<Message, Error = core::convert::Infallible>
            + Unpin
            + Send
            + Clone,
    {
        use futures::{future::FutureExt, stream::StreamExt};

        let mut futures = Vec::new();

        let recipes = subscription.recipes();
        let mut alive = std::collections::HashSet::new();

        for recipe in recipes {
            let id = {
                let mut hasher = Hasher::default();
                recipe.hash(&mut hasher);

                hasher.finish()
            };

            let _ = alive.insert(id);

            if self.subscriptions.contains_key(&id) {
                continue;
            }

            let (cancel, cancelled) = futures::channel::oneshot::channel();

            // TODO: Use bus if/when it supports async
            let (event_sender, event_receiver) =
                futures::channel::mpsc::channel(100);

            let stream = recipe.stream(event_receiver.boxed());

            let future = futures::future::select(
                cancelled,
                stream.map(Ok).forward(receiver.clone()),
            )
            .map(|_| ());

            let _ = self.subscriptions.insert(
                id,
                Execution {
                    _cancel: cancel,
                    listener: if event_sender.is_closed() {
                        None
                    } else {
                        Some(event_sender)
                    },
                },
            );

            futures.push(future.boxed());
        }

        self.subscriptions.retain(|id, _| alive.contains(&id));

        futures
    }

    pub fn broadcast(&mut self, event: Event) {
        self.subscriptions
            .values_mut()
            .filter_map(|connection| connection.listener.as_mut())
            .for_each(|listener| {
                if let Err(error) = listener.try_send(event.clone()) {
                    log::error!(
                        "Error sending event to subscription: {:?}",
                        error
                    );
                }
            });
    }
}