From b5b17ed4d800c03beb3ad535d1069a7784e8dc1d Mon Sep 17 00:00:00 2001 From: Héctor Ramón Jiménez Date: Sun, 19 Jan 2020 10:17:08 +0100 Subject: Create `iced_futures` and wire everything up --- futures/src/command.rs | 100 +++++++++++++++++++ futures/src/lib.rs | 8 ++ futures/src/runtime.rs | 79 +++++++++++++++ futures/src/runtime/executor.rs | 26 +++++ futures/src/subscription.rs | 188 ++++++++++++++++++++++++++++++++++++ futures/src/subscription/tracker.rs | 112 +++++++++++++++++++++ 6 files changed, 513 insertions(+) create mode 100644 futures/src/command.rs create mode 100644 futures/src/lib.rs create mode 100644 futures/src/runtime.rs create mode 100644 futures/src/runtime/executor.rs create mode 100644 futures/src/subscription.rs create mode 100644 futures/src/subscription/tracker.rs (limited to 'futures/src') diff --git a/futures/src/command.rs b/futures/src/command.rs new file mode 100644 index 00000000..e7885fb8 --- /dev/null +++ b/futures/src/command.rs @@ -0,0 +1,100 @@ +use futures::future::{BoxFuture, Future, FutureExt}; + +/// A collection of async operations. +/// +/// You should be able to turn a future easily into a [`Command`], either by +/// using the `From` trait or [`Command::perform`]. +/// +/// [`Command`]: struct.Command.html +pub struct Command { + futures: Vec>, +} + +impl Command { + /// Creates an empty [`Command`]. + /// + /// In other words, a [`Command`] that does nothing. + /// + /// [`Command`]: struct.Command.html + pub fn none() -> Self { + Self { + futures: Vec::new(), + } + } + + /// Creates a [`Command`] that performs the action of the given future. + /// + /// [`Command`]: struct.Command.html + pub fn perform( + future: impl Future + 'static + Send, + f: impl Fn(T) -> A + 'static + Send, + ) -> Command { + Command { + futures: vec![future.map(f).boxed()], + } + } + + /// Applies a transformation to the result of a [`Command`]. + /// + /// [`Command`]: struct.Command.html + pub fn map( + mut self, + f: impl Fn(T) -> A + 'static + Send + Sync, + ) -> Command + where + T: 'static, + { + let f = std::sync::Arc::new(f); + + Command { + futures: self + .futures + .drain(..) + .map(|future| { + let f = f.clone(); + + future.map(move |result| f(result)).boxed() + }) + .collect(), + } + } + + /// Creates a [`Command`] that performs the actions of all the given + /// commands. + /// + /// Once this command is run, all the commands will be exectued at once. + /// + /// [`Command`]: struct.Command.html + pub fn batch(commands: impl IntoIterator>) -> Self { + Self { + futures: commands + .into_iter() + .flat_map(|command| command.futures) + .collect(), + } + } + + /// Converts a [`Command`] into its underlying list of futures. + /// + /// [`Command`]: struct.Command.html + pub fn futures(self) -> Vec> { + self.futures + } +} + +impl From for Command +where + A: Future + 'static + Send, +{ + fn from(future: A) -> Self { + Self { + futures: vec![future.boxed()], + } + } +} + +impl std::fmt::Debug for Command { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Command").finish() + } +} diff --git a/futures/src/lib.rs b/futures/src/lib.rs new file mode 100644 index 00000000..f6bcf85a --- /dev/null +++ b/futures/src/lib.rs @@ -0,0 +1,8 @@ +mod command; + +pub mod runtime; +pub mod subscription; + +pub use command::Command; +pub use runtime::Runtime; +pub use subscription::Subscription; diff --git a/futures/src/runtime.rs b/futures/src/runtime.rs new file mode 100644 index 00000000..bc1ad8ac --- /dev/null +++ b/futures/src/runtime.rs @@ -0,0 +1,79 @@ +//! Run commands and subscriptions. +mod executor; + +pub use executor::Executor; + +use crate::{subscription, Command, Subscription}; + +use futures::Sink; +use std::marker::PhantomData; + +#[derive(Debug)] +pub struct Runtime { + executor: Executor, + receiver: Receiver, + subscriptions: subscription::Tracker, + _message: PhantomData, +} + +impl + Runtime +where + Hasher: std::hash::Hasher + Default, + Event: Send + Clone + 'static, + Executor: self::Executor, + Receiver: Sink + + Unpin + + Send + + Clone + + 'static, + Message: Send + 'static, +{ + pub fn new(executor: Executor, receiver: Receiver) -> Self { + Self { + executor, + receiver, + subscriptions: subscription::Tracker::new(), + _message: PhantomData, + } + } + + pub fn enter(&self, f: impl FnOnce() -> R) -> R { + self.executor.enter(f) + } + + pub fn spawn(&mut self, command: Command) { + use futures::{FutureExt, SinkExt}; + + let futures = command.futures(); + + for future in futures { + let mut receiver = self.receiver.clone(); + + self.executor.spawn(future.then(|message| { + async move { + let _ = receiver.send(message).await; + + () + } + })); + } + } + + pub fn track( + &mut self, + subscription: Subscription, + ) { + let futures = self + .subscriptions + .update(subscription, self.receiver.clone()); + + for future in futures { + self.executor.spawn(future); + } + } + + pub fn broadcast(&mut self, event: Event) { + self.subscriptions.broadcast(event); + } +} diff --git a/futures/src/runtime/executor.rs b/futures/src/runtime/executor.rs new file mode 100644 index 00000000..855aa105 --- /dev/null +++ b/futures/src/runtime/executor.rs @@ -0,0 +1,26 @@ +use futures::Future; + +pub trait Executor { + fn spawn(&self, future: impl Future + Send + 'static); + + fn enter(&self, f: impl FnOnce() -> R) -> R { + f() + } +} + +impl Executor for futures::executor::ThreadPool { + fn spawn(&self, future: impl Future + Send + 'static) { + self.spawn_ok(future); + } +} + +#[cfg(feature = "tokio")] +impl Executor for tokio::runtime::Runtime { + fn spawn(&self, future: impl Future + Send + 'static) { + let _ = tokio::runtime::Runtime::spawn(self, future); + } + + fn enter(&self, f: impl FnOnce() -> R) -> R { + tokio::runtime::Runtime::enter(self, f) + } +} diff --git a/futures/src/subscription.rs b/futures/src/subscription.rs new file mode 100644 index 00000000..87e51e48 --- /dev/null +++ b/futures/src/subscription.rs @@ -0,0 +1,188 @@ +//! Listen to external events in your application. +mod tracker; + +pub use tracker::Tracker; + +use futures::stream::BoxStream; + +/// A request to listen to external events. +/// +/// Besides performing async actions on demand with [`Command`], most +/// applications also need to listen to external events passively. +/// +/// A [`Subscription`] is normally provided to some runtime, like a [`Command`], +/// and it will generate events as long as the user keeps requesting it. +/// +/// For instance, you can use a [`Subscription`] to listen to a WebSocket +/// connection, keyboard presses, mouse events, time ticks, etc. +/// +/// This type is normally aliased by runtimes with a specific `Input` and/or +/// `Hasher`. +/// +/// [`Command`]: ../struct.Command.html +/// [`Subscription`]: struct.Subscription.html +pub struct Subscription { + recipes: Vec>>, +} + +impl Subscription +where + H: std::hash::Hasher, +{ + /// Returns an empty [`Subscription`] that will not produce any output. + /// + /// [`Subscription`]: struct.Subscription.html + pub fn none() -> Self { + Self { + recipes: Vec::new(), + } + } + + /// Creates a [`Subscription`] from a [`Recipe`] describing it. + /// + /// [`Subscription`]: struct.Subscription.html + /// [`Recipe`]: trait.Recipe.html + pub fn from_recipe( + recipe: impl Recipe + 'static, + ) -> Self { + Self { + recipes: vec![Box::new(recipe)], + } + } + + /// Batches all the provided subscriptions and returns the resulting + /// [`Subscription`]. + /// + /// [`Subscription`]: struct.Subscription.html + pub fn batch( + subscriptions: impl IntoIterator>, + ) -> Self { + Self { + recipes: subscriptions + .into_iter() + .flat_map(|subscription| subscription.recipes) + .collect(), + } + } + + /// Returns the different recipes of the [`Subscription`]. + /// + /// [`Subscription`]: struct.Subscription.html + pub fn recipes(self) -> Vec>> { + self.recipes + } + + /// Transforms the [`Subscription`] output with the given function. + /// + /// [`Subscription`]: struct.Subscription.html + pub fn map( + mut self, + f: impl Fn(O) -> A + Send + Sync + 'static, + ) -> Subscription + where + H: 'static, + I: 'static, + O: 'static, + A: 'static, + { + let function = std::sync::Arc::new(f); + + Subscription { + recipes: self + .recipes + .drain(..) + .map(|recipe| { + Box::new(Map::new(recipe, function.clone())) + as Box> + }) + .collect(), + } + } +} + +impl std::fmt::Debug for Subscription { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Subscription").finish() + } +} + +/// The description of a [`Subscription`]. +/// +/// A [`Recipe`] is the internal definition of a [`Subscription`]. It is used +/// by runtimes to run and identify subscriptions. You can use it to create your +/// own! +/// +/// [`Subscription`]: struct.Subscription.html +/// [`Recipe`]: trait.Recipe.html +pub trait Recipe { + /// The events that will be produced by a [`Subscription`] with this + /// [`Recipe`]. + /// + /// [`Subscription`]: struct.Subscription.html + /// [`Recipe`]: trait.Recipe.html + type Output; + + /// Hashes the [`Recipe`]. + /// + /// This is used by runtimes to uniquely identify a [`Subscription`]. + /// + /// [`Subscription`]: struct.Subscription.html + /// [`Recipe`]: trait.Recipe.html + fn hash(&self, state: &mut Hasher); + + /// Executes the [`Recipe`] and produces the stream of events of its + /// [`Subscription`]. + /// + /// It receives some generic `Input`, which is normally defined by runtimes. + /// + /// [`Subscription`]: struct.Subscription.html + /// [`Recipe`]: trait.Recipe.html + fn stream( + self: Box, + input: BoxStream<'static, Input>, + ) -> BoxStream<'static, Self::Output>; +} + +struct Map { + recipe: Box>, + mapper: std::sync::Arc B + Send + Sync>, +} + +impl Map { + fn new( + recipe: Box>, + mapper: std::sync::Arc B + Send + Sync + 'static>, + ) -> Self { + Map { recipe, mapper } + } +} + +impl Recipe for Map +where + A: 'static, + B: 'static, + H: std::hash::Hasher, +{ + type Output = B; + + fn hash(&self, state: &mut H) { + use std::hash::Hash; + + std::any::TypeId::of::().hash(state); + self.recipe.hash(state); + } + + fn stream( + self: Box, + input: BoxStream<'static, I>, + ) -> futures::stream::BoxStream<'static, Self::Output> { + use futures::StreamExt; + + let mapper = self.mapper; + + self.recipe + .stream(input) + .map(move |element| mapper(element)) + .boxed() + } +} diff --git a/futures/src/subscription/tracker.rs b/futures/src/subscription/tracker.rs new file mode 100644 index 00000000..a942b619 --- /dev/null +++ b/futures/src/subscription/tracker.rs @@ -0,0 +1,112 @@ +use crate::Subscription; + +use futures::{future::BoxFuture, sink::Sink}; +use std::collections::HashMap; +use std::marker::PhantomData; + +#[derive(Debug)] +pub struct Tracker { + subscriptions: HashMap>, + _hasher: PhantomData, +} + +#[derive(Debug)] +pub struct Execution { + _cancel: futures::channel::oneshot::Sender<()>, + listener: Option>, +} + +impl Tracker +where + Hasher: std::hash::Hasher + Default, + Event: 'static + Send + Clone, +{ + pub fn new() -> Self { + Self { + subscriptions: HashMap::new(), + _hasher: PhantomData, + } + } + + pub fn update( + &mut self, + subscription: Subscription, + receiver: Receiver, + ) -> Vec> + where + Message: 'static + Send, + Receiver: 'static + + Sink + + Unpin + + Send + + Clone, + { + use futures::{future::FutureExt, stream::StreamExt}; + + let mut futures = Vec::new(); + + let recipes = subscription.recipes(); + let mut alive = std::collections::HashSet::new(); + + for recipe in recipes { + let id = { + let mut hasher = Hasher::default(); + recipe.hash(&mut hasher); + + hasher.finish() + }; + + let _ = alive.insert(id); + + if self.subscriptions.contains_key(&id) { + continue; + } + + let (cancel, cancelled) = futures::channel::oneshot::channel(); + + // TODO: Use bus if/when it supports async + let (event_sender, event_receiver) = + futures::channel::mpsc::channel(100); + + let stream = recipe.stream(event_receiver.boxed()); + + let future = futures::future::select( + cancelled, + stream.map(Ok).forward(receiver.clone()), + ) + .map(|_| ()); + + let _ = self.subscriptions.insert( + id, + Execution { + _cancel: cancel, + listener: if event_sender.is_closed() { + None + } else { + Some(event_sender) + }, + }, + ); + + futures.push(future.boxed()); + } + + self.subscriptions.retain(|id, _| alive.contains(&id)); + + futures + } + + pub fn broadcast(&mut self, event: Event) { + self.subscriptions + .values_mut() + .filter_map(|connection| connection.listener.as_mut()) + .for_each(|listener| { + if let Err(error) = listener.try_send(event.clone()) { + log::error!( + "Error sending event to subscription: {:?}", + error + ); + } + }); + } +} -- cgit From b8b0d97525aaa2641d8493aa65e3108d70c1560a Mon Sep 17 00:00:00 2001 From: Héctor Ramón Jiménez Date: Sun, 19 Jan 2020 11:08:32 +0100 Subject: Rename `Receiver` to `Sender` in `Runtime` --- futures/src/runtime.rs | 23 +++++++++++------------ 1 file changed, 11 insertions(+), 12 deletions(-) (limited to 'futures/src') diff --git a/futures/src/runtime.rs b/futures/src/runtime.rs index bc1ad8ac..37905c61 100644 --- a/futures/src/runtime.rs +++ b/futures/src/runtime.rs @@ -9,30 +9,30 @@ use futures::Sink; use std::marker::PhantomData; #[derive(Debug)] -pub struct Runtime { +pub struct Runtime { executor: Executor, - receiver: Receiver, + sender: Sender, subscriptions: subscription::Tracker, _message: PhantomData, } -impl - Runtime +impl + Runtime where Hasher: std::hash::Hasher + Default, Event: Send + Clone + 'static, Executor: self::Executor, - Receiver: Sink + Sender: Sink + Unpin + Send + Clone + 'static, Message: Send + 'static, { - pub fn new(executor: Executor, receiver: Receiver) -> Self { + pub fn new(executor: Executor, sender: Sender) -> Self { Self { executor, - receiver, + sender, subscriptions: subscription::Tracker::new(), _message: PhantomData, } @@ -48,11 +48,11 @@ where let futures = command.futures(); for future in futures { - let mut receiver = self.receiver.clone(); + let mut sender = self.sender.clone(); self.executor.spawn(future.then(|message| { async move { - let _ = receiver.send(message).await; + let _ = sender.send(message).await; () } @@ -64,9 +64,8 @@ where &mut self, subscription: Subscription, ) { - let futures = self - .subscriptions - .update(subscription, self.receiver.clone()); + let futures = + self.subscriptions.update(subscription, self.sender.clone()); for future in futures { self.executor.spawn(future); -- cgit From 35760ac68f06e783e64e9048aff0fff6df1c09cf Mon Sep 17 00:00:00 2001 From: Héctor Ramón Jiménez Date: Sun, 19 Jan 2020 11:08:47 +0100 Subject: Make `thread-pool` optional in `iced_futures` --- futures/src/runtime/executor.rs | 1 + 1 file changed, 1 insertion(+) (limited to 'futures/src') diff --git a/futures/src/runtime/executor.rs b/futures/src/runtime/executor.rs index 855aa105..eec5e231 100644 --- a/futures/src/runtime/executor.rs +++ b/futures/src/runtime/executor.rs @@ -8,6 +8,7 @@ pub trait Executor { } } +#[cfg(feature = "thread-pool")] impl Executor for futures::executor::ThreadPool { fn spawn(&self, future: impl Future + Send + 'static) { self.spawn_ok(future); -- cgit From 90690702e1e4abab804ec91e8ff4183824bec436 Mon Sep 17 00:00:00 2001 From: Héctor Ramón Jiménez Date: Mon, 20 Jan 2020 04:47:36 +0100 Subject: Add `Application::Executor` associated type --- futures/src/executor.rs | 36 ++++++++++++++++++++++++++++++++++++ futures/src/executor/async_std.rs | 15 +++++++++++++++ futures/src/executor/null.rs | 13 +++++++++++++ futures/src/executor/thread_pool.rs | 15 +++++++++++++++ futures/src/executor/tokio.rs | 19 +++++++++++++++++++ futures/src/lib.rs | 6 +++++- futures/src/runtime.rs | 8 ++------ futures/src/runtime/executor.rs | 27 --------------------------- 8 files changed, 105 insertions(+), 34 deletions(-) create mode 100644 futures/src/executor.rs create mode 100644 futures/src/executor/async_std.rs create mode 100644 futures/src/executor/null.rs create mode 100644 futures/src/executor/thread_pool.rs create mode 100644 futures/src/executor/tokio.rs delete mode 100644 futures/src/runtime/executor.rs (limited to 'futures/src') diff --git a/futures/src/executor.rs b/futures/src/executor.rs new file mode 100644 index 00000000..144a41f8 --- /dev/null +++ b/futures/src/executor.rs @@ -0,0 +1,36 @@ +//! Choose your preferred executor to power a runtime. +mod null; + +#[cfg(feature = "thread-pool")] +mod thread_pool; + +#[cfg(feature = "thread-pool")] +pub use thread_pool::ThreadPool; + +#[cfg(feature = "tokio")] +mod tokio; + +#[cfg(feature = "async-std")] +mod async_std; + +pub use null::Null; + +#[cfg(feature = "tokio")] +pub use self::tokio::Tokio; + +#[cfg(feature = "async-std")] +pub use self::async_std::AsyncStd; + +use futures::Future; + +pub trait Executor: Sized { + fn new() -> Result + where + Self: Sized; + + fn spawn(&self, future: impl Future + Send + 'static); + + fn enter(&self, f: impl FnOnce() -> R) -> R { + f() + } +} diff --git a/futures/src/executor/async_std.rs b/futures/src/executor/async_std.rs new file mode 100644 index 00000000..b056b23d --- /dev/null +++ b/futures/src/executor/async_std.rs @@ -0,0 +1,15 @@ +use crate::Executor; + +use futures::Future; + +pub struct AsyncStd; + +impl Executor for AsyncStd { + fn new() -> Result { + Ok(Self) + } + + fn spawn(&self, future: impl Future + Send + 'static) { + let _ = async_std::task::spawn(future); + } +} diff --git a/futures/src/executor/null.rs b/futures/src/executor/null.rs new file mode 100644 index 00000000..722073bb --- /dev/null +++ b/futures/src/executor/null.rs @@ -0,0 +1,13 @@ +use crate::Executor; + +use futures::Future; + +pub struct Null; + +impl Executor for Null { + fn new() -> Result { + Ok(Self) + } + + fn spawn(&self, _future: impl Future + Send + 'static) {} +} diff --git a/futures/src/executor/thread_pool.rs b/futures/src/executor/thread_pool.rs new file mode 100644 index 00000000..6393d0d5 --- /dev/null +++ b/futures/src/executor/thread_pool.rs @@ -0,0 +1,15 @@ +use crate::Executor; + +use futures::Future; + +pub type ThreadPool = futures::executor::ThreadPool; + +impl Executor for futures::executor::ThreadPool { + fn new() -> Result { + futures::executor::ThreadPool::new() + } + + fn spawn(&self, future: impl Future + Send + 'static) { + self.spawn_ok(future); + } +} diff --git a/futures/src/executor/tokio.rs b/futures/src/executor/tokio.rs new file mode 100644 index 00000000..aafa7e7b --- /dev/null +++ b/futures/src/executor/tokio.rs @@ -0,0 +1,19 @@ +use crate::Executor; + +use futures::Future; + +pub type Tokio = tokio::runtime::Runtime; + +impl Executor for Tokio { + fn new() -> Result { + tokio::runtime::Runtime::new() + } + + fn spawn(&self, future: impl Future + Send + 'static) { + let _ = tokio::runtime::Runtime::spawn(self, future); + } + + fn enter(&self, f: impl FnOnce() -> R) -> R { + tokio::runtime::Runtime::enter(self, f) + } +} diff --git a/futures/src/lib.rs b/futures/src/lib.rs index f6bcf85a..832a50f6 100644 --- a/futures/src/lib.rs +++ b/futures/src/lib.rs @@ -1,8 +1,12 @@ +pub use futures; + mod command; +mod runtime; -pub mod runtime; +pub mod executor; pub mod subscription; pub use command::Command; +pub use executor::Executor; pub use runtime::Runtime; pub use subscription::Subscription; diff --git a/futures/src/runtime.rs b/futures/src/runtime.rs index 37905c61..a508c46e 100644 --- a/futures/src/runtime.rs +++ b/futures/src/runtime.rs @@ -1,9 +1,5 @@ -//! Run commands and subscriptions. -mod executor; - -pub use executor::Executor; - -use crate::{subscription, Command, Subscription}; +//! Run commands and keep track of subscriptions. +use crate::{subscription, Command, Executor, Subscription}; use futures::Sink; use std::marker::PhantomData; diff --git a/futures/src/runtime/executor.rs b/futures/src/runtime/executor.rs deleted file mode 100644 index eec5e231..00000000 --- a/futures/src/runtime/executor.rs +++ /dev/null @@ -1,27 +0,0 @@ -use futures::Future; - -pub trait Executor { - fn spawn(&self, future: impl Future + Send + 'static); - - fn enter(&self, f: impl FnOnce() -> R) -> R { - f() - } -} - -#[cfg(feature = "thread-pool")] -impl Executor for futures::executor::ThreadPool { - fn spawn(&self, future: impl Future + Send + 'static) { - self.spawn_ok(future); - } -} - -#[cfg(feature = "tokio")] -impl Executor for tokio::runtime::Runtime { - fn spawn(&self, future: impl Future + Send + 'static) { - let _ = tokio::runtime::Runtime::spawn(self, future); - } - - fn enter(&self, f: impl FnOnce() -> R) -> R { - tokio::runtime::Runtime::enter(self, f) - } -} -- cgit From 04086a90c9e933ebfb42de378054e1115b33529d Mon Sep 17 00:00:00 2001 From: Héctor Ramón Jiménez Date: Mon, 20 Jan 2020 05:43:09 +0100 Subject: Implement `WasmBindgen` executor and reorganize --- futures/src/executor.rs | 12 +++++++++--- futures/src/executor/wasm_bindgen.rs | 17 +++++++++++++++++ 2 files changed, 26 insertions(+), 3 deletions(-) create mode 100644 futures/src/executor/wasm_bindgen.rs (limited to 'futures/src') diff --git a/futures/src/executor.rs b/futures/src/executor.rs index 144a41f8..b2ff043e 100644 --- a/futures/src/executor.rs +++ b/futures/src/executor.rs @@ -4,23 +4,29 @@ mod null; #[cfg(feature = "thread-pool")] mod thread_pool; -#[cfg(feature = "thread-pool")] -pub use thread_pool::ThreadPool; - #[cfg(feature = "tokio")] mod tokio; #[cfg(feature = "async-std")] mod async_std; +#[cfg(target_arch = "wasm32")] +mod wasm_bindgen; + pub use null::Null; +#[cfg(feature = "thread-pool")] +pub use thread_pool::ThreadPool; + #[cfg(feature = "tokio")] pub use self::tokio::Tokio; #[cfg(feature = "async-std")] pub use self::async_std::AsyncStd; +#[cfg(target_arch = "wasm32")] +pub use wasm_bindgen::WasmBindgen; + use futures::Future; pub trait Executor: Sized { diff --git a/futures/src/executor/wasm_bindgen.rs b/futures/src/executor/wasm_bindgen.rs new file mode 100644 index 00000000..70a8ea8e --- /dev/null +++ b/futures/src/executor/wasm_bindgen.rs @@ -0,0 +1,17 @@ +use crate::Executor; + +#[derive(Debug)] +pub struct WasmBindgen; + +impl Executor for WasmBindgen { + fn new() -> Result { + Ok(Self) + } + + fn spawn( + &self, + future: impl futures::Future + Send + 'static, + ) { + wasm_bindgen_futures::spawn_local(future); + } +} -- cgit From f14009601e270e43bdf29b8f4842cf136fbbd8b9 Mon Sep 17 00:00:00 2001 From: Héctor Ramón Jiménez Date: Mon, 20 Jan 2020 09:49:17 +0100 Subject: Write documentation for `iced_futures` --- futures/src/executor.rs | 13 +++++++++++ futures/src/executor/async_std.rs | 2 ++ futures/src/executor/null.rs | 2 ++ futures/src/executor/thread_pool.rs | 1 + futures/src/executor/tokio.rs | 1 + futures/src/lib.rs | 6 +++++ futures/src/runtime.rs | 45 +++++++++++++++++++++++++++++++++++++ futures/src/subscription.rs | 39 ++++++++++++++++---------------- futures/src/subscription/tracker.rs | 36 +++++++++++++++++++++++++++++ 9 files changed, 126 insertions(+), 19 deletions(-) (limited to 'futures/src') diff --git a/futures/src/executor.rs b/futures/src/executor.rs index b2ff043e..c2b9cc72 100644 --- a/futures/src/executor.rs +++ b/futures/src/executor.rs @@ -29,13 +29,26 @@ pub use wasm_bindgen::WasmBindgen; use futures::Future; +/// A type that can run futures. pub trait Executor: Sized { + /// Creates a new [`Executor`]. + /// + /// [`Executor`]: trait.Executor.html fn new() -> Result where Self: Sized; + /// Spawns a future in the [`Executor`]. + /// + /// [`Executor`]: trait.Executor.html fn spawn(&self, future: impl Future + Send + 'static); + /// Runs the given closure inside the [`Executor`]. + /// + /// Some executors, like `tokio`, require some global state to be in place + /// before creating futures. This method can be leveraged to set up this + /// global state, call a function, restore the state, and obtain the result + /// of the call. fn enter(&self, f: impl FnOnce() -> R) -> R { f() } diff --git a/futures/src/executor/async_std.rs b/futures/src/executor/async_std.rs index b056b23d..641dfbd2 100644 --- a/futures/src/executor/async_std.rs +++ b/futures/src/executor/async_std.rs @@ -2,6 +2,8 @@ use crate::Executor; use futures::Future; +/// A type representing the `async-std` runtime. +#[derive(Debug)] pub struct AsyncStd; impl Executor for AsyncStd { diff --git a/futures/src/executor/null.rs b/futures/src/executor/null.rs index 722073bb..6d5cf982 100644 --- a/futures/src/executor/null.rs +++ b/futures/src/executor/null.rs @@ -2,6 +2,8 @@ use crate::Executor; use futures::Future; +/// An executor that drops all the futures, instead of spawning them. +#[derive(Debug)] pub struct Null; impl Executor for Null { diff --git a/futures/src/executor/thread_pool.rs b/futures/src/executor/thread_pool.rs index 6393d0d5..09cb4d21 100644 --- a/futures/src/executor/thread_pool.rs +++ b/futures/src/executor/thread_pool.rs @@ -2,6 +2,7 @@ use crate::Executor; use futures::Future; +/// A thread pool for futures. pub type ThreadPool = futures::executor::ThreadPool; impl Executor for futures::executor::ThreadPool { diff --git a/futures/src/executor/tokio.rs b/futures/src/executor/tokio.rs index aafa7e7b..4c609686 100644 --- a/futures/src/executor/tokio.rs +++ b/futures/src/executor/tokio.rs @@ -2,6 +2,7 @@ use crate::Executor; use futures::Future; +/// The `tokio` runtime. pub type Tokio = tokio::runtime::Runtime; impl Executor for Tokio { diff --git a/futures/src/lib.rs b/futures/src/lib.rs index 832a50f6..4872df10 100644 --- a/futures/src/lib.rs +++ b/futures/src/lib.rs @@ -1,3 +1,9 @@ +//! Asynchronous tasks for GUI programming, inspired by Elm. +#![deny(missing_docs)] +#![deny(missing_debug_implementations)] +#![deny(unused_results)] +#![deny(unsafe_code)] +#![deny(rust_2018_idioms)] pub use futures; mod command; diff --git a/futures/src/runtime.rs b/futures/src/runtime.rs index a508c46e..9fd9899a 100644 --- a/futures/src/runtime.rs +++ b/futures/src/runtime.rs @@ -4,6 +4,15 @@ use crate::{subscription, Command, Executor, Subscription}; use futures::Sink; use std::marker::PhantomData; +/// A batteries-included runtime of commands and subscriptions. +/// +/// If you have an [`Executor`], a [`Runtime`] can be leveraged to run any +/// [`Command`] or [`Subscription`] and get notified of the results! +/// +/// [`Runtime`]: struct.Runtime.html +/// [`Executor`]: executor/trait.Executor.html +/// [`Command`]: struct.Command.html +/// [`Subscription`]: subscription/struct.Subscription.html #[derive(Debug)] pub struct Runtime { executor: Executor, @@ -25,6 +34,13 @@ where + 'static, Message: Send + 'static, { + /// Creates a new empty [`Runtime`]. + /// + /// You need to provide: + /// - an [`Executor`] to spawn futures + /// - a `Sender` implementing `Sink` to receive the results + /// + /// [`Runtime`]: struct.Runtime.html pub fn new(executor: Executor, sender: Sender) -> Self { Self { executor, @@ -34,10 +50,24 @@ where } } + /// Runs the given closure inside the [`Executor`] of the [`Runtime`]. + /// + /// See [`Executor::enter`] to learn more. + /// + /// [`Executor`]: executor/trait.Executor.html + /// [`Runtime`]: struct.Runtime.html + /// [`Executor::enter`]: executor/trait.Executor.html#method.enter pub fn enter(&self, f: impl FnOnce() -> R) -> R { self.executor.enter(f) } + /// Spawns a [`Command`] in the [`Runtime`]. + /// + /// The resulting `Message` will be forwarded to the `Sender` of the + /// [`Runtime`]. + /// + /// [`Command`]: struct.Command.html + /// [`Runtime`]: struct.Runtime.html pub fn spawn(&mut self, command: Command) { use futures::{FutureExt, SinkExt}; @@ -56,6 +86,14 @@ where } } + /// Tracks a [`Subscription`] in the [`Runtime`]. + /// + /// It will spawn new streams or close old ones as necessary! See + /// [`Tracker::update`] to learn more about this! + /// + /// [`Subscription`]: subscription/struct.Subscription.html + /// [`Runtime`]: struct.Runtime.html + /// [`Tracker::update`]: subscription/struct.Tracker.html#method.update pub fn track( &mut self, subscription: Subscription, @@ -68,6 +106,13 @@ where } } + /// Broadcasts an event to all the subscriptions currently alive in the + /// [`Runtime`]. + /// + /// See [`Tracker::broadcast`] to learn more. + /// + /// [`Runtime`]: struct.Runtime.html + /// [`Tracker::broadcast`]: subscription/struct.Tracker.html#method.broadcast pub fn broadcast(&mut self, event: Event) { self.subscriptions.broadcast(event); } diff --git a/futures/src/subscription.rs b/futures/src/subscription.rs index 87e51e48..b68444cd 100644 --- a/futures/src/subscription.rs +++ b/futures/src/subscription.rs @@ -16,16 +16,16 @@ use futures::stream::BoxStream; /// For instance, you can use a [`Subscription`] to listen to a WebSocket /// connection, keyboard presses, mouse events, time ticks, etc. /// -/// This type is normally aliased by runtimes with a specific `Input` and/or +/// This type is normally aliased by runtimes with a specific `Event` and/or /// `Hasher`. /// /// [`Command`]: ../struct.Command.html /// [`Subscription`]: struct.Subscription.html -pub struct Subscription { - recipes: Vec>>, +pub struct Subscription { + recipes: Vec>>, } -impl Subscription +impl Subscription where H: std::hash::Hasher, { @@ -43,7 +43,7 @@ where /// [`Subscription`]: struct.Subscription.html /// [`Recipe`]: trait.Recipe.html pub fn from_recipe( - recipe: impl Recipe + 'static, + recipe: impl Recipe + 'static, ) -> Self { Self { recipes: vec![Box::new(recipe)], @@ -55,7 +55,7 @@ where /// /// [`Subscription`]: struct.Subscription.html pub fn batch( - subscriptions: impl IntoIterator>, + subscriptions: impl IntoIterator>, ) -> Self { Self { recipes: subscriptions @@ -68,7 +68,7 @@ where /// Returns the different recipes of the [`Subscription`]. /// /// [`Subscription`]: struct.Subscription.html - pub fn recipes(self) -> Vec>> { + pub fn recipes(self) -> Vec>> { self.recipes } @@ -78,10 +78,10 @@ where pub fn map( mut self, f: impl Fn(O) -> A + Send + Sync + 'static, - ) -> Subscription + ) -> Subscription where H: 'static, - I: 'static, + E: 'static, O: 'static, A: 'static, { @@ -93,7 +93,7 @@ where .drain(..) .map(|recipe| { Box::new(Map::new(recipe, function.clone())) - as Box> + as Box> }) .collect(), } @@ -114,7 +114,7 @@ impl std::fmt::Debug for Subscription { /// /// [`Subscription`]: struct.Subscription.html /// [`Recipe`]: trait.Recipe.html -pub trait Recipe { +pub trait Recipe { /// The events that will be produced by a [`Subscription`] with this /// [`Recipe`]. /// @@ -133,31 +133,32 @@ pub trait Recipe { /// Executes the [`Recipe`] and produces the stream of events of its /// [`Subscription`]. /// - /// It receives some generic `Input`, which is normally defined by runtimes. + /// It receives some stream of generic events, which is normally defined by + /// shells. /// /// [`Subscription`]: struct.Subscription.html /// [`Recipe`]: trait.Recipe.html fn stream( self: Box, - input: BoxStream<'static, Input>, + input: BoxStream<'static, Event>, ) -> BoxStream<'static, Self::Output>; } -struct Map { - recipe: Box>, +struct Map { + recipe: Box>, mapper: std::sync::Arc B + Send + Sync>, } -impl Map { +impl Map { fn new( - recipe: Box>, + recipe: Box>, mapper: std::sync::Arc B + Send + Sync + 'static>, ) -> Self { Map { recipe, mapper } } } -impl Recipe for Map +impl Recipe for Map where A: 'static, B: 'static, @@ -174,7 +175,7 @@ where fn stream( self: Box, - input: BoxStream<'static, I>, + input: BoxStream<'static, E>, ) -> futures::stream::BoxStream<'static, Self::Output> { use futures::StreamExt; diff --git a/futures/src/subscription/tracker.rs b/futures/src/subscription/tracker.rs index a942b619..c8a1ee18 100644 --- a/futures/src/subscription/tracker.rs +++ b/futures/src/subscription/tracker.rs @@ -4,6 +4,11 @@ use futures::{future::BoxFuture, sink::Sink}; use std::collections::HashMap; use std::marker::PhantomData; +/// A registry of subscription streams. +/// +/// If you have an application that continuously returns a [`Subscription`], +/// you can use a [`Tracker`] to keep track of the different recipes and keep +/// its executions alive. #[derive(Debug)] pub struct Tracker { subscriptions: HashMap>, @@ -21,6 +26,9 @@ where Hasher: std::hash::Hasher + Default, Event: 'static + Send + Clone, { + /// Creates a new empty [`Tracker`]. + /// + /// [`Tracker`]: struct.Tracker.html pub fn new() -> Self { Self { subscriptions: HashMap::new(), @@ -28,6 +36,26 @@ where } } + /// Updates the [`Tracker`] with the given [`Subscription`]. + /// + /// A [`Subscription`] can cause new streams to be spawned or old streams + /// to be closed. + /// + /// The [`Tracker`] keeps track of these streams between calls to this + /// method: + /// + /// - If the provided [`Subscription`] contains a new [`Recipe`] that is + /// currently not being run, it will spawn a new stream and keep it alive. + /// - On the other hand, if a [`Recipe`] is currently in execution and the + /// provided [`Subscription`] does not contain it anymore, then the + /// [`Tracker`] will close and drop the relevant stream. + /// + /// It returns a list of futures that need to be spawned to materialize + /// the [`Tracker`] changes. + /// + /// [`Tracker`]: struct.Tracker.html + /// [`Subscription`]: struct.Subscription.html + /// [`Recipe`]: trait.Recipe.html pub fn update( &mut self, subscription: Subscription, @@ -96,6 +124,14 @@ where futures } + /// Broadcasts an event to the subscriptions currently alive. + /// + /// A subscription's [`Recipe::stream`] always receives a stream of events + /// as input. This stream can be used by some subscription to listen to + /// shell events. + /// + /// This method publishes the given event to all the subscription streams + /// currently open. pub fn broadcast(&mut self, event: Event) { self.subscriptions .values_mut() -- cgit From 7bb6411dfc6797c567a96ff940fc72f3a6747ff4 Mon Sep 17 00:00:00 2001 From: Héctor Ramón Jiménez Date: Mon, 20 Jan 2020 10:39:17 +0100 Subject: Write documentation for `executor::WasmBindgen` --- futures/src/executor/wasm_bindgen.rs | 1 + 1 file changed, 1 insertion(+) (limited to 'futures/src') diff --git a/futures/src/executor/wasm_bindgen.rs b/futures/src/executor/wasm_bindgen.rs index 70a8ea8e..2a12a1c0 100644 --- a/futures/src/executor/wasm_bindgen.rs +++ b/futures/src/executor/wasm_bindgen.rs @@ -1,5 +1,6 @@ use crate::Executor; +/// A type representing a `wasm-bindgen-futures` runtime. #[derive(Debug)] pub struct WasmBindgen; -- cgit From 91d9d65a03ce9b211e4043726e7424949d314325 Mon Sep 17 00:00:00 2001 From: Héctor Ramón Jiménez Date: Mon, 20 Jan 2020 10:49:25 +0100 Subject: Improve consistency in executor documentation --- futures/src/executor/async_std.rs | 2 +- futures/src/executor/thread_pool.rs | 2 +- futures/src/executor/tokio.rs | 2 +- futures/src/executor/wasm_bindgen.rs | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) (limited to 'futures/src') diff --git a/futures/src/executor/async_std.rs b/futures/src/executor/async_std.rs index 641dfbd2..27949e31 100644 --- a/futures/src/executor/async_std.rs +++ b/futures/src/executor/async_std.rs @@ -2,7 +2,7 @@ use crate::Executor; use futures::Future; -/// A type representing the `async-std` runtime. +/// An `async-std` runtime. #[derive(Debug)] pub struct AsyncStd; diff --git a/futures/src/executor/thread_pool.rs b/futures/src/executor/thread_pool.rs index 09cb4d21..1ec5bf69 100644 --- a/futures/src/executor/thread_pool.rs +++ b/futures/src/executor/thread_pool.rs @@ -2,7 +2,7 @@ use crate::Executor; use futures::Future; -/// A thread pool for futures. +/// A thread pool runtime for futures. pub type ThreadPool = futures::executor::ThreadPool; impl Executor for futures::executor::ThreadPool { diff --git a/futures/src/executor/tokio.rs b/futures/src/executor/tokio.rs index 4c609686..20802ceb 100644 --- a/futures/src/executor/tokio.rs +++ b/futures/src/executor/tokio.rs @@ -2,7 +2,7 @@ use crate::Executor; use futures::Future; -/// The `tokio` runtime. +/// A `tokio` runtime. pub type Tokio = tokio::runtime::Runtime; impl Executor for Tokio { diff --git a/futures/src/executor/wasm_bindgen.rs b/futures/src/executor/wasm_bindgen.rs index 2a12a1c0..69b7c7e2 100644 --- a/futures/src/executor/wasm_bindgen.rs +++ b/futures/src/executor/wasm_bindgen.rs @@ -1,6 +1,6 @@ use crate::Executor; -/// A type representing a `wasm-bindgen-futures` runtime. +/// A `wasm-bindgen-futures` runtime. #[derive(Debug)] pub struct WasmBindgen; -- cgit