diff options
author | 2020-01-19 10:17:08 +0100 | |
---|---|---|
committer | 2020-01-19 10:17:44 +0100 | |
commit | b5b17ed4d800c03beb3ad535d1069a7784e8dc1d (patch) | |
tree | b9e6477bd11bd6784f8ee61e818b5f5ff1a44318 /futures | |
parent | d50ff9b5d97d9c3d6c6c70a9b4efe764b6126c86 (diff) | |
download | iced-b5b17ed4d800c03beb3ad535d1069a7784e8dc1d.tar.gz iced-b5b17ed4d800c03beb3ad535d1069a7784e8dc1d.tar.bz2 iced-b5b17ed4d800c03beb3ad535d1069a7784e8dc1d.zip |
Create `iced_futures` and wire everything up
Diffstat (limited to 'futures')
-rw-r--r-- | futures/Cargo.toml | 23 | ||||
-rw-r--r-- | futures/src/command.rs | 100 | ||||
-rw-r--r-- | futures/src/lib.rs | 8 | ||||
-rw-r--r-- | futures/src/runtime.rs | 79 | ||||
-rw-r--r-- | futures/src/runtime/executor.rs | 26 | ||||
-rw-r--r-- | futures/src/subscription.rs | 188 | ||||
-rw-r--r-- | futures/src/subscription/tracker.rs | 112 |
7 files changed, 536 insertions, 0 deletions
diff --git a/futures/Cargo.toml b/futures/Cargo.toml new file mode 100644 index 00000000..fe0d378c --- /dev/null +++ b/futures/Cargo.toml @@ -0,0 +1,23 @@ +[package] +name = "iced_futures" +version = "0.1.0-alpha" +authors = ["Héctor Ramón Jiménez <hector0193@gmail.com>"] +edition = "2018" +description = "Commands, subscriptions, and runtimes for Iced" +license = "MIT" +repository = "https://github.com/hecrj/iced" +documentation = "https://docs.rs/iced_futures" +keywords = ["gui", "ui", "graphics", "interface", "futures"] +categories = ["gui"] + +[dependencies] +log = "0.4" + +[dependencies.futures] +version = "0.3" +features = ["thread-pool"] + +[dependencies.tokio] +version = "0.2" +optional = true +features = ["rt-core"] diff --git a/futures/src/command.rs b/futures/src/command.rs new file mode 100644 index 00000000..e7885fb8 --- /dev/null +++ b/futures/src/command.rs @@ -0,0 +1,100 @@ +use futures::future::{BoxFuture, Future, FutureExt}; + +/// A collection of async operations. +/// +/// You should be able to turn a future easily into a [`Command`], either by +/// using the `From` trait or [`Command::perform`]. +/// +/// [`Command`]: struct.Command.html +pub struct Command<T> { + futures: Vec<BoxFuture<'static, T>>, +} + +impl<T> Command<T> { + /// Creates an empty [`Command`]. + /// + /// In other words, a [`Command`] that does nothing. + /// + /// [`Command`]: struct.Command.html + pub fn none() -> Self { + Self { + futures: Vec::new(), + } + } + + /// Creates a [`Command`] that performs the action of the given future. + /// + /// [`Command`]: struct.Command.html + pub fn perform<A>( + future: impl Future<Output = T> + 'static + Send, + f: impl Fn(T) -> A + 'static + Send, + ) -> Command<A> { + Command { + futures: vec![future.map(f).boxed()], + } + } + + /// Applies a transformation to the result of a [`Command`]. + /// + /// [`Command`]: struct.Command.html + pub fn map<A>( + mut self, + f: impl Fn(T) -> A + 'static + Send + Sync, + ) -> Command<A> + where + T: 'static, + { + let f = std::sync::Arc::new(f); + + Command { + futures: self + .futures + .drain(..) + .map(|future| { + let f = f.clone(); + + future.map(move |result| f(result)).boxed() + }) + .collect(), + } + } + + /// Creates a [`Command`] that performs the actions of all the given + /// commands. + /// + /// Once this command is run, all the commands will be exectued at once. + /// + /// [`Command`]: struct.Command.html + pub fn batch(commands: impl IntoIterator<Item = Command<T>>) -> Self { + Self { + futures: commands + .into_iter() + .flat_map(|command| command.futures) + .collect(), + } + } + + /// Converts a [`Command`] into its underlying list of futures. + /// + /// [`Command`]: struct.Command.html + pub fn futures(self) -> Vec<BoxFuture<'static, T>> { + self.futures + } +} + +impl<T, A> From<A> for Command<T> +where + A: Future<Output = T> + 'static + Send, +{ + fn from(future: A) -> Self { + Self { + futures: vec![future.boxed()], + } + } +} + +impl<T> std::fmt::Debug for Command<T> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Command").finish() + } +} diff --git a/futures/src/lib.rs b/futures/src/lib.rs new file mode 100644 index 00000000..f6bcf85a --- /dev/null +++ b/futures/src/lib.rs @@ -0,0 +1,8 @@ +mod command; + +pub mod runtime; +pub mod subscription; + +pub use command::Command; +pub use runtime::Runtime; +pub use subscription::Subscription; diff --git a/futures/src/runtime.rs b/futures/src/runtime.rs new file mode 100644 index 00000000..bc1ad8ac --- /dev/null +++ b/futures/src/runtime.rs @@ -0,0 +1,79 @@ +//! Run commands and subscriptions. +mod executor; + +pub use executor::Executor; + +use crate::{subscription, Command, Subscription}; + +use futures::Sink; +use std::marker::PhantomData; + +#[derive(Debug)] +pub struct Runtime<Hasher, Event, Executor, Receiver, Message> { + executor: Executor, + receiver: Receiver, + subscriptions: subscription::Tracker<Hasher, Event>, + _message: PhantomData<Message>, +} + +impl<Hasher, Event, Executor, Receiver, Message> + Runtime<Hasher, Event, Executor, Receiver, Message> +where + Hasher: std::hash::Hasher + Default, + Event: Send + Clone + 'static, + Executor: self::Executor, + Receiver: Sink<Message, Error = core::convert::Infallible> + + Unpin + + Send + + Clone + + 'static, + Message: Send + 'static, +{ + pub fn new(executor: Executor, receiver: Receiver) -> Self { + Self { + executor, + receiver, + subscriptions: subscription::Tracker::new(), + _message: PhantomData, + } + } + + pub fn enter<R>(&self, f: impl FnOnce() -> R) -> R { + self.executor.enter(f) + } + + pub fn spawn(&mut self, command: Command<Message>) { + use futures::{FutureExt, SinkExt}; + + let futures = command.futures(); + + for future in futures { + let mut receiver = self.receiver.clone(); + + self.executor.spawn(future.then(|message| { + async move { + let _ = receiver.send(message).await; + + () + } + })); + } + } + + pub fn track( + &mut self, + subscription: Subscription<Hasher, Event, Message>, + ) { + let futures = self + .subscriptions + .update(subscription, self.receiver.clone()); + + for future in futures { + self.executor.spawn(future); + } + } + + pub fn broadcast(&mut self, event: Event) { + self.subscriptions.broadcast(event); + } +} diff --git a/futures/src/runtime/executor.rs b/futures/src/runtime/executor.rs new file mode 100644 index 00000000..855aa105 --- /dev/null +++ b/futures/src/runtime/executor.rs @@ -0,0 +1,26 @@ +use futures::Future; + +pub trait Executor { + fn spawn(&self, future: impl Future<Output = ()> + Send + 'static); + + fn enter<R>(&self, f: impl FnOnce() -> R) -> R { + f() + } +} + +impl Executor for futures::executor::ThreadPool { + fn spawn(&self, future: impl Future<Output = ()> + Send + 'static) { + self.spawn_ok(future); + } +} + +#[cfg(feature = "tokio")] +impl Executor for tokio::runtime::Runtime { + fn spawn(&self, future: impl Future<Output = ()> + Send + 'static) { + let _ = tokio::runtime::Runtime::spawn(self, future); + } + + fn enter<R>(&self, f: impl FnOnce() -> R) -> R { + tokio::runtime::Runtime::enter(self, f) + } +} diff --git a/futures/src/subscription.rs b/futures/src/subscription.rs new file mode 100644 index 00000000..87e51e48 --- /dev/null +++ b/futures/src/subscription.rs @@ -0,0 +1,188 @@ +//! Listen to external events in your application. +mod tracker; + +pub use tracker::Tracker; + +use futures::stream::BoxStream; + +/// A request to listen to external events. +/// +/// Besides performing async actions on demand with [`Command`], most +/// applications also need to listen to external events passively. +/// +/// A [`Subscription`] is normally provided to some runtime, like a [`Command`], +/// and it will generate events as long as the user keeps requesting it. +/// +/// For instance, you can use a [`Subscription`] to listen to a WebSocket +/// connection, keyboard presses, mouse events, time ticks, etc. +/// +/// This type is normally aliased by runtimes with a specific `Input` and/or +/// `Hasher`. +/// +/// [`Command`]: ../struct.Command.html +/// [`Subscription`]: struct.Subscription.html +pub struct Subscription<Hasher, Input, Output> { + recipes: Vec<Box<dyn Recipe<Hasher, Input, Output = Output>>>, +} + +impl<H, I, O> Subscription<H, I, O> +where + H: std::hash::Hasher, +{ + /// Returns an empty [`Subscription`] that will not produce any output. + /// + /// [`Subscription`]: struct.Subscription.html + pub fn none() -> Self { + Self { + recipes: Vec::new(), + } + } + + /// Creates a [`Subscription`] from a [`Recipe`] describing it. + /// + /// [`Subscription`]: struct.Subscription.html + /// [`Recipe`]: trait.Recipe.html + pub fn from_recipe( + recipe: impl Recipe<H, I, Output = O> + 'static, + ) -> Self { + Self { + recipes: vec![Box::new(recipe)], + } + } + + /// Batches all the provided subscriptions and returns the resulting + /// [`Subscription`]. + /// + /// [`Subscription`]: struct.Subscription.html + pub fn batch( + subscriptions: impl IntoIterator<Item = Subscription<H, I, O>>, + ) -> Self { + Self { + recipes: subscriptions + .into_iter() + .flat_map(|subscription| subscription.recipes) + .collect(), + } + } + + /// Returns the different recipes of the [`Subscription`]. + /// + /// [`Subscription`]: struct.Subscription.html + pub fn recipes(self) -> Vec<Box<dyn Recipe<H, I, Output = O>>> { + self.recipes + } + + /// Transforms the [`Subscription`] output with the given function. + /// + /// [`Subscription`]: struct.Subscription.html + pub fn map<A>( + mut self, + f: impl Fn(O) -> A + Send + Sync + 'static, + ) -> Subscription<H, I, A> + where + H: 'static, + I: 'static, + O: 'static, + A: 'static, + { + let function = std::sync::Arc::new(f); + + Subscription { + recipes: self + .recipes + .drain(..) + .map(|recipe| { + Box::new(Map::new(recipe, function.clone())) + as Box<dyn Recipe<H, I, Output = A>> + }) + .collect(), + } + } +} + +impl<I, O, H> std::fmt::Debug for Subscription<I, O, H> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Subscription").finish() + } +} + +/// The description of a [`Subscription`]. +/// +/// A [`Recipe`] is the internal definition of a [`Subscription`]. It is used +/// by runtimes to run and identify subscriptions. You can use it to create your +/// own! +/// +/// [`Subscription`]: struct.Subscription.html +/// [`Recipe`]: trait.Recipe.html +pub trait Recipe<Hasher: std::hash::Hasher, Input> { + /// The events that will be produced by a [`Subscription`] with this + /// [`Recipe`]. + /// + /// [`Subscription`]: struct.Subscription.html + /// [`Recipe`]: trait.Recipe.html + type Output; + + /// Hashes the [`Recipe`]. + /// + /// This is used by runtimes to uniquely identify a [`Subscription`]. + /// + /// [`Subscription`]: struct.Subscription.html + /// [`Recipe`]: trait.Recipe.html + fn hash(&self, state: &mut Hasher); + + /// Executes the [`Recipe`] and produces the stream of events of its + /// [`Subscription`]. + /// + /// It receives some generic `Input`, which is normally defined by runtimes. + /// + /// [`Subscription`]: struct.Subscription.html + /// [`Recipe`]: trait.Recipe.html + fn stream( + self: Box<Self>, + input: BoxStream<'static, Input>, + ) -> BoxStream<'static, Self::Output>; +} + +struct Map<Hasher, Input, A, B> { + recipe: Box<dyn Recipe<Hasher, Input, Output = A>>, + mapper: std::sync::Arc<dyn Fn(A) -> B + Send + Sync>, +} + +impl<H, I, A, B> Map<H, I, A, B> { + fn new( + recipe: Box<dyn Recipe<H, I, Output = A>>, + mapper: std::sync::Arc<dyn Fn(A) -> B + Send + Sync + 'static>, + ) -> Self { + Map { recipe, mapper } + } +} + +impl<H, I, A, B> Recipe<H, I> for Map<H, I, A, B> +where + A: 'static, + B: 'static, + H: std::hash::Hasher, +{ + type Output = B; + + fn hash(&self, state: &mut H) { + use std::hash::Hash; + + std::any::TypeId::of::<B>().hash(state); + self.recipe.hash(state); + } + + fn stream( + self: Box<Self>, + input: BoxStream<'static, I>, + ) -> futures::stream::BoxStream<'static, Self::Output> { + use futures::StreamExt; + + let mapper = self.mapper; + + self.recipe + .stream(input) + .map(move |element| mapper(element)) + .boxed() + } +} diff --git a/futures/src/subscription/tracker.rs b/futures/src/subscription/tracker.rs new file mode 100644 index 00000000..a942b619 --- /dev/null +++ b/futures/src/subscription/tracker.rs @@ -0,0 +1,112 @@ +use crate::Subscription; + +use futures::{future::BoxFuture, sink::Sink}; +use std::collections::HashMap; +use std::marker::PhantomData; + +#[derive(Debug)] +pub struct Tracker<Hasher, Event> { + subscriptions: HashMap<u64, Execution<Event>>, + _hasher: PhantomData<Hasher>, +} + +#[derive(Debug)] +pub struct Execution<Event> { + _cancel: futures::channel::oneshot::Sender<()>, + listener: Option<futures::channel::mpsc::Sender<Event>>, +} + +impl<Hasher, Event> Tracker<Hasher, Event> +where + Hasher: std::hash::Hasher + Default, + Event: 'static + Send + Clone, +{ + pub fn new() -> Self { + Self { + subscriptions: HashMap::new(), + _hasher: PhantomData, + } + } + + pub fn update<Message, Receiver>( + &mut self, + subscription: Subscription<Hasher, Event, Message>, + receiver: Receiver, + ) -> Vec<BoxFuture<'static, ()>> + where + Message: 'static + Send, + Receiver: 'static + + Sink<Message, Error = core::convert::Infallible> + + Unpin + + Send + + Clone, + { + use futures::{future::FutureExt, stream::StreamExt}; + + let mut futures = Vec::new(); + + let recipes = subscription.recipes(); + let mut alive = std::collections::HashSet::new(); + + for recipe in recipes { + let id = { + let mut hasher = Hasher::default(); + recipe.hash(&mut hasher); + + hasher.finish() + }; + + let _ = alive.insert(id); + + if self.subscriptions.contains_key(&id) { + continue; + } + + let (cancel, cancelled) = futures::channel::oneshot::channel(); + + // TODO: Use bus if/when it supports async + let (event_sender, event_receiver) = + futures::channel::mpsc::channel(100); + + let stream = recipe.stream(event_receiver.boxed()); + + let future = futures::future::select( + cancelled, + stream.map(Ok).forward(receiver.clone()), + ) + .map(|_| ()); + + let _ = self.subscriptions.insert( + id, + Execution { + _cancel: cancel, + listener: if event_sender.is_closed() { + None + } else { + Some(event_sender) + }, + }, + ); + + futures.push(future.boxed()); + } + + self.subscriptions.retain(|id, _| alive.contains(&id)); + + futures + } + + pub fn broadcast(&mut self, event: Event) { + self.subscriptions + .values_mut() + .filter_map(|connection| connection.listener.as_mut()) + .for_each(|listener| { + if let Err(error) = listener.try_send(event.clone()) { + log::error!( + "Error sending event to subscription: {:?}", + error + ); + } + }); + } +} |