use crate::Subscription; use futures::{future::BoxFuture, sink::Sink}; use std::collections::HashMap; use std::marker::PhantomData; #[derive(Debug)] pub struct Tracker { subscriptions: HashMap>, _hasher: PhantomData, } #[derive(Debug)] pub struct Execution { _cancel: futures::channel::oneshot::Sender<()>, listener: Option>, } impl Tracker where Hasher: std::hash::Hasher + Default, Event: 'static + Send + Clone, { pub fn new() -> Self { Self { subscriptions: HashMap::new(), _hasher: PhantomData, } } pub fn update( &mut self, subscription: Subscription, receiver: Receiver, ) -> Vec> where Message: 'static + Send, Receiver: 'static + Sink + Unpin + Send + Clone, { use futures::{future::FutureExt, stream::StreamExt}; let mut futures = Vec::new(); let recipes = subscription.recipes(); let mut alive = std::collections::HashSet::new(); for recipe in recipes { let id = { let mut hasher = Hasher::default(); recipe.hash(&mut hasher); hasher.finish() }; let _ = alive.insert(id); if self.subscriptions.contains_key(&id) { continue; } let (cancel, cancelled) = futures::channel::oneshot::channel(); // TODO: Use bus if/when it supports async let (event_sender, event_receiver) = futures::channel::mpsc::channel(100); let stream = recipe.stream(event_receiver.boxed()); let future = futures::future::select( cancelled, stream.map(Ok).forward(receiver.clone()), ) .map(|_| ()); let _ = self.subscriptions.insert( id, Execution { _cancel: cancel, listener: if event_sender.is_closed() { None } else { Some(event_sender) }, }, ); futures.push(future.boxed()); } self.subscriptions.retain(|id, _| alive.contains(&id)); futures } pub fn broadcast(&mut self, event: Event) { self.subscriptions .values_mut() .filter_map(|connection| connection.listener.as_mut()) .for_each(|listener| { if let Err(error) = listener.try_send(event.clone()) { log::error!( "Error sending event to subscription: {:?}", error ); } }); } }