diff options
Diffstat (limited to 'futures/src/subscription')
| -rw-r--r-- | futures/src/subscription/tracker.rs | 51 | 
1 files changed, 18 insertions, 33 deletions
| diff --git a/futures/src/subscription/tracker.rs b/futures/src/subscription/tracker.rs index 9fe110b0..ae71cd25 100644 --- a/futures/src/subscription/tracker.rs +++ b/futures/src/subscription/tracker.rs @@ -1,38 +1,35 @@ -use crate::{BoxFuture, MaybeSend, Subscription}; +use crate::core::event::{self, Event}; +use crate::core::Hasher; +use crate::subscription::Recipe; +use crate::{BoxFuture, MaybeSend}; -use futures::{ -    channel::mpsc, -    sink::{Sink, SinkExt}, -}; -use std::{collections::HashMap, marker::PhantomData}; +use futures::channel::mpsc; +use futures::sink::{Sink, SinkExt}; + +use std::collections::HashMap; +use std::hash::Hasher as _;  /// A registry of subscription streams.  ///  /// If you have an application that continuously returns a [`Subscription`],  /// you can use a [`Tracker`] to keep track of the different recipes and keep  /// its executions alive. -#[derive(Debug)] -pub struct Tracker<Hasher, Event> { -    subscriptions: HashMap<u64, Execution<Event>>, -    _hasher: PhantomData<Hasher>, +#[derive(Debug, Default)] +pub struct Tracker { +    subscriptions: HashMap<u64, Execution>,  }  #[derive(Debug)] -pub struct Execution<Event> { +pub struct Execution {      _cancel: futures::channel::oneshot::Sender<()>, -    listener: Option<futures::channel::mpsc::Sender<Event>>, +    listener: Option<futures::channel::mpsc::Sender<(Event, event::Status)>>,  } -impl<Hasher, Event> Tracker<Hasher, Event> -where -    Hasher: std::hash::Hasher + Default, -    Event: 'static + Send + Clone, -{ +impl Tracker {      /// Creates a new empty [`Tracker`].      pub fn new() -> Self {          Self {              subscriptions: HashMap::new(), -            _hasher: PhantomData,          }      } @@ -56,7 +53,7 @@ where      /// [`Recipe`]: crate::subscription::Recipe      pub fn update<Message, Receiver>(          &mut self, -        subscription: Subscription<Hasher, Event, Message>, +        recipes: impl Iterator<Item = Box<dyn Recipe<Output = Message>>>,          receiver: Receiver,      ) -> Vec<BoxFuture<()>>      where @@ -70,8 +67,6 @@ where          use futures::stream::StreamExt;          let mut futures: Vec<BoxFuture<()>> = Vec::new(); - -        let recipes = subscription.recipes();          let mut alive = std::collections::HashSet::new();          for recipe in recipes { @@ -142,12 +137,12 @@ where      /// currently open.      ///      /// [`Recipe::stream`]: crate::subscription::Recipe::stream -    pub fn broadcast(&mut self, event: Event) { +    pub fn broadcast(&mut self, event: Event, status: event::Status) {          self.subscriptions              .values_mut()              .filter_map(|connection| connection.listener.as_mut())              .for_each(|listener| { -                if let Err(error) = listener.try_send(event.clone()) { +                if let Err(error) = listener.try_send((event.clone(), status)) {                      log::warn!(                          "Error sending event to subscription: {:?}",                          error @@ -156,13 +151,3 @@ where              });      }  } - -impl<Hasher, Event> Default for Tracker<Hasher, Event> -where -    Hasher: std::hash::Hasher + Default, -    Event: 'static + Send + Clone, -{ -    fn default() -> Self { -        Self::new() -    } -} | 
