diff options
author | 2020-01-21 00:15:01 +0100 | |
---|---|---|
committer | 2020-01-21 00:15:01 +0100 | |
commit | 7016221556ea8183ebcd8ef8df00044e2eda71e7 (patch) | |
tree | bc1609b71b88437fc7497af339b6427f63121c76 /futures/src | |
parent | 6ca5e6184f9f1c12b427bdafcce0b4e9fbc5bb14 (diff) | |
parent | 91d9d65a03ce9b211e4043726e7424949d314325 (diff) | |
download | iced-7016221556ea8183ebcd8ef8df00044e2eda71e7.tar.gz iced-7016221556ea8183ebcd8ef8df00044e2eda71e7.tar.bz2 iced-7016221556ea8183ebcd8ef8df00044e2eda71e7.zip |
Merge pull request #164 from hecrj/feature/custom-runtime
Custom futures executor with `iced_futures`
Diffstat (limited to '')
-rw-r--r-- | futures/src/command.rs (renamed from core/src/command.rs) | 0 | ||||
-rw-r--r-- | futures/src/executor.rs | 55 | ||||
-rw-r--r-- | futures/src/executor/async_std.rs | 17 | ||||
-rw-r--r-- | futures/src/executor/null.rs | 15 | ||||
-rw-r--r-- | futures/src/executor/thread_pool.rs | 16 | ||||
-rw-r--r-- | futures/src/executor/tokio.rs | 20 | ||||
-rw-r--r-- | futures/src/executor/wasm_bindgen.rs | 18 | ||||
-rw-r--r-- | futures/src/lib.rs | 18 | ||||
-rw-r--r-- | futures/src/runtime.rs | 119 | ||||
-rw-r--r-- | futures/src/subscription.rs (renamed from core/src/subscription.rs) | 46 | ||||
-rw-r--r-- | futures/src/subscription/tracker.rs | 148 |
11 files changed, 452 insertions, 20 deletions
diff --git a/core/src/command.rs b/futures/src/command.rs index e7885fb8..e7885fb8 100644 --- a/core/src/command.rs +++ b/futures/src/command.rs diff --git a/futures/src/executor.rs b/futures/src/executor.rs new file mode 100644 index 00000000..c2b9cc72 --- /dev/null +++ b/futures/src/executor.rs @@ -0,0 +1,55 @@ +//! Choose your preferred executor to power a runtime. +mod null; + +#[cfg(feature = "thread-pool")] +mod thread_pool; + +#[cfg(feature = "tokio")] +mod tokio; + +#[cfg(feature = "async-std")] +mod async_std; + +#[cfg(target_arch = "wasm32")] +mod wasm_bindgen; + +pub use null::Null; + +#[cfg(feature = "thread-pool")] +pub use thread_pool::ThreadPool; + +#[cfg(feature = "tokio")] +pub use self::tokio::Tokio; + +#[cfg(feature = "async-std")] +pub use self::async_std::AsyncStd; + +#[cfg(target_arch = "wasm32")] +pub use wasm_bindgen::WasmBindgen; + +use futures::Future; + +/// A type that can run futures. +pub trait Executor: Sized { + /// Creates a new [`Executor`]. + /// + /// [`Executor`]: trait.Executor.html + fn new() -> Result<Self, futures::io::Error> + where + Self: Sized; + + /// Spawns a future in the [`Executor`]. + /// + /// [`Executor`]: trait.Executor.html + fn spawn(&self, future: impl Future<Output = ()> + Send + 'static); + + /// Runs the given closure inside the [`Executor`]. + /// + /// Some executors, like `tokio`, require some global state to be in place + /// before creating futures. This method can be leveraged to set up this + /// global state, call a function, restore the state, and obtain the result + /// of the call. + fn enter<R>(&self, f: impl FnOnce() -> R) -> R { + f() + } +} diff --git a/futures/src/executor/async_std.rs b/futures/src/executor/async_std.rs new file mode 100644 index 00000000..27949e31 --- /dev/null +++ b/futures/src/executor/async_std.rs @@ -0,0 +1,17 @@ +use crate::Executor; + +use futures::Future; + +/// An `async-std` runtime. +#[derive(Debug)] +pub struct AsyncStd; + +impl Executor for AsyncStd { + fn new() -> Result<Self, futures::io::Error> { + Ok(Self) + } + + fn spawn(&self, future: impl Future<Output = ()> + Send + 'static) { + let _ = async_std::task::spawn(future); + } +} diff --git a/futures/src/executor/null.rs b/futures/src/executor/null.rs new file mode 100644 index 00000000..6d5cf982 --- /dev/null +++ b/futures/src/executor/null.rs @@ -0,0 +1,15 @@ +use crate::Executor; + +use futures::Future; + +/// An executor that drops all the futures, instead of spawning them. +#[derive(Debug)] +pub struct Null; + +impl Executor for Null { + fn new() -> Result<Self, futures::io::Error> { + Ok(Self) + } + + fn spawn(&self, _future: impl Future<Output = ()> + Send + 'static) {} +} diff --git a/futures/src/executor/thread_pool.rs b/futures/src/executor/thread_pool.rs new file mode 100644 index 00000000..1ec5bf69 --- /dev/null +++ b/futures/src/executor/thread_pool.rs @@ -0,0 +1,16 @@ +use crate::Executor; + +use futures::Future; + +/// A thread pool runtime for futures. +pub type ThreadPool = futures::executor::ThreadPool; + +impl Executor for futures::executor::ThreadPool { + fn new() -> Result<Self, futures::io::Error> { + futures::executor::ThreadPool::new() + } + + fn spawn(&self, future: impl Future<Output = ()> + Send + 'static) { + self.spawn_ok(future); + } +} diff --git a/futures/src/executor/tokio.rs b/futures/src/executor/tokio.rs new file mode 100644 index 00000000..20802ceb --- /dev/null +++ b/futures/src/executor/tokio.rs @@ -0,0 +1,20 @@ +use crate::Executor; + +use futures::Future; + +/// A `tokio` runtime. +pub type Tokio = tokio::runtime::Runtime; + +impl Executor for Tokio { + fn new() -> Result<Self, futures::io::Error> { + tokio::runtime::Runtime::new() + } + + fn spawn(&self, future: impl Future<Output = ()> + Send + 'static) { + let _ = tokio::runtime::Runtime::spawn(self, future); + } + + fn enter<R>(&self, f: impl FnOnce() -> R) -> R { + tokio::runtime::Runtime::enter(self, f) + } +} diff --git a/futures/src/executor/wasm_bindgen.rs b/futures/src/executor/wasm_bindgen.rs new file mode 100644 index 00000000..69b7c7e2 --- /dev/null +++ b/futures/src/executor/wasm_bindgen.rs @@ -0,0 +1,18 @@ +use crate::Executor; + +/// A `wasm-bindgen-futures` runtime. +#[derive(Debug)] +pub struct WasmBindgen; + +impl Executor for WasmBindgen { + fn new() -> Result<Self, futures::io::Error> { + Ok(Self) + } + + fn spawn( + &self, + future: impl futures::Future<Output = ()> + Send + 'static, + ) { + wasm_bindgen_futures::spawn_local(future); + } +} diff --git a/futures/src/lib.rs b/futures/src/lib.rs new file mode 100644 index 00000000..4872df10 --- /dev/null +++ b/futures/src/lib.rs @@ -0,0 +1,18 @@ +//! Asynchronous tasks for GUI programming, inspired by Elm. +#![deny(missing_docs)] +#![deny(missing_debug_implementations)] +#![deny(unused_results)] +#![deny(unsafe_code)] +#![deny(rust_2018_idioms)] +pub use futures; + +mod command; +mod runtime; + +pub mod executor; +pub mod subscription; + +pub use command::Command; +pub use executor::Executor; +pub use runtime::Runtime; +pub use subscription::Subscription; diff --git a/futures/src/runtime.rs b/futures/src/runtime.rs new file mode 100644 index 00000000..9fd9899a --- /dev/null +++ b/futures/src/runtime.rs @@ -0,0 +1,119 @@ +//! Run commands and keep track of subscriptions. +use crate::{subscription, Command, Executor, Subscription}; + +use futures::Sink; +use std::marker::PhantomData; + +/// A batteries-included runtime of commands and subscriptions. +/// +/// If you have an [`Executor`], a [`Runtime`] can be leveraged to run any +/// [`Command`] or [`Subscription`] and get notified of the results! +/// +/// [`Runtime`]: struct.Runtime.html +/// [`Executor`]: executor/trait.Executor.html +/// [`Command`]: struct.Command.html +/// [`Subscription`]: subscription/struct.Subscription.html +#[derive(Debug)] +pub struct Runtime<Hasher, Event, Executor, Sender, Message> { + executor: Executor, + sender: Sender, + subscriptions: subscription::Tracker<Hasher, Event>, + _message: PhantomData<Message>, +} + +impl<Hasher, Event, Executor, Sender, Message> + Runtime<Hasher, Event, Executor, Sender, Message> +where + Hasher: std::hash::Hasher + Default, + Event: Send + Clone + 'static, + Executor: self::Executor, + Sender: Sink<Message, Error = core::convert::Infallible> + + Unpin + + Send + + Clone + + 'static, + Message: Send + 'static, +{ + /// Creates a new empty [`Runtime`]. + /// + /// You need to provide: + /// - an [`Executor`] to spawn futures + /// - a `Sender` implementing `Sink` to receive the results + /// + /// [`Runtime`]: struct.Runtime.html + pub fn new(executor: Executor, sender: Sender) -> Self { + Self { + executor, + sender, + subscriptions: subscription::Tracker::new(), + _message: PhantomData, + } + } + + /// Runs the given closure inside the [`Executor`] of the [`Runtime`]. + /// + /// See [`Executor::enter`] to learn more. + /// + /// [`Executor`]: executor/trait.Executor.html + /// [`Runtime`]: struct.Runtime.html + /// [`Executor::enter`]: executor/trait.Executor.html#method.enter + pub fn enter<R>(&self, f: impl FnOnce() -> R) -> R { + self.executor.enter(f) + } + + /// Spawns a [`Command`] in the [`Runtime`]. + /// + /// The resulting `Message` will be forwarded to the `Sender` of the + /// [`Runtime`]. + /// + /// [`Command`]: struct.Command.html + /// [`Runtime`]: struct.Runtime.html + pub fn spawn(&mut self, command: Command<Message>) { + use futures::{FutureExt, SinkExt}; + + let futures = command.futures(); + + for future in futures { + let mut sender = self.sender.clone(); + + self.executor.spawn(future.then(|message| { + async move { + let _ = sender.send(message).await; + + () + } + })); + } + } + + /// Tracks a [`Subscription`] in the [`Runtime`]. + /// + /// It will spawn new streams or close old ones as necessary! See + /// [`Tracker::update`] to learn more about this! + /// + /// [`Subscription`]: subscription/struct.Subscription.html + /// [`Runtime`]: struct.Runtime.html + /// [`Tracker::update`]: subscription/struct.Tracker.html#method.update + pub fn track( + &mut self, + subscription: Subscription<Hasher, Event, Message>, + ) { + let futures = + self.subscriptions.update(subscription, self.sender.clone()); + + for future in futures { + self.executor.spawn(future); + } + } + + /// Broadcasts an event to all the subscriptions currently alive in the + /// [`Runtime`]. + /// + /// See [`Tracker::broadcast`] to learn more. + /// + /// [`Runtime`]: struct.Runtime.html + /// [`Tracker::broadcast`]: subscription/struct.Tracker.html#method.broadcast + pub fn broadcast(&mut self, event: Event) { + self.subscriptions.broadcast(event); + } +} diff --git a/core/src/subscription.rs b/futures/src/subscription.rs index d9e7e388..b68444cd 100644 --- a/core/src/subscription.rs +++ b/futures/src/subscription.rs @@ -1,4 +1,9 @@ //! Listen to external events in your application. +mod tracker; + +pub use tracker::Tracker; + +use futures::stream::BoxStream; /// A request to listen to external events. /// @@ -11,16 +16,16 @@ /// For instance, you can use a [`Subscription`] to listen to a WebSocket /// connection, keyboard presses, mouse events, time ticks, etc. /// -/// This type is normally aliased by runtimes with a specific `Input` and/or +/// This type is normally aliased by runtimes with a specific `Event` and/or /// `Hasher`. /// /// [`Command`]: ../struct.Command.html /// [`Subscription`]: struct.Subscription.html -pub struct Subscription<Hasher, Input, Output> { - recipes: Vec<Box<dyn Recipe<Hasher, Input, Output = Output>>>, +pub struct Subscription<Hasher, Event, Output> { + recipes: Vec<Box<dyn Recipe<Hasher, Event, Output = Output>>>, } -impl<H, I, O> Subscription<H, I, O> +impl<H, E, O> Subscription<H, E, O> where H: std::hash::Hasher, { @@ -38,7 +43,7 @@ where /// [`Subscription`]: struct.Subscription.html /// [`Recipe`]: trait.Recipe.html pub fn from_recipe( - recipe: impl Recipe<H, I, Output = O> + 'static, + recipe: impl Recipe<H, E, Output = O> + 'static, ) -> Self { Self { recipes: vec![Box::new(recipe)], @@ -50,7 +55,7 @@ where /// /// [`Subscription`]: struct.Subscription.html pub fn batch( - subscriptions: impl IntoIterator<Item = Subscription<H, I, O>>, + subscriptions: impl IntoIterator<Item = Subscription<H, E, O>>, ) -> Self { Self { recipes: subscriptions @@ -63,7 +68,7 @@ where /// Returns the different recipes of the [`Subscription`]. /// /// [`Subscription`]: struct.Subscription.html - pub fn recipes(self) -> Vec<Box<dyn Recipe<H, I, Output = O>>> { + pub fn recipes(self) -> Vec<Box<dyn Recipe<H, E, Output = O>>> { self.recipes } @@ -73,10 +78,10 @@ where pub fn map<A>( mut self, f: impl Fn(O) -> A + Send + Sync + 'static, - ) -> Subscription<H, I, A> + ) -> Subscription<H, E, A> where H: 'static, - I: 'static, + E: 'static, O: 'static, A: 'static, { @@ -88,7 +93,7 @@ where .drain(..) .map(|recipe| { Box::new(Map::new(recipe, function.clone())) - as Box<dyn Recipe<H, I, Output = A>> + as Box<dyn Recipe<H, E, Output = A>> }) .collect(), } @@ -109,7 +114,7 @@ impl<I, O, H> std::fmt::Debug for Subscription<I, O, H> { /// /// [`Subscription`]: struct.Subscription.html /// [`Recipe`]: trait.Recipe.html -pub trait Recipe<Hasher: std::hash::Hasher, Input> { +pub trait Recipe<Hasher: std::hash::Hasher, Event> { /// The events that will be produced by a [`Subscription`] with this /// [`Recipe`]. /// @@ -128,31 +133,32 @@ pub trait Recipe<Hasher: std::hash::Hasher, Input> { /// Executes the [`Recipe`] and produces the stream of events of its /// [`Subscription`]. /// - /// It receives some generic `Input`, which is normally defined by runtimes. + /// It receives some stream of generic events, which is normally defined by + /// shells. /// /// [`Subscription`]: struct.Subscription.html /// [`Recipe`]: trait.Recipe.html fn stream( self: Box<Self>, - input: Input, - ) -> futures::stream::BoxStream<'static, Self::Output>; + input: BoxStream<'static, Event>, + ) -> BoxStream<'static, Self::Output>; } -struct Map<Hasher, Input, A, B> { - recipe: Box<dyn Recipe<Hasher, Input, Output = A>>, +struct Map<Hasher, Event, A, B> { + recipe: Box<dyn Recipe<Hasher, Event, Output = A>>, mapper: std::sync::Arc<dyn Fn(A) -> B + Send + Sync>, } -impl<H, I, A, B> Map<H, I, A, B> { +impl<H, E, A, B> Map<H, E, A, B> { fn new( - recipe: Box<dyn Recipe<H, I, Output = A>>, + recipe: Box<dyn Recipe<H, E, Output = A>>, mapper: std::sync::Arc<dyn Fn(A) -> B + Send + Sync + 'static>, ) -> Self { Map { recipe, mapper } } } -impl<H, I, A, B> Recipe<H, I> for Map<H, I, A, B> +impl<H, E, A, B> Recipe<H, E> for Map<H, E, A, B> where A: 'static, B: 'static, @@ -169,7 +175,7 @@ where fn stream( self: Box<Self>, - input: I, + input: BoxStream<'static, E>, ) -> futures::stream::BoxStream<'static, Self::Output> { use futures::StreamExt; diff --git a/futures/src/subscription/tracker.rs b/futures/src/subscription/tracker.rs new file mode 100644 index 00000000..c8a1ee18 --- /dev/null +++ b/futures/src/subscription/tracker.rs @@ -0,0 +1,148 @@ +use crate::Subscription; + +use futures::{future::BoxFuture, sink::Sink}; +use std::collections::HashMap; +use std::marker::PhantomData; + +/// A registry of subscription streams. +/// +/// If you have an application that continuously returns a [`Subscription`], +/// you can use a [`Tracker`] to keep track of the different recipes and keep +/// its executions alive. +#[derive(Debug)] +pub struct Tracker<Hasher, Event> { + subscriptions: HashMap<u64, Execution<Event>>, + _hasher: PhantomData<Hasher>, +} + +#[derive(Debug)] +pub struct Execution<Event> { + _cancel: futures::channel::oneshot::Sender<()>, + listener: Option<futures::channel::mpsc::Sender<Event>>, +} + +impl<Hasher, Event> Tracker<Hasher, Event> +where + Hasher: std::hash::Hasher + Default, + Event: 'static + Send + Clone, +{ + /// Creates a new empty [`Tracker`]. + /// + /// [`Tracker`]: struct.Tracker.html + pub fn new() -> Self { + Self { + subscriptions: HashMap::new(), + _hasher: PhantomData, + } + } + + /// Updates the [`Tracker`] with the given [`Subscription`]. + /// + /// A [`Subscription`] can cause new streams to be spawned or old streams + /// to be closed. + /// + /// The [`Tracker`] keeps track of these streams between calls to this + /// method: + /// + /// - If the provided [`Subscription`] contains a new [`Recipe`] that is + /// currently not being run, it will spawn a new stream and keep it alive. + /// - On the other hand, if a [`Recipe`] is currently in execution and the + /// provided [`Subscription`] does not contain it anymore, then the + /// [`Tracker`] will close and drop the relevant stream. + /// + /// It returns a list of futures that need to be spawned to materialize + /// the [`Tracker`] changes. + /// + /// [`Tracker`]: struct.Tracker.html + /// [`Subscription`]: struct.Subscription.html + /// [`Recipe`]: trait.Recipe.html + pub fn update<Message, Receiver>( + &mut self, + subscription: Subscription<Hasher, Event, Message>, + receiver: Receiver, + ) -> Vec<BoxFuture<'static, ()>> + where + Message: 'static + Send, + Receiver: 'static + + Sink<Message, Error = core::convert::Infallible> + + Unpin + + Send + + Clone, + { + use futures::{future::FutureExt, stream::StreamExt}; + + let mut futures = Vec::new(); + + let recipes = subscription.recipes(); + let mut alive = std::collections::HashSet::new(); + + for recipe in recipes { + let id = { + let mut hasher = Hasher::default(); + recipe.hash(&mut hasher); + + hasher.finish() + }; + + let _ = alive.insert(id); + + if self.subscriptions.contains_key(&id) { + continue; + } + + let (cancel, cancelled) = futures::channel::oneshot::channel(); + + // TODO: Use bus if/when it supports async + let (event_sender, event_receiver) = + futures::channel::mpsc::channel(100); + + let stream = recipe.stream(event_receiver.boxed()); + + let future = futures::future::select( + cancelled, + stream.map(Ok).forward(receiver.clone()), + ) + .map(|_| ()); + + let _ = self.subscriptions.insert( + id, + Execution { + _cancel: cancel, + listener: if event_sender.is_closed() { + None + } else { + Some(event_sender) + }, + }, + ); + + futures.push(future.boxed()); + } + + self.subscriptions.retain(|id, _| alive.contains(&id)); + + futures + } + + /// Broadcasts an event to the subscriptions currently alive. + /// + /// A subscription's [`Recipe::stream`] always receives a stream of events + /// as input. This stream can be used by some subscription to listen to + /// shell events. + /// + /// This method publishes the given event to all the subscription streams + /// currently open. + pub fn broadcast(&mut self, event: Event) { + self.subscriptions + .values_mut() + .filter_map(|connection| connection.listener.as_mut()) + .for_each(|listener| { + if let Err(error) = listener.try_send(event.clone()) { + log::error!( + "Error sending event to subscription: {:?}", + error + ); + } + }); + } +} |