From 32f7ca261f0655938ae7c8919599b020ddea8ff8 Mon Sep 17 00:00:00 2001 From: Héctor Ramón Jiménez Date: Sun, 19 Jan 2020 08:36:44 +0100 Subject: Implement `subscription::Tracker` in `iced_core` --- core/src/lib.rs | 2 +- core/src/subscription.rs | 11 ++-- core/src/subscription/tracker.rs | 112 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 121 insertions(+), 4 deletions(-) create mode 100644 core/src/subscription/tracker.rs (limited to 'core/src') diff --git a/core/src/lib.rs b/core/src/lib.rs index 821b09c1..6f13c310 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -9,7 +9,7 @@ //! [Iced]: https://github.com/hecrj/iced //! [`iced_native`]: https://github.com/hecrj/iced/tree/master/native //! [`iced_web`]: https://github.com/hecrj/iced/tree/master/web -#![deny(missing_docs)] +//#![deny(missing_docs)] #![deny(missing_debug_implementations)] #![deny(unused_results)] #![deny(unsafe_code)] diff --git a/core/src/subscription.rs b/core/src/subscription.rs index d9e7e388..87e51e48 100644 --- a/core/src/subscription.rs +++ b/core/src/subscription.rs @@ -1,4 +1,9 @@ //! Listen to external events in your application. +mod tracker; + +pub use tracker::Tracker; + +use futures::stream::BoxStream; /// A request to listen to external events. /// @@ -134,8 +139,8 @@ pub trait Recipe { /// [`Recipe`]: trait.Recipe.html fn stream( self: Box, - input: Input, - ) -> futures::stream::BoxStream<'static, Self::Output>; + input: BoxStream<'static, Input>, + ) -> BoxStream<'static, Self::Output>; } struct Map { @@ -169,7 +174,7 @@ where fn stream( self: Box, - input: I, + input: BoxStream<'static, I>, ) -> futures::stream::BoxStream<'static, Self::Output> { use futures::StreamExt; diff --git a/core/src/subscription/tracker.rs b/core/src/subscription/tracker.rs new file mode 100644 index 00000000..826f60c0 --- /dev/null +++ b/core/src/subscription/tracker.rs @@ -0,0 +1,112 @@ +use crate::Subscription; + +use futures::{future::BoxFuture, sink::Sink}; +use std::collections::HashMap; +use std::marker::PhantomData; + +#[derive(Debug)] +pub struct Tracker { + subscriptions: HashMap>, + _hasher: PhantomData, +} + +#[derive(Debug)] +pub struct Execution { + _cancel: futures::channel::oneshot::Sender<()>, + listener: Option>, +} + +impl Tracker +where + Hasher: std::hash::Hasher + Default, + Event: 'static + Send + Clone, +{ + pub fn new() -> Self { + Self { + subscriptions: HashMap::new(), + _hasher: PhantomData, + } + } + + pub fn update( + &mut self, + subscription: Subscription, + sink: S, + ) -> Vec> + where + Message: 'static + Send, + S: 'static + + Sink + + Unpin + + Send + + Clone, + { + use futures::{future::FutureExt, stream::StreamExt}; + + let mut futures = Vec::new(); + + let recipes = subscription.recipes(); + let mut alive = std::collections::HashSet::new(); + + for recipe in recipes { + let id = { + let mut hasher = Hasher::default(); + recipe.hash(&mut hasher); + + hasher.finish() + }; + + let _ = alive.insert(id); + + if self.subscriptions.contains_key(&id) { + continue; + } + + let (cancel, cancelled) = futures::channel::oneshot::channel(); + + // TODO: Use bus if/when it supports async + let (event_sender, event_receiver) = + futures::channel::mpsc::channel(100); + + let stream = recipe.stream(event_receiver.boxed()); + + let future = futures::future::select( + cancelled, + stream.map(Ok).forward(sink.clone()), + ) + .map(|_| ()); + + let _ = self.subscriptions.insert( + id, + Execution { + _cancel: cancel, + listener: if event_sender.is_closed() { + None + } else { + Some(event_sender) + }, + }, + ); + + futures.push(future.boxed()); + } + + self.subscriptions.retain(|id, _| alive.contains(&id)); + + futures + } + + pub fn broadcast(&mut self, event: Event) { + self.subscriptions + .values_mut() + .filter_map(|connection| connection.listener.as_mut()) + .for_each(|listener| { + if let Err(error) = listener.try_send(event.clone()) { + log::error!( + "Error sending event to subscription: {:?}", + error + ); + } + }); + } +} -- cgit From d50ff9b5d97d9c3d6c6c70a9b4efe764b6126c86 Mon Sep 17 00:00:00 2001 From: Héctor Ramón Jiménez Date: Sun, 19 Jan 2020 09:06:48 +0100 Subject: Implement `Runtime` and `Executor` in `iced_core` They can be leveraged by shells to easily execute commands and track subscriptions. --- core/src/lib.rs | 6 ++++ core/src/runtime.rs | 74 ++++++++++++++++++++++++++++++++++++++++ core/src/runtime/executor.rs | 11 ++++++ core/src/subscription/tracker.rs | 8 ++--- 4 files changed, 95 insertions(+), 4 deletions(-) create mode 100644 core/src/runtime.rs create mode 100644 core/src/runtime/executor.rs (limited to 'core/src') diff --git a/core/src/lib.rs b/core/src/lib.rs index 6f13c310..760acefe 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -44,3 +44,9 @@ pub mod subscription; #[cfg(feature = "subscription")] pub use subscription::Subscription; + +#[cfg(feature = "runtime")] +mod runtime; + +#[cfg(feature = "runtime")] +pub use runtime::Runtime; diff --git a/core/src/runtime.rs b/core/src/runtime.rs new file mode 100644 index 00000000..31234d11 --- /dev/null +++ b/core/src/runtime.rs @@ -0,0 +1,74 @@ +mod executor; + +pub use executor::Executor; + +use crate::{subscription, Command, Subscription}; + +use futures::Sink; +use std::marker::PhantomData; + +#[derive(Debug)] +pub struct Runtime { + executor: Executor, + subscriptions: subscription::Tracker, + receiver: Receiver, + _message: PhantomData, +} + +impl + Runtime +where + Hasher: std::hash::Hasher + Default, + Event: Send + Clone + 'static, + Executor: self::Executor, + Receiver: Sink + + Unpin + + Send + + Clone + + 'static, + Message: Send + 'static, +{ + pub fn new(receiver: Receiver) -> Self { + Self { + executor: Executor::new(), + subscriptions: subscription::Tracker::new(), + receiver, + _message: PhantomData, + } + } + + pub fn spawn(&mut self, command: Command) { + use futures::{FutureExt, SinkExt}; + + let futures = command.futures(); + + for future in futures { + let mut receiver = self.receiver.clone(); + + self.executor.spawn(future.then(|message| { + async move { + let _ = receiver.send(message).await; + + () + } + })); + } + } + + pub fn track( + &mut self, + subscription: Subscription, + ) { + let futures = self + .subscriptions + .update(subscription, self.receiver.clone()); + + for future in futures { + self.executor.spawn(future); + } + } + + pub fn broadcast(&mut self, event: Event) { + self.subscriptions.broadcast(event); + } +} diff --git a/core/src/runtime/executor.rs b/core/src/runtime/executor.rs new file mode 100644 index 00000000..d171c6d5 --- /dev/null +++ b/core/src/runtime/executor.rs @@ -0,0 +1,11 @@ +use futures::Future; + +pub trait Executor { + fn new() -> Self; + + fn spawn(&self, future: impl Future + Send + 'static); + + fn enter(&self, f: impl FnOnce() -> R) -> R { + f() + } +} diff --git a/core/src/subscription/tracker.rs b/core/src/subscription/tracker.rs index 826f60c0..a942b619 100644 --- a/core/src/subscription/tracker.rs +++ b/core/src/subscription/tracker.rs @@ -28,14 +28,14 @@ where } } - pub fn update( + pub fn update( &mut self, subscription: Subscription, - sink: S, + receiver: Receiver, ) -> Vec> where Message: 'static + Send, - S: 'static + Receiver: 'static + Sink + Unpin + Send @@ -72,7 +72,7 @@ where let future = futures::future::select( cancelled, - stream.map(Ok).forward(sink.clone()), + stream.map(Ok).forward(receiver.clone()), ) .map(|_| ()); -- cgit From b5b17ed4d800c03beb3ad535d1069a7784e8dc1d Mon Sep 17 00:00:00 2001 From: Héctor Ramón Jiménez Date: Sun, 19 Jan 2020 10:17:08 +0100 Subject: Create `iced_futures` and wire everything up --- core/src/command.rs | 100 --------------------- core/src/lib.rs | 18 ---- core/src/runtime.rs | 74 --------------- core/src/runtime/executor.rs | 11 --- core/src/subscription.rs | 188 --------------------------------------- core/src/subscription/tracker.rs | 112 ----------------------- 6 files changed, 503 deletions(-) delete mode 100644 core/src/command.rs delete mode 100644 core/src/runtime.rs delete mode 100644 core/src/runtime/executor.rs delete mode 100644 core/src/subscription.rs delete mode 100644 core/src/subscription/tracker.rs (limited to 'core/src') diff --git a/core/src/command.rs b/core/src/command.rs deleted file mode 100644 index e7885fb8..00000000 --- a/core/src/command.rs +++ /dev/null @@ -1,100 +0,0 @@ -use futures::future::{BoxFuture, Future, FutureExt}; - -/// A collection of async operations. -/// -/// You should be able to turn a future easily into a [`Command`], either by -/// using the `From` trait or [`Command::perform`]. -/// -/// [`Command`]: struct.Command.html -pub struct Command { - futures: Vec>, -} - -impl Command { - /// Creates an empty [`Command`]. - /// - /// In other words, a [`Command`] that does nothing. - /// - /// [`Command`]: struct.Command.html - pub fn none() -> Self { - Self { - futures: Vec::new(), - } - } - - /// Creates a [`Command`] that performs the action of the given future. - /// - /// [`Command`]: struct.Command.html - pub fn perform( - future: impl Future + 'static + Send, - f: impl Fn(T) -> A + 'static + Send, - ) -> Command { - Command { - futures: vec![future.map(f).boxed()], - } - } - - /// Applies a transformation to the result of a [`Command`]. - /// - /// [`Command`]: struct.Command.html - pub fn map( - mut self, - f: impl Fn(T) -> A + 'static + Send + Sync, - ) -> Command - where - T: 'static, - { - let f = std::sync::Arc::new(f); - - Command { - futures: self - .futures - .drain(..) - .map(|future| { - let f = f.clone(); - - future.map(move |result| f(result)).boxed() - }) - .collect(), - } - } - - /// Creates a [`Command`] that performs the actions of all the given - /// commands. - /// - /// Once this command is run, all the commands will be exectued at once. - /// - /// [`Command`]: struct.Command.html - pub fn batch(commands: impl IntoIterator>) -> Self { - Self { - futures: commands - .into_iter() - .flat_map(|command| command.futures) - .collect(), - } - } - - /// Converts a [`Command`] into its underlying list of futures. - /// - /// [`Command`]: struct.Command.html - pub fn futures(self) -> Vec> { - self.futures - } -} - -impl From for Command -where - A: Future + 'static + Send, -{ - fn from(future: A) -> Self { - Self { - futures: vec![future.boxed()], - } - } -} - -impl std::fmt::Debug for Command { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("Command").finish() - } -} diff --git a/core/src/lib.rs b/core/src/lib.rs index 760acefe..bec307ad 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -32,21 +32,3 @@ pub use length::Length; pub use point::Point; pub use rectangle::Rectangle; pub use vector::Vector; - -#[cfg(feature = "command")] -mod command; - -#[cfg(feature = "command")] -pub use command::Command; - -#[cfg(feature = "subscription")] -pub mod subscription; - -#[cfg(feature = "subscription")] -pub use subscription::Subscription; - -#[cfg(feature = "runtime")] -mod runtime; - -#[cfg(feature = "runtime")] -pub use runtime::Runtime; diff --git a/core/src/runtime.rs b/core/src/runtime.rs deleted file mode 100644 index 31234d11..00000000 --- a/core/src/runtime.rs +++ /dev/null @@ -1,74 +0,0 @@ -mod executor; - -pub use executor::Executor; - -use crate::{subscription, Command, Subscription}; - -use futures::Sink; -use std::marker::PhantomData; - -#[derive(Debug)] -pub struct Runtime { - executor: Executor, - subscriptions: subscription::Tracker, - receiver: Receiver, - _message: PhantomData, -} - -impl - Runtime -where - Hasher: std::hash::Hasher + Default, - Event: Send + Clone + 'static, - Executor: self::Executor, - Receiver: Sink - + Unpin - + Send - + Clone - + 'static, - Message: Send + 'static, -{ - pub fn new(receiver: Receiver) -> Self { - Self { - executor: Executor::new(), - subscriptions: subscription::Tracker::new(), - receiver, - _message: PhantomData, - } - } - - pub fn spawn(&mut self, command: Command) { - use futures::{FutureExt, SinkExt}; - - let futures = command.futures(); - - for future in futures { - let mut receiver = self.receiver.clone(); - - self.executor.spawn(future.then(|message| { - async move { - let _ = receiver.send(message).await; - - () - } - })); - } - } - - pub fn track( - &mut self, - subscription: Subscription, - ) { - let futures = self - .subscriptions - .update(subscription, self.receiver.clone()); - - for future in futures { - self.executor.spawn(future); - } - } - - pub fn broadcast(&mut self, event: Event) { - self.subscriptions.broadcast(event); - } -} diff --git a/core/src/runtime/executor.rs b/core/src/runtime/executor.rs deleted file mode 100644 index d171c6d5..00000000 --- a/core/src/runtime/executor.rs +++ /dev/null @@ -1,11 +0,0 @@ -use futures::Future; - -pub trait Executor { - fn new() -> Self; - - fn spawn(&self, future: impl Future + Send + 'static); - - fn enter(&self, f: impl FnOnce() -> R) -> R { - f() - } -} diff --git a/core/src/subscription.rs b/core/src/subscription.rs deleted file mode 100644 index 87e51e48..00000000 --- a/core/src/subscription.rs +++ /dev/null @@ -1,188 +0,0 @@ -//! Listen to external events in your application. -mod tracker; - -pub use tracker::Tracker; - -use futures::stream::BoxStream; - -/// A request to listen to external events. -/// -/// Besides performing async actions on demand with [`Command`], most -/// applications also need to listen to external events passively. -/// -/// A [`Subscription`] is normally provided to some runtime, like a [`Command`], -/// and it will generate events as long as the user keeps requesting it. -/// -/// For instance, you can use a [`Subscription`] to listen to a WebSocket -/// connection, keyboard presses, mouse events, time ticks, etc. -/// -/// This type is normally aliased by runtimes with a specific `Input` and/or -/// `Hasher`. -/// -/// [`Command`]: ../struct.Command.html -/// [`Subscription`]: struct.Subscription.html -pub struct Subscription { - recipes: Vec>>, -} - -impl Subscription -where - H: std::hash::Hasher, -{ - /// Returns an empty [`Subscription`] that will not produce any output. - /// - /// [`Subscription`]: struct.Subscription.html - pub fn none() -> Self { - Self { - recipes: Vec::new(), - } - } - - /// Creates a [`Subscription`] from a [`Recipe`] describing it. - /// - /// [`Subscription`]: struct.Subscription.html - /// [`Recipe`]: trait.Recipe.html - pub fn from_recipe( - recipe: impl Recipe + 'static, - ) -> Self { - Self { - recipes: vec![Box::new(recipe)], - } - } - - /// Batches all the provided subscriptions and returns the resulting - /// [`Subscription`]. - /// - /// [`Subscription`]: struct.Subscription.html - pub fn batch( - subscriptions: impl IntoIterator>, - ) -> Self { - Self { - recipes: subscriptions - .into_iter() - .flat_map(|subscription| subscription.recipes) - .collect(), - } - } - - /// Returns the different recipes of the [`Subscription`]. - /// - /// [`Subscription`]: struct.Subscription.html - pub fn recipes(self) -> Vec>> { - self.recipes - } - - /// Transforms the [`Subscription`] output with the given function. - /// - /// [`Subscription`]: struct.Subscription.html - pub fn map( - mut self, - f: impl Fn(O) -> A + Send + Sync + 'static, - ) -> Subscription - where - H: 'static, - I: 'static, - O: 'static, - A: 'static, - { - let function = std::sync::Arc::new(f); - - Subscription { - recipes: self - .recipes - .drain(..) - .map(|recipe| { - Box::new(Map::new(recipe, function.clone())) - as Box> - }) - .collect(), - } - } -} - -impl std::fmt::Debug for Subscription { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("Subscription").finish() - } -} - -/// The description of a [`Subscription`]. -/// -/// A [`Recipe`] is the internal definition of a [`Subscription`]. It is used -/// by runtimes to run and identify subscriptions. You can use it to create your -/// own! -/// -/// [`Subscription`]: struct.Subscription.html -/// [`Recipe`]: trait.Recipe.html -pub trait Recipe { - /// The events that will be produced by a [`Subscription`] with this - /// [`Recipe`]. - /// - /// [`Subscription`]: struct.Subscription.html - /// [`Recipe`]: trait.Recipe.html - type Output; - - /// Hashes the [`Recipe`]. - /// - /// This is used by runtimes to uniquely identify a [`Subscription`]. - /// - /// [`Subscription`]: struct.Subscription.html - /// [`Recipe`]: trait.Recipe.html - fn hash(&self, state: &mut Hasher); - - /// Executes the [`Recipe`] and produces the stream of events of its - /// [`Subscription`]. - /// - /// It receives some generic `Input`, which is normally defined by runtimes. - /// - /// [`Subscription`]: struct.Subscription.html - /// [`Recipe`]: trait.Recipe.html - fn stream( - self: Box, - input: BoxStream<'static, Input>, - ) -> BoxStream<'static, Self::Output>; -} - -struct Map { - recipe: Box>, - mapper: std::sync::Arc B + Send + Sync>, -} - -impl Map { - fn new( - recipe: Box>, - mapper: std::sync::Arc B + Send + Sync + 'static>, - ) -> Self { - Map { recipe, mapper } - } -} - -impl Recipe for Map -where - A: 'static, - B: 'static, - H: std::hash::Hasher, -{ - type Output = B; - - fn hash(&self, state: &mut H) { - use std::hash::Hash; - - std::any::TypeId::of::().hash(state); - self.recipe.hash(state); - } - - fn stream( - self: Box, - input: BoxStream<'static, I>, - ) -> futures::stream::BoxStream<'static, Self::Output> { - use futures::StreamExt; - - let mapper = self.mapper; - - self.recipe - .stream(input) - .map(move |element| mapper(element)) - .boxed() - } -} diff --git a/core/src/subscription/tracker.rs b/core/src/subscription/tracker.rs deleted file mode 100644 index a942b619..00000000 --- a/core/src/subscription/tracker.rs +++ /dev/null @@ -1,112 +0,0 @@ -use crate::Subscription; - -use futures::{future::BoxFuture, sink::Sink}; -use std::collections::HashMap; -use std::marker::PhantomData; - -#[derive(Debug)] -pub struct Tracker { - subscriptions: HashMap>, - _hasher: PhantomData, -} - -#[derive(Debug)] -pub struct Execution { - _cancel: futures::channel::oneshot::Sender<()>, - listener: Option>, -} - -impl Tracker -where - Hasher: std::hash::Hasher + Default, - Event: 'static + Send + Clone, -{ - pub fn new() -> Self { - Self { - subscriptions: HashMap::new(), - _hasher: PhantomData, - } - } - - pub fn update( - &mut self, - subscription: Subscription, - receiver: Receiver, - ) -> Vec> - where - Message: 'static + Send, - Receiver: 'static - + Sink - + Unpin - + Send - + Clone, - { - use futures::{future::FutureExt, stream::StreamExt}; - - let mut futures = Vec::new(); - - let recipes = subscription.recipes(); - let mut alive = std::collections::HashSet::new(); - - for recipe in recipes { - let id = { - let mut hasher = Hasher::default(); - recipe.hash(&mut hasher); - - hasher.finish() - }; - - let _ = alive.insert(id); - - if self.subscriptions.contains_key(&id) { - continue; - } - - let (cancel, cancelled) = futures::channel::oneshot::channel(); - - // TODO: Use bus if/when it supports async - let (event_sender, event_receiver) = - futures::channel::mpsc::channel(100); - - let stream = recipe.stream(event_receiver.boxed()); - - let future = futures::future::select( - cancelled, - stream.map(Ok).forward(receiver.clone()), - ) - .map(|_| ()); - - let _ = self.subscriptions.insert( - id, - Execution { - _cancel: cancel, - listener: if event_sender.is_closed() { - None - } else { - Some(event_sender) - }, - }, - ); - - futures.push(future.boxed()); - } - - self.subscriptions.retain(|id, _| alive.contains(&id)); - - futures - } - - pub fn broadcast(&mut self, event: Event) { - self.subscriptions - .values_mut() - .filter_map(|connection| connection.listener.as_mut()) - .for_each(|listener| { - if let Err(error) = listener.try_send(event.clone()) { - log::error!( - "Error sending event to subscription: {:?}", - error - ); - } - }); - } -} -- cgit From f14009601e270e43bdf29b8f4842cf136fbbd8b9 Mon Sep 17 00:00:00 2001 From: Héctor Ramón Jiménez Date: Mon, 20 Jan 2020 09:49:17 +0100 Subject: Write documentation for `iced_futures` --- core/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'core/src') diff --git a/core/src/lib.rs b/core/src/lib.rs index bec307ad..51863327 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -9,7 +9,7 @@ //! [Iced]: https://github.com/hecrj/iced //! [`iced_native`]: https://github.com/hecrj/iced/tree/master/native //! [`iced_web`]: https://github.com/hecrj/iced/tree/master/web -//#![deny(missing_docs)] +#![deny(missing_docs)] #![deny(missing_debug_implementations)] #![deny(unused_results)] #![deny(unsafe_code)] -- cgit