diff options
author | 2023-03-05 04:15:10 +0100 | |
---|---|---|
committer | 2023-03-05 04:15:10 +0100 | |
commit | f4cf488e0b083b5d7b7612c650917233163ee9cb (patch) | |
tree | 66b7ebdbf6896f472c927d0f0d9fc02bdc5c5f83 /futures | |
parent | 5fed065dc3aa3d2f9ff8d229cbffe003a89ba033 (diff) | |
download | iced-f4cf488e0b083b5d7b7612c650917233163ee9cb.tar.gz iced-f4cf488e0b083b5d7b7612c650917233163ee9cb.tar.bz2 iced-f4cf488e0b083b5d7b7612c650917233163ee9cb.zip |
Remove generic `Hasher` and `Event` from `subscription::Recipe`
Diffstat (limited to 'futures')
-rw-r--r-- | futures/Cargo.toml | 4 | ||||
-rw-r--r-- | futures/src/backend/native/async_std.rs | 14 | ||||
-rw-r--r-- | futures/src/backend/native/smol.rs | 14 | ||||
-rw-r--r-- | futures/src/backend/native/tokio.rs | 14 | ||||
-rw-r--r-- | futures/src/lib.rs | 1 | ||||
-rw-r--r-- | futures/src/runtime.rs | 25 | ||||
-rw-r--r-- | futures/src/subscription.rs | 316 | ||||
-rw-r--r-- | futures/src/subscription/tracker.rs | 51 |
8 files changed, 314 insertions, 125 deletions
diff --git a/futures/Cargo.toml b/futures/Cargo.toml index e4d355ee..411e7c2a 100644 --- a/futures/Cargo.toml +++ b/futures/Cargo.toml @@ -16,6 +16,10 @@ thread-pool = ["futures/thread-pool"] [dependencies] log = "0.4" +[dependencies.iced_core] +version = "0.8" +path = "../core" + [dependencies.futures] version = "0.3" diff --git a/futures/src/backend/native/async_std.rs b/futures/src/backend/native/async_std.rs index b324dbf1..52b0e914 100644 --- a/futures/src/backend/native/async_std.rs +++ b/futures/src/backend/native/async_std.rs @@ -18,28 +18,26 @@ impl crate::Executor for Executor { pub mod time { //! Listen and react to time. + use crate::core::Hasher; use crate::subscription::{self, Subscription}; /// Returns a [`Subscription`] that produces messages at a set interval. /// /// The first message is produced after a `duration`, and then continues to /// produce more messages every `duration` after that. - pub fn every<H: std::hash::Hasher, E>( + pub fn every( duration: std::time::Duration, - ) -> Subscription<H, E, std::time::Instant> { + ) -> Subscription<std::time::Instant> { Subscription::from_recipe(Every(duration)) } #[derive(Debug)] struct Every(std::time::Duration); - impl<H, E> subscription::Recipe<H, E> for Every - where - H: std::hash::Hasher, - { + impl subscription::Recipe for Every { type Output = std::time::Instant; - fn hash(&self, state: &mut H) { + fn hash(&self, state: &mut Hasher) { use std::hash::Hash; std::any::TypeId::of::<Self>().hash(state); @@ -48,7 +46,7 @@ pub mod time { fn stream( self: Box<Self>, - _input: futures::stream::BoxStream<'static, E>, + _input: subscription::EventStream, ) -> futures::stream::BoxStream<'static, Self::Output> { use futures::stream::StreamExt; diff --git a/futures/src/backend/native/smol.rs b/futures/src/backend/native/smol.rs index d5201cde..30bc8291 100644 --- a/futures/src/backend/native/smol.rs +++ b/futures/src/backend/native/smol.rs @@ -19,28 +19,26 @@ impl crate::Executor for Executor { pub mod time { //! Listen and react to time. + use crate::core::Hasher; use crate::subscription::{self, Subscription}; /// Returns a [`Subscription`] that produces messages at a set interval. /// /// The first message is produced after a `duration`, and then continues to /// produce more messages every `duration` after that. - pub fn every<H: std::hash::Hasher, E>( + pub fn every( duration: std::time::Duration, - ) -> Subscription<H, E, std::time::Instant> { + ) -> Subscription<std::time::Instant> { Subscription::from_recipe(Every(duration)) } #[derive(Debug)] struct Every(std::time::Duration); - impl<H, E> subscription::Recipe<H, E> for Every - where - H: std::hash::Hasher, - { + impl subscription::Recipe for Every { type Output = std::time::Instant; - fn hash(&self, state: &mut H) { + fn hash(&self, state: &mut Hasher) { use std::hash::Hash; std::any::TypeId::of::<Self>().hash(state); @@ -49,7 +47,7 @@ pub mod time { fn stream( self: Box<Self>, - _input: futures::stream::BoxStream<'static, E>, + _input: subscription::EventStream, ) -> futures::stream::BoxStream<'static, Self::Output> { use futures::stream::StreamExt; diff --git a/futures/src/backend/native/tokio.rs b/futures/src/backend/native/tokio.rs index dd818bd1..4698a105 100644 --- a/futures/src/backend/native/tokio.rs +++ b/futures/src/backend/native/tokio.rs @@ -22,28 +22,26 @@ impl crate::Executor for Executor { pub mod time { //! Listen and react to time. + use crate::core::Hasher; use crate::subscription::{self, Subscription}; /// Returns a [`Subscription`] that produces messages at a set interval. /// /// The first message is produced after a `duration`, and then continues to /// produce more messages every `duration` after that. - pub fn every<H: std::hash::Hasher, E>( + pub fn every( duration: std::time::Duration, - ) -> Subscription<H, E, std::time::Instant> { + ) -> Subscription<std::time::Instant> { Subscription::from_recipe(Every(duration)) } #[derive(Debug)] struct Every(std::time::Duration); - impl<H, E> subscription::Recipe<H, E> for Every - where - H: std::hash::Hasher, - { + impl subscription::Recipe for Every { type Output = std::time::Instant; - fn hash(&self, state: &mut H) { + fn hash(&self, state: &mut Hasher) { use std::hash::Hash; std::any::TypeId::of::<Self>().hash(state); @@ -52,7 +50,7 @@ pub mod time { fn stream( self: Box<Self>, - _input: futures::stream::BoxStream<'static, E>, + _input: subscription::EventStream, ) -> futures::stream::BoxStream<'static, Self::Output> { use futures::stream::StreamExt; diff --git a/futures/src/lib.rs b/futures/src/lib.rs index c0982db7..39137de2 100644 --- a/futures/src/lib.rs +++ b/futures/src/lib.rs @@ -18,6 +18,7 @@ #![allow(clippy::inherent_to_string, clippy::type_complexity)] #![cfg_attr(docsrs, feature(doc_cfg))] pub use futures; +pub use iced_core as core; mod command; mod maybe_send; diff --git a/futures/src/runtime.rs b/futures/src/runtime.rs index 24f9f241..2241a494 100644 --- a/futures/src/runtime.rs +++ b/futures/src/runtime.rs @@ -1,6 +1,7 @@ //! Run commands and keep track of subscriptions. +use crate::core::event::{self, Event}; use crate::subscription; -use crate::{BoxFuture, Executor, MaybeSend, Subscription}; +use crate::{BoxFuture, Executor, MaybeSend}; use futures::{channel::mpsc, Sink}; use std::marker::PhantomData; @@ -12,18 +13,15 @@ use std::marker::PhantomData; /// /// [`Command`]: crate::Command #[derive(Debug)] -pub struct Runtime<Hasher, Event, Executor, Sender, Message> { +pub struct Runtime<Executor, Sender, Message> { executor: Executor, sender: Sender, - subscriptions: subscription::Tracker<Hasher, Event>, + subscriptions: subscription::Tracker, _message: PhantomData<Message>, } -impl<Hasher, Event, Executor, Sender, Message> - Runtime<Hasher, Event, Executor, Sender, Message> +impl<Executor, Sender, Message> Runtime<Executor, Sender, Message> where - Hasher: std::hash::Hasher + Default, - Event: Send + Clone + 'static, Executor: self::Executor, Sender: Sink<Message, Error = mpsc::SendError> + Unpin @@ -79,7 +77,9 @@ where /// [`Tracker::update`]: subscription::Tracker::update pub fn track( &mut self, - subscription: Subscription<Hasher, Event, Message>, + recipes: impl IntoIterator< + Item = Box<dyn subscription::Recipe<Output = Message>>, + >, ) { let Runtime { executor, @@ -88,8 +88,9 @@ where .. } = self; - let futures = executor - .enter(|| subscriptions.update(subscription, sender.clone())); + let futures = executor.enter(|| { + subscriptions.update(recipes.into_iter(), sender.clone()) + }); for future in futures { executor.spawn(future); @@ -102,7 +103,7 @@ where /// See [`Tracker::broadcast`] to learn more. /// /// [`Tracker::broadcast`]: subscription::Tracker::broadcast - pub fn broadcast(&mut self, event: Event) { - self.subscriptions.broadcast(event); + pub fn broadcast(&mut self, event: Event, status: event::Status) { + self.subscriptions.broadcast(event, status); } } diff --git a/futures/src/subscription.rs b/futures/src/subscription.rs index fe53fd7e..876f29c2 100644 --- a/futures/src/subscription.rs +++ b/futures/src/subscription.rs @@ -3,7 +3,18 @@ mod tracker; pub use tracker::Tracker; -use crate::BoxStream; +use crate::core::event::{self, Event}; +use crate::core::window; +use crate::core::Hasher; +use crate::futures::{Future, Stream}; +use crate::{BoxStream, MaybeSend}; + +use std::hash::Hash; + +/// A stream of runtime events. +/// +/// It is the input of a [`Subscription`]. +pub type EventStream = BoxStream<(Event, event::Status)>; /// A request to listen to external events. /// @@ -16,19 +27,13 @@ use crate::BoxStream; /// For instance, you can use a [`Subscription`] to listen to a WebSocket /// connection, keyboard presses, mouse events, time ticks, etc. /// -/// This type is normally aliased by runtimes with a specific `Event` and/or -/// `Hasher`. -/// /// [`Command`]: crate::Command #[must_use = "`Subscription` must be returned to runtime to take effect"] -pub struct Subscription<Hasher, Event, Output> { - recipes: Vec<Box<dyn Recipe<Hasher, Event, Output = Output>>>, +pub struct Subscription<Message> { + recipes: Vec<Box<dyn Recipe<Output = Message>>>, } -impl<H, E, O> Subscription<H, E, O> -where - H: std::hash::Hasher, -{ +impl<Message> Subscription<Message> { /// Returns an empty [`Subscription`] that will not produce any output. pub fn none() -> Self { Self { @@ -38,7 +43,7 @@ where /// Creates a [`Subscription`] from a [`Recipe`] describing it. pub fn from_recipe( - recipe: impl Recipe<H, E, Output = O> + 'static, + recipe: impl Recipe<Output = Message> + 'static, ) -> Self { Self { recipes: vec![Box::new(recipe)], @@ -48,7 +53,7 @@ where /// Batches all the provided subscriptions and returns the resulting /// [`Subscription`]. pub fn batch( - subscriptions: impl IntoIterator<Item = Subscription<H, E, O>>, + subscriptions: impl IntoIterator<Item = Subscription<Message>>, ) -> Self { Self { recipes: subscriptions @@ -59,18 +64,16 @@ where } /// Returns the different recipes of the [`Subscription`]. - pub fn recipes(self) -> Vec<Box<dyn Recipe<H, E, Output = O>>> { + pub fn into_recipes(self) -> Vec<Box<dyn Recipe<Output = Message>>> { self.recipes } /// Adds a value to the [`Subscription`] context. /// /// The value will be part of the identity of a [`Subscription`]. - pub fn with<T>(mut self, value: T) -> Subscription<H, E, (T, O)> + pub fn with<T>(mut self, value: T) -> Subscription<(T, Message)> where - H: 'static, - E: 'static, - O: 'static, + Message: 'static, T: std::hash::Hash + Clone + Send + Sync + 'static, { Subscription { @@ -79,18 +82,16 @@ where .drain(..) .map(|recipe| { Box::new(With::new(recipe, value.clone())) - as Box<dyn Recipe<H, E, Output = (T, O)>> + as Box<dyn Recipe<Output = (T, Message)>> }) .collect(), } } /// Transforms the [`Subscription`] output with the given function. - pub fn map<A>(mut self, f: fn(O) -> A) -> Subscription<H, E, A> + pub fn map<A>(mut self, f: fn(Message) -> A) -> Subscription<A> where - H: 'static, - E: 'static, - O: 'static, + Message: 'static, A: 'static, { Subscription { @@ -98,15 +99,14 @@ where .recipes .drain(..) .map(|recipe| { - Box::new(Map::new(recipe, f)) - as Box<dyn Recipe<H, E, Output = A>> + Box::new(Map::new(recipe, f)) as Box<dyn Recipe<Output = A>> }) .collect(), } } } -impl<I, O, H> std::fmt::Debug for Subscription<I, O, H> { +impl<Message> std::fmt::Debug for Subscription<Message> { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("Subscription").finish() } @@ -129,7 +129,7 @@ impl<I, O, H> std::fmt::Debug for Subscription<I, O, H> { /// [examples]: https://github.com/iced-rs/iced/tree/0.8/examples /// [`download_progress`]: https://github.com/iced-rs/iced/tree/0.8/examples/download_progress /// [`stopwatch`]: https://github.com/iced-rs/iced/tree/0.8/examples/stopwatch -pub trait Recipe<Hasher: std::hash::Hasher, Event> { +pub trait Recipe { /// The events that will be produced by a [`Subscription`] with this /// [`Recipe`]. type Output; @@ -141,45 +141,33 @@ pub trait Recipe<Hasher: std::hash::Hasher, Event> { /// Executes the [`Recipe`] and produces the stream of events of its /// [`Subscription`]. - /// - /// It receives some stream of generic events, which is normally defined by - /// shells. - fn stream( - self: Box<Self>, - input: BoxStream<Event>, - ) -> BoxStream<Self::Output>; + fn stream(self: Box<Self>, input: EventStream) -> BoxStream<Self::Output>; } -struct Map<Hasher, Event, A, B> { - recipe: Box<dyn Recipe<Hasher, Event, Output = A>>, +struct Map<A, B> { + recipe: Box<dyn Recipe<Output = A>>, mapper: fn(A) -> B, } -impl<H, E, A, B> Map<H, E, A, B> { - fn new( - recipe: Box<dyn Recipe<H, E, Output = A>>, - mapper: fn(A) -> B, - ) -> Self { +impl<A, B> Map<A, B> { + fn new(recipe: Box<dyn Recipe<Output = A>>, mapper: fn(A) -> B) -> Self { Map { recipe, mapper } } } -impl<H, E, A, B> Recipe<H, E> for Map<H, E, A, B> +impl<A, B> Recipe for Map<A, B> where A: 'static, B: 'static, - H: std::hash::Hasher, { type Output = B; - fn hash(&self, state: &mut H) { - use std::hash::Hash; - + fn hash(&self, state: &mut Hasher) { self.recipe.hash(state); self.mapper.hash(state); } - fn stream(self: Box<Self>, input: BoxStream<E>) -> BoxStream<Self::Output> { + fn stream(self: Box<Self>, input: EventStream) -> BoxStream<Self::Output> { use futures::StreamExt; let mapper = self.mapper; @@ -188,34 +176,31 @@ where } } -struct With<Hasher, Event, A, B> { - recipe: Box<dyn Recipe<Hasher, Event, Output = A>>, +struct With<A, B> { + recipe: Box<dyn Recipe<Output = A>>, value: B, } -impl<H, E, A, B> With<H, E, A, B> { - fn new(recipe: Box<dyn Recipe<H, E, Output = A>>, value: B) -> Self { +impl<A, B> With<A, B> { + fn new(recipe: Box<dyn Recipe<Output = A>>, value: B) -> Self { With { recipe, value } } } -impl<H, E, A, B> Recipe<H, E> for With<H, E, A, B> +impl<A, B> Recipe for With<A, B> where A: 'static, B: 'static + std::hash::Hash + Clone + Send + Sync, - H: std::hash::Hasher, { type Output = (B, A); - fn hash(&self, state: &mut H) { - use std::hash::Hash; - + fn hash(&self, state: &mut Hasher) { std::any::TypeId::of::<B>().hash(state); self.value.hash(state); self.recipe.hash(state); } - fn stream(self: Box<Self>, input: BoxStream<E>) -> BoxStream<Self::Output> { + fn stream(self: Box<Self>, input: EventStream) -> BoxStream<Self::Output> { use futures::StreamExt; let value = self.value; @@ -227,3 +212,222 @@ where ) } } + +/// Returns a [`Subscription`] to all the ignored runtime events. +/// +/// This subscription will notify your application of any [`Event`] that was +/// not captured by any widget. +pub fn events() -> Subscription<Event> { + events_with(|event, status| match status { + event::Status::Ignored => Some(event), + event::Status::Captured => None, + }) +} + +/// Returns a [`Subscription`] that filters all the runtime events with the +/// provided function, producing messages accordingly. +/// +/// This subscription will call the provided function for every [`Event`] +/// handled by the runtime. If the function: +/// +/// - Returns `None`, the [`Event`] will be discarded. +/// - Returns `Some` message, the `Message` will be produced. +pub fn events_with<Message>( + f: fn(Event, event::Status) -> Option<Message>, +) -> Subscription<Message> +where + Message: 'static + MaybeSend, +{ + #[derive(Hash)] + struct EventsWith; + + Subscription::from_recipe(Runner { + id: (EventsWith, f), + spawn: move |events| { + use futures::future; + use futures::stream::StreamExt; + + events.filter_map(move |(event, status)| { + future::ready(match event { + Event::Window(window::Event::RedrawRequested(_)) => None, + _ => f(event, status), + }) + }) + }, + }) +} + +/// Returns a [`Subscription`] that produces a message for every runtime event, +/// including the redraw request events. +/// +/// **Warning:** This [`Subscription`], if unfiltered, may produce messages in +/// an infinite loop. +pub fn raw_events<Message>( + f: fn(Event, event::Status) -> Option<Message>, +) -> Subscription<Message> +where + Message: 'static + MaybeSend, +{ + #[derive(Hash)] + struct RawEvents; + + Subscription::from_recipe(Runner { + id: (RawEvents, f), + spawn: move |events| { + use futures::future; + use futures::stream::StreamExt; + + events.filter_map(move |(event, status)| { + future::ready(f(event, status)) + }) + }, + }) +} + +/// Returns a [`Subscription`] that will call the given function to create and +/// asynchronously run the given [`Stream`]. +pub fn run<S, Message>(builder: fn() -> S) -> Subscription<Message> +where + S: Stream<Item = Message> + MaybeSend + 'static, + Message: 'static, +{ + Subscription::from_recipe(Runner { + id: builder, + spawn: move |_| builder(), + }) +} + +/// Returns a [`Subscription`] that will create and asynchronously run the +/// given [`Stream`]. +/// +/// The `id` will be used to uniquely identify the [`Subscription`]. +pub fn run_with_id<I, S, Message>(id: I, stream: S) -> Subscription<Message> +where + I: Hash + 'static, + S: Stream<Item = Message> + MaybeSend + 'static, + Message: 'static, +{ + Subscription::from_recipe(Runner { + id, + spawn: move |_| stream, + }) +} + +/// Returns a [`Subscription`] that will create and asynchronously run a +/// [`Stream`] that will call the provided closure to produce every `Message`. +/// +/// The `id` will be used to uniquely identify the [`Subscription`]. +/// +/// # Creating an asynchronous worker with bidirectional communication +/// You can leverage this helper to create a [`Subscription`] that spawns +/// an asynchronous worker in the background and establish a channel of +/// communication with an `iced` application. +/// +/// You can achieve this by creating an `mpsc` channel inside the closure +/// and returning the `Sender` as a `Message` for the `Application`: +/// +/// ``` +/// use iced_futures::subscription::{self, Subscription}; +/// use iced_futures::futures; +/// +/// use futures::channel::mpsc; +/// +/// pub enum Event { +/// Ready(mpsc::Sender<Input>), +/// WorkFinished, +/// // ... +/// } +/// +/// enum Input { +/// DoSomeWork, +/// // ... +/// } +/// +/// enum State { +/// Starting, +/// Ready(mpsc::Receiver<Input>), +/// } +/// +/// fn some_worker() -> Subscription<Event> { +/// struct SomeWorker; +/// +/// subscription::unfold(std::any::TypeId::of::<SomeWorker>(), State::Starting, |state| async move { +/// match state { +/// State::Starting => { +/// // Create channel +/// let (sender, receiver) = mpsc::channel(100); +/// +/// (Some(Event::Ready(sender)), State::Ready(receiver)) +/// } +/// State::Ready(mut receiver) => { +/// use futures::StreamExt; +/// +/// // Read next input sent from `Application` +/// let input = receiver.select_next_some().await; +/// +/// match input { +/// Input::DoSomeWork => { +/// // Do some async work... +/// +/// // Finally, we can optionally return a message to tell the +/// // `Application` the work is done +/// (Some(Event::WorkFinished), State::Ready(receiver)) +/// } +/// } +/// } +/// } +/// }) +/// } +/// ``` +/// +/// Check out the [`websocket`] example, which showcases this pattern to maintain a WebSocket +/// connection open. +/// +/// [`websocket`]: https://github.com/iced-rs/iced/tree/0.8/examples/websocket +pub fn unfold<I, T, Fut, Message>( + id: I, + initial: T, + mut f: impl FnMut(T) -> Fut + MaybeSend + Sync + 'static, +) -> Subscription<Message> +where + I: Hash + 'static, + T: MaybeSend + 'static, + Fut: Future<Output = (Option<Message>, T)> + MaybeSend + 'static, + Message: 'static + MaybeSend, +{ + use futures::future::{self, FutureExt}; + use futures::stream::StreamExt; + + run_with_id( + id, + futures::stream::unfold(initial, move |state| f(state).map(Some)) + .filter_map(future::ready), + ) +} + +struct Runner<I, F, S, Message> +where + F: FnOnce(EventStream) -> S, + S: Stream<Item = Message>, +{ + id: I, + spawn: F, +} + +impl<I, S, F, Message> Recipe for Runner<I, F, S, Message> +where + I: Hash + 'static, + F: FnOnce(EventStream) -> S, + S: Stream<Item = Message> + MaybeSend + 'static, +{ + type Output = Message; + + fn hash(&self, state: &mut Hasher) { + std::any::TypeId::of::<I>().hash(state); + self.id.hash(state); + } + + fn stream(self: Box<Self>, input: EventStream) -> BoxStream<Self::Output> { + crate::boxed_stream((self.spawn)(input)) + } +} diff --git a/futures/src/subscription/tracker.rs b/futures/src/subscription/tracker.rs index 9fe110b0..ae71cd25 100644 --- a/futures/src/subscription/tracker.rs +++ b/futures/src/subscription/tracker.rs @@ -1,38 +1,35 @@ -use crate::{BoxFuture, MaybeSend, Subscription}; +use crate::core::event::{self, Event}; +use crate::core::Hasher; +use crate::subscription::Recipe; +use crate::{BoxFuture, MaybeSend}; -use futures::{ - channel::mpsc, - sink::{Sink, SinkExt}, -}; -use std::{collections::HashMap, marker::PhantomData}; +use futures::channel::mpsc; +use futures::sink::{Sink, SinkExt}; + +use std::collections::HashMap; +use std::hash::Hasher as _; /// A registry of subscription streams. /// /// If you have an application that continuously returns a [`Subscription`], /// you can use a [`Tracker`] to keep track of the different recipes and keep /// its executions alive. -#[derive(Debug)] -pub struct Tracker<Hasher, Event> { - subscriptions: HashMap<u64, Execution<Event>>, - _hasher: PhantomData<Hasher>, +#[derive(Debug, Default)] +pub struct Tracker { + subscriptions: HashMap<u64, Execution>, } #[derive(Debug)] -pub struct Execution<Event> { +pub struct Execution { _cancel: futures::channel::oneshot::Sender<()>, - listener: Option<futures::channel::mpsc::Sender<Event>>, + listener: Option<futures::channel::mpsc::Sender<(Event, event::Status)>>, } -impl<Hasher, Event> Tracker<Hasher, Event> -where - Hasher: std::hash::Hasher + Default, - Event: 'static + Send + Clone, -{ +impl Tracker { /// Creates a new empty [`Tracker`]. pub fn new() -> Self { Self { subscriptions: HashMap::new(), - _hasher: PhantomData, } } @@ -56,7 +53,7 @@ where /// [`Recipe`]: crate::subscription::Recipe pub fn update<Message, Receiver>( &mut self, - subscription: Subscription<Hasher, Event, Message>, + recipes: impl Iterator<Item = Box<dyn Recipe<Output = Message>>>, receiver: Receiver, ) -> Vec<BoxFuture<()>> where @@ -70,8 +67,6 @@ where use futures::stream::StreamExt; let mut futures: Vec<BoxFuture<()>> = Vec::new(); - - let recipes = subscription.recipes(); let mut alive = std::collections::HashSet::new(); for recipe in recipes { @@ -142,12 +137,12 @@ where /// currently open. /// /// [`Recipe::stream`]: crate::subscription::Recipe::stream - pub fn broadcast(&mut self, event: Event) { + pub fn broadcast(&mut self, event: Event, status: event::Status) { self.subscriptions .values_mut() .filter_map(|connection| connection.listener.as_mut()) .for_each(|listener| { - if let Err(error) = listener.try_send(event.clone()) { + if let Err(error) = listener.try_send((event.clone(), status)) { log::warn!( "Error sending event to subscription: {:?}", error @@ -156,13 +151,3 @@ where }); } } - -impl<Hasher, Event> Default for Tracker<Hasher, Event> -where - Hasher: std::hash::Hasher + Default, - Event: 'static + Send + Clone, -{ - fn default() -> Self { - Self::new() - } -} |