From 32f7ca261f0655938ae7c8919599b020ddea8ff8 Mon Sep 17 00:00:00 2001 From: Héctor Ramón Jiménez Date: Sun, 19 Jan 2020 08:36:44 +0100 Subject: Implement `subscription::Tracker` in `iced_core` --- core/Cargo.toml | 3 +- core/src/lib.rs | 2 +- core/src/subscription.rs | 11 +++- core/src/subscription/tracker.rs | 112 ++++++++++++++++++++++++++++++++++++++ examples/stopwatch.rs | 2 +- native/src/subscription.rs | 7 ++- native/src/subscription/events.rs | 2 +- 7 files changed, 131 insertions(+), 8 deletions(-) create mode 100644 core/src/subscription/tracker.rs diff --git a/core/Cargo.toml b/core/Cargo.toml index 0a8fd8ef..4e019ba9 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -11,7 +11,8 @@ repository = "https://github.com/hecrj/iced" # Exposes a future-based `Command` type command = ["futures"] # Exposes a future-based `Subscription` type -subscription = ["futures"] +subscription = ["futures", "log"] [dependencies] futures = { version = "0.3", optional = true } +log = { version = "0.4", optional = true } diff --git a/core/src/lib.rs b/core/src/lib.rs index 821b09c1..6f13c310 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -9,7 +9,7 @@ //! [Iced]: https://github.com/hecrj/iced //! [`iced_native`]: https://github.com/hecrj/iced/tree/master/native //! [`iced_web`]: https://github.com/hecrj/iced/tree/master/web -#![deny(missing_docs)] +//#![deny(missing_docs)] #![deny(missing_debug_implementations)] #![deny(unused_results)] #![deny(unsafe_code)] diff --git a/core/src/subscription.rs b/core/src/subscription.rs index d9e7e388..87e51e48 100644 --- a/core/src/subscription.rs +++ b/core/src/subscription.rs @@ -1,4 +1,9 @@ //! Listen to external events in your application. +mod tracker; + +pub use tracker::Tracker; + +use futures::stream::BoxStream; /// A request to listen to external events. /// @@ -134,8 +139,8 @@ pub trait Recipe { /// [`Recipe`]: trait.Recipe.html fn stream( self: Box, - input: Input, - ) -> futures::stream::BoxStream<'static, Self::Output>; + input: BoxStream<'static, Input>, + ) -> BoxStream<'static, Self::Output>; } struct Map { @@ -169,7 +174,7 @@ where fn stream( self: Box, - input: I, + input: BoxStream<'static, I>, ) -> futures::stream::BoxStream<'static, Self::Output> { use futures::StreamExt; diff --git a/core/src/subscription/tracker.rs b/core/src/subscription/tracker.rs new file mode 100644 index 00000000..826f60c0 --- /dev/null +++ b/core/src/subscription/tracker.rs @@ -0,0 +1,112 @@ +use crate::Subscription; + +use futures::{future::BoxFuture, sink::Sink}; +use std::collections::HashMap; +use std::marker::PhantomData; + +#[derive(Debug)] +pub struct Tracker { + subscriptions: HashMap>, + _hasher: PhantomData, +} + +#[derive(Debug)] +pub struct Execution { + _cancel: futures::channel::oneshot::Sender<()>, + listener: Option>, +} + +impl Tracker +where + Hasher: std::hash::Hasher + Default, + Event: 'static + Send + Clone, +{ + pub fn new() -> Self { + Self { + subscriptions: HashMap::new(), + _hasher: PhantomData, + } + } + + pub fn update( + &mut self, + subscription: Subscription, + sink: S, + ) -> Vec> + where + Message: 'static + Send, + S: 'static + + Sink + + Unpin + + Send + + Clone, + { + use futures::{future::FutureExt, stream::StreamExt}; + + let mut futures = Vec::new(); + + let recipes = subscription.recipes(); + let mut alive = std::collections::HashSet::new(); + + for recipe in recipes { + let id = { + let mut hasher = Hasher::default(); + recipe.hash(&mut hasher); + + hasher.finish() + }; + + let _ = alive.insert(id); + + if self.subscriptions.contains_key(&id) { + continue; + } + + let (cancel, cancelled) = futures::channel::oneshot::channel(); + + // TODO: Use bus if/when it supports async + let (event_sender, event_receiver) = + futures::channel::mpsc::channel(100); + + let stream = recipe.stream(event_receiver.boxed()); + + let future = futures::future::select( + cancelled, + stream.map(Ok).forward(sink.clone()), + ) + .map(|_| ()); + + let _ = self.subscriptions.insert( + id, + Execution { + _cancel: cancel, + listener: if event_sender.is_closed() { + None + } else { + Some(event_sender) + }, + }, + ); + + futures.push(future.boxed()); + } + + self.subscriptions.retain(|id, _| alive.contains(&id)); + + futures + } + + pub fn broadcast(&mut self, event: Event) { + self.subscriptions + .values_mut() + .filter_map(|connection| connection.listener.as_mut()) + .for_each(|listener| { + if let Err(error) = listener.try_send(event.clone()) { + log::error!( + "Error sending event to subscription: {:?}", + error + ); + } + }); + } +} diff --git a/examples/stopwatch.rs b/examples/stopwatch.rs index c9a61ee9..2bc85c4d 100644 --- a/examples/stopwatch.rs +++ b/examples/stopwatch.rs @@ -165,7 +165,7 @@ mod time { fn stream( self: Box, - _input: I, + _input: futures::stream::BoxStream<'static, I>, ) -> futures::stream::BoxStream<'static, Self::Output> { use futures::stream::StreamExt; diff --git a/native/src/subscription.rs b/native/src/subscription.rs index db88867a..cd0822c1 100644 --- a/native/src/subscription.rs +++ b/native/src/subscription.rs @@ -15,7 +15,7 @@ use futures::stream::BoxStream; /// /// [`Command`]: ../struct.Command.html /// [`Subscription`]: struct.Subscription.html -pub type Subscription = iced_core::Subscription; +pub type Subscription = iced_core::Subscription; /// A stream of runtime events. /// @@ -24,6 +24,11 @@ pub type Subscription = iced_core::Subscription; /// [`Subscription`]: type.Subscription.html pub type EventStream = BoxStream<'static, Event>; +/// A native [`Subscription`] tracker. +/// +/// [`Subscription`]: type.Subscription.html +pub type Tracker = iced_core::subscription::Tracker; + pub use iced_core::subscription::Recipe; mod events; diff --git a/native/src/subscription/events.rs b/native/src/subscription/events.rs index b7301828..6ff2c0fb 100644 --- a/native/src/subscription/events.rs +++ b/native/src/subscription/events.rs @@ -5,7 +5,7 @@ use crate::{ pub struct Events; -impl Recipe for Events { +impl Recipe for Events { type Output = Event; fn hash(&self, state: &mut Hasher) { -- cgit