diff options
Diffstat (limited to 'futures/src/subscription/tracker.rs')
-rw-r--r-- | futures/src/subscription/tracker.rs | 51 |
1 files changed, 18 insertions, 33 deletions
diff --git a/futures/src/subscription/tracker.rs b/futures/src/subscription/tracker.rs index 9fe110b0..ae71cd25 100644 --- a/futures/src/subscription/tracker.rs +++ b/futures/src/subscription/tracker.rs @@ -1,38 +1,35 @@ -use crate::{BoxFuture, MaybeSend, Subscription}; +use crate::core::event::{self, Event}; +use crate::core::Hasher; +use crate::subscription::Recipe; +use crate::{BoxFuture, MaybeSend}; -use futures::{ - channel::mpsc, - sink::{Sink, SinkExt}, -}; -use std::{collections::HashMap, marker::PhantomData}; +use futures::channel::mpsc; +use futures::sink::{Sink, SinkExt}; + +use std::collections::HashMap; +use std::hash::Hasher as _; /// A registry of subscription streams. /// /// If you have an application that continuously returns a [`Subscription`], /// you can use a [`Tracker`] to keep track of the different recipes and keep /// its executions alive. -#[derive(Debug)] -pub struct Tracker<Hasher, Event> { - subscriptions: HashMap<u64, Execution<Event>>, - _hasher: PhantomData<Hasher>, +#[derive(Debug, Default)] +pub struct Tracker { + subscriptions: HashMap<u64, Execution>, } #[derive(Debug)] -pub struct Execution<Event> { +pub struct Execution { _cancel: futures::channel::oneshot::Sender<()>, - listener: Option<futures::channel::mpsc::Sender<Event>>, + listener: Option<futures::channel::mpsc::Sender<(Event, event::Status)>>, } -impl<Hasher, Event> Tracker<Hasher, Event> -where - Hasher: std::hash::Hasher + Default, - Event: 'static + Send + Clone, -{ +impl Tracker { /// Creates a new empty [`Tracker`]. pub fn new() -> Self { Self { subscriptions: HashMap::new(), - _hasher: PhantomData, } } @@ -56,7 +53,7 @@ where /// [`Recipe`]: crate::subscription::Recipe pub fn update<Message, Receiver>( &mut self, - subscription: Subscription<Hasher, Event, Message>, + recipes: impl Iterator<Item = Box<dyn Recipe<Output = Message>>>, receiver: Receiver, ) -> Vec<BoxFuture<()>> where @@ -70,8 +67,6 @@ where use futures::stream::StreamExt; let mut futures: Vec<BoxFuture<()>> = Vec::new(); - - let recipes = subscription.recipes(); let mut alive = std::collections::HashSet::new(); for recipe in recipes { @@ -142,12 +137,12 @@ where /// currently open. /// /// [`Recipe::stream`]: crate::subscription::Recipe::stream - pub fn broadcast(&mut self, event: Event) { + pub fn broadcast(&mut self, event: Event, status: event::Status) { self.subscriptions .values_mut() .filter_map(|connection| connection.listener.as_mut()) .for_each(|listener| { - if let Err(error) = listener.try_send(event.clone()) { + if let Err(error) = listener.try_send((event.clone(), status)) { log::warn!( "Error sending event to subscription: {:?}", error @@ -156,13 +151,3 @@ where }); } } - -impl<Hasher, Event> Default for Tracker<Hasher, Event> -where - Hasher: std::hash::Hasher + Default, - Event: 'static + Send + Clone, -{ - fn default() -> Self { - Self::new() - } -} |