summaryrefslogtreecommitdiffstats
path: root/futures
diff options
context:
space:
mode:
authorLibravatar Héctor Ramón Jiménez <hector0193@gmail.com>2020-01-19 10:17:08 +0100
committerLibravatar Héctor Ramón Jiménez <hector0193@gmail.com>2020-01-19 10:17:44 +0100
commitb5b17ed4d800c03beb3ad535d1069a7784e8dc1d (patch)
treeb9e6477bd11bd6784f8ee61e818b5f5ff1a44318 /futures
parentd50ff9b5d97d9c3d6c6c70a9b4efe764b6126c86 (diff)
downloadiced-b5b17ed4d800c03beb3ad535d1069a7784e8dc1d.tar.gz
iced-b5b17ed4d800c03beb3ad535d1069a7784e8dc1d.tar.bz2
iced-b5b17ed4d800c03beb3ad535d1069a7784e8dc1d.zip
Create `iced_futures` and wire everything up
Diffstat (limited to 'futures')
-rw-r--r--futures/Cargo.toml23
-rw-r--r--futures/src/command.rs100
-rw-r--r--futures/src/lib.rs8
-rw-r--r--futures/src/runtime.rs79
-rw-r--r--futures/src/runtime/executor.rs26
-rw-r--r--futures/src/subscription.rs188
-rw-r--r--futures/src/subscription/tracker.rs112
7 files changed, 536 insertions, 0 deletions
diff --git a/futures/Cargo.toml b/futures/Cargo.toml
new file mode 100644
index 00000000..fe0d378c
--- /dev/null
+++ b/futures/Cargo.toml
@@ -0,0 +1,23 @@
+[package]
+name = "iced_futures"
+version = "0.1.0-alpha"
+authors = ["Héctor Ramón Jiménez <hector0193@gmail.com>"]
+edition = "2018"
+description = "Commands, subscriptions, and runtimes for Iced"
+license = "MIT"
+repository = "https://github.com/hecrj/iced"
+documentation = "https://docs.rs/iced_futures"
+keywords = ["gui", "ui", "graphics", "interface", "futures"]
+categories = ["gui"]
+
+[dependencies]
+log = "0.4"
+
+[dependencies.futures]
+version = "0.3"
+features = ["thread-pool"]
+
+[dependencies.tokio]
+version = "0.2"
+optional = true
+features = ["rt-core"]
diff --git a/futures/src/command.rs b/futures/src/command.rs
new file mode 100644
index 00000000..e7885fb8
--- /dev/null
+++ b/futures/src/command.rs
@@ -0,0 +1,100 @@
+use futures::future::{BoxFuture, Future, FutureExt};
+
+/// A collection of async operations.
+///
+/// You should be able to turn a future easily into a [`Command`], either by
+/// using the `From` trait or [`Command::perform`].
+///
+/// [`Command`]: struct.Command.html
+pub struct Command<T> {
+ futures: Vec<BoxFuture<'static, T>>,
+}
+
+impl<T> Command<T> {
+ /// Creates an empty [`Command`].
+ ///
+ /// In other words, a [`Command`] that does nothing.
+ ///
+ /// [`Command`]: struct.Command.html
+ pub fn none() -> Self {
+ Self {
+ futures: Vec::new(),
+ }
+ }
+
+ /// Creates a [`Command`] that performs the action of the given future.
+ ///
+ /// [`Command`]: struct.Command.html
+ pub fn perform<A>(
+ future: impl Future<Output = T> + 'static + Send,
+ f: impl Fn(T) -> A + 'static + Send,
+ ) -> Command<A> {
+ Command {
+ futures: vec![future.map(f).boxed()],
+ }
+ }
+
+ /// Applies a transformation to the result of a [`Command`].
+ ///
+ /// [`Command`]: struct.Command.html
+ pub fn map<A>(
+ mut self,
+ f: impl Fn(T) -> A + 'static + Send + Sync,
+ ) -> Command<A>
+ where
+ T: 'static,
+ {
+ let f = std::sync::Arc::new(f);
+
+ Command {
+ futures: self
+ .futures
+ .drain(..)
+ .map(|future| {
+ let f = f.clone();
+
+ future.map(move |result| f(result)).boxed()
+ })
+ .collect(),
+ }
+ }
+
+ /// Creates a [`Command`] that performs the actions of all the given
+ /// commands.
+ ///
+ /// Once this command is run, all the commands will be exectued at once.
+ ///
+ /// [`Command`]: struct.Command.html
+ pub fn batch(commands: impl IntoIterator<Item = Command<T>>) -> Self {
+ Self {
+ futures: commands
+ .into_iter()
+ .flat_map(|command| command.futures)
+ .collect(),
+ }
+ }
+
+ /// Converts a [`Command`] into its underlying list of futures.
+ ///
+ /// [`Command`]: struct.Command.html
+ pub fn futures(self) -> Vec<BoxFuture<'static, T>> {
+ self.futures
+ }
+}
+
+impl<T, A> From<A> for Command<T>
+where
+ A: Future<Output = T> + 'static + Send,
+{
+ fn from(future: A) -> Self {
+ Self {
+ futures: vec![future.boxed()],
+ }
+ }
+}
+
+impl<T> std::fmt::Debug for Command<T> {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("Command").finish()
+ }
+}
diff --git a/futures/src/lib.rs b/futures/src/lib.rs
new file mode 100644
index 00000000..f6bcf85a
--- /dev/null
+++ b/futures/src/lib.rs
@@ -0,0 +1,8 @@
+mod command;
+
+pub mod runtime;
+pub mod subscription;
+
+pub use command::Command;
+pub use runtime::Runtime;
+pub use subscription::Subscription;
diff --git a/futures/src/runtime.rs b/futures/src/runtime.rs
new file mode 100644
index 00000000..bc1ad8ac
--- /dev/null
+++ b/futures/src/runtime.rs
@@ -0,0 +1,79 @@
+//! Run commands and subscriptions.
+mod executor;
+
+pub use executor::Executor;
+
+use crate::{subscription, Command, Subscription};
+
+use futures::Sink;
+use std::marker::PhantomData;
+
+#[derive(Debug)]
+pub struct Runtime<Hasher, Event, Executor, Receiver, Message> {
+ executor: Executor,
+ receiver: Receiver,
+ subscriptions: subscription::Tracker<Hasher, Event>,
+ _message: PhantomData<Message>,
+}
+
+impl<Hasher, Event, Executor, Receiver, Message>
+ Runtime<Hasher, Event, Executor, Receiver, Message>
+where
+ Hasher: std::hash::Hasher + Default,
+ Event: Send + Clone + 'static,
+ Executor: self::Executor,
+ Receiver: Sink<Message, Error = core::convert::Infallible>
+ + Unpin
+ + Send
+ + Clone
+ + 'static,
+ Message: Send + 'static,
+{
+ pub fn new(executor: Executor, receiver: Receiver) -> Self {
+ Self {
+ executor,
+ receiver,
+ subscriptions: subscription::Tracker::new(),
+ _message: PhantomData,
+ }
+ }
+
+ pub fn enter<R>(&self, f: impl FnOnce() -> R) -> R {
+ self.executor.enter(f)
+ }
+
+ pub fn spawn(&mut self, command: Command<Message>) {
+ use futures::{FutureExt, SinkExt};
+
+ let futures = command.futures();
+
+ for future in futures {
+ let mut receiver = self.receiver.clone();
+
+ self.executor.spawn(future.then(|message| {
+ async move {
+ let _ = receiver.send(message).await;
+
+ ()
+ }
+ }));
+ }
+ }
+
+ pub fn track(
+ &mut self,
+ subscription: Subscription<Hasher, Event, Message>,
+ ) {
+ let futures = self
+ .subscriptions
+ .update(subscription, self.receiver.clone());
+
+ for future in futures {
+ self.executor.spawn(future);
+ }
+ }
+
+ pub fn broadcast(&mut self, event: Event) {
+ self.subscriptions.broadcast(event);
+ }
+}
diff --git a/futures/src/runtime/executor.rs b/futures/src/runtime/executor.rs
new file mode 100644
index 00000000..855aa105
--- /dev/null
+++ b/futures/src/runtime/executor.rs
@@ -0,0 +1,26 @@
+use futures::Future;
+
+pub trait Executor {
+ fn spawn(&self, future: impl Future<Output = ()> + Send + 'static);
+
+ fn enter<R>(&self, f: impl FnOnce() -> R) -> R {
+ f()
+ }
+}
+
+impl Executor for futures::executor::ThreadPool {
+ fn spawn(&self, future: impl Future<Output = ()> + Send + 'static) {
+ self.spawn_ok(future);
+ }
+}
+
+#[cfg(feature = "tokio")]
+impl Executor for tokio::runtime::Runtime {
+ fn spawn(&self, future: impl Future<Output = ()> + Send + 'static) {
+ let _ = tokio::runtime::Runtime::spawn(self, future);
+ }
+
+ fn enter<R>(&self, f: impl FnOnce() -> R) -> R {
+ tokio::runtime::Runtime::enter(self, f)
+ }
+}
diff --git a/futures/src/subscription.rs b/futures/src/subscription.rs
new file mode 100644
index 00000000..87e51e48
--- /dev/null
+++ b/futures/src/subscription.rs
@@ -0,0 +1,188 @@
+//! Listen to external events in your application.
+mod tracker;
+
+pub use tracker::Tracker;
+
+use futures::stream::BoxStream;
+
+/// A request to listen to external events.
+///
+/// Besides performing async actions on demand with [`Command`], most
+/// applications also need to listen to external events passively.
+///
+/// A [`Subscription`] is normally provided to some runtime, like a [`Command`],
+/// and it will generate events as long as the user keeps requesting it.
+///
+/// For instance, you can use a [`Subscription`] to listen to a WebSocket
+/// connection, keyboard presses, mouse events, time ticks, etc.
+///
+/// This type is normally aliased by runtimes with a specific `Input` and/or
+/// `Hasher`.
+///
+/// [`Command`]: ../struct.Command.html
+/// [`Subscription`]: struct.Subscription.html
+pub struct Subscription<Hasher, Input, Output> {
+ recipes: Vec<Box<dyn Recipe<Hasher, Input, Output = Output>>>,
+}
+
+impl<H, I, O> Subscription<H, I, O>
+where
+ H: std::hash::Hasher,
+{
+ /// Returns an empty [`Subscription`] that will not produce any output.
+ ///
+ /// [`Subscription`]: struct.Subscription.html
+ pub fn none() -> Self {
+ Self {
+ recipes: Vec::new(),
+ }
+ }
+
+ /// Creates a [`Subscription`] from a [`Recipe`] describing it.
+ ///
+ /// [`Subscription`]: struct.Subscription.html
+ /// [`Recipe`]: trait.Recipe.html
+ pub fn from_recipe(
+ recipe: impl Recipe<H, I, Output = O> + 'static,
+ ) -> Self {
+ Self {
+ recipes: vec![Box::new(recipe)],
+ }
+ }
+
+ /// Batches all the provided subscriptions and returns the resulting
+ /// [`Subscription`].
+ ///
+ /// [`Subscription`]: struct.Subscription.html
+ pub fn batch(
+ subscriptions: impl IntoIterator<Item = Subscription<H, I, O>>,
+ ) -> Self {
+ Self {
+ recipes: subscriptions
+ .into_iter()
+ .flat_map(|subscription| subscription.recipes)
+ .collect(),
+ }
+ }
+
+ /// Returns the different recipes of the [`Subscription`].
+ ///
+ /// [`Subscription`]: struct.Subscription.html
+ pub fn recipes(self) -> Vec<Box<dyn Recipe<H, I, Output = O>>> {
+ self.recipes
+ }
+
+ /// Transforms the [`Subscription`] output with the given function.
+ ///
+ /// [`Subscription`]: struct.Subscription.html
+ pub fn map<A>(
+ mut self,
+ f: impl Fn(O) -> A + Send + Sync + 'static,
+ ) -> Subscription<H, I, A>
+ where
+ H: 'static,
+ I: 'static,
+ O: 'static,
+ A: 'static,
+ {
+ let function = std::sync::Arc::new(f);
+
+ Subscription {
+ recipes: self
+ .recipes
+ .drain(..)
+ .map(|recipe| {
+ Box::new(Map::new(recipe, function.clone()))
+ as Box<dyn Recipe<H, I, Output = A>>
+ })
+ .collect(),
+ }
+ }
+}
+
+impl<I, O, H> std::fmt::Debug for Subscription<I, O, H> {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("Subscription").finish()
+ }
+}
+
+/// The description of a [`Subscription`].
+///
+/// A [`Recipe`] is the internal definition of a [`Subscription`]. It is used
+/// by runtimes to run and identify subscriptions. You can use it to create your
+/// own!
+///
+/// [`Subscription`]: struct.Subscription.html
+/// [`Recipe`]: trait.Recipe.html
+pub trait Recipe<Hasher: std::hash::Hasher, Input> {
+ /// The events that will be produced by a [`Subscription`] with this
+ /// [`Recipe`].
+ ///
+ /// [`Subscription`]: struct.Subscription.html
+ /// [`Recipe`]: trait.Recipe.html
+ type Output;
+
+ /// Hashes the [`Recipe`].
+ ///
+ /// This is used by runtimes to uniquely identify a [`Subscription`].
+ ///
+ /// [`Subscription`]: struct.Subscription.html
+ /// [`Recipe`]: trait.Recipe.html
+ fn hash(&self, state: &mut Hasher);
+
+ /// Executes the [`Recipe`] and produces the stream of events of its
+ /// [`Subscription`].
+ ///
+ /// It receives some generic `Input`, which is normally defined by runtimes.
+ ///
+ /// [`Subscription`]: struct.Subscription.html
+ /// [`Recipe`]: trait.Recipe.html
+ fn stream(
+ self: Box<Self>,
+ input: BoxStream<'static, Input>,
+ ) -> BoxStream<'static, Self::Output>;
+}
+
+struct Map<Hasher, Input, A, B> {
+ recipe: Box<dyn Recipe<Hasher, Input, Output = A>>,
+ mapper: std::sync::Arc<dyn Fn(A) -> B + Send + Sync>,
+}
+
+impl<H, I, A, B> Map<H, I, A, B> {
+ fn new(
+ recipe: Box<dyn Recipe<H, I, Output = A>>,
+ mapper: std::sync::Arc<dyn Fn(A) -> B + Send + Sync + 'static>,
+ ) -> Self {
+ Map { recipe, mapper }
+ }
+}
+
+impl<H, I, A, B> Recipe<H, I> for Map<H, I, A, B>
+where
+ A: 'static,
+ B: 'static,
+ H: std::hash::Hasher,
+{
+ type Output = B;
+
+ fn hash(&self, state: &mut H) {
+ use std::hash::Hash;
+
+ std::any::TypeId::of::<B>().hash(state);
+ self.recipe.hash(state);
+ }
+
+ fn stream(
+ self: Box<Self>,
+ input: BoxStream<'static, I>,
+ ) -> futures::stream::BoxStream<'static, Self::Output> {
+ use futures::StreamExt;
+
+ let mapper = self.mapper;
+
+ self.recipe
+ .stream(input)
+ .map(move |element| mapper(element))
+ .boxed()
+ }
+}
diff --git a/futures/src/subscription/tracker.rs b/futures/src/subscription/tracker.rs
new file mode 100644
index 00000000..a942b619
--- /dev/null
+++ b/futures/src/subscription/tracker.rs
@@ -0,0 +1,112 @@
+use crate::Subscription;
+
+use futures::{future::BoxFuture, sink::Sink};
+use std::collections::HashMap;
+use std::marker::PhantomData;
+
+#[derive(Debug)]
+pub struct Tracker<Hasher, Event> {
+ subscriptions: HashMap<u64, Execution<Event>>,
+ _hasher: PhantomData<Hasher>,
+}
+
+#[derive(Debug)]
+pub struct Execution<Event> {
+ _cancel: futures::channel::oneshot::Sender<()>,
+ listener: Option<futures::channel::mpsc::Sender<Event>>,
+}
+
+impl<Hasher, Event> Tracker<Hasher, Event>
+where
+ Hasher: std::hash::Hasher + Default,
+ Event: 'static + Send + Clone,
+{
+ pub fn new() -> Self {
+ Self {
+ subscriptions: HashMap::new(),
+ _hasher: PhantomData,
+ }
+ }
+
+ pub fn update<Message, Receiver>(
+ &mut self,
+ subscription: Subscription<Hasher, Event, Message>,
+ receiver: Receiver,
+ ) -> Vec<BoxFuture<'static, ()>>
+ where
+ Message: 'static + Send,
+ Receiver: 'static
+ + Sink<Message, Error = core::convert::Infallible>
+ + Unpin
+ + Send
+ + Clone,
+ {
+ use futures::{future::FutureExt, stream::StreamExt};
+
+ let mut futures = Vec::new();
+
+ let recipes = subscription.recipes();
+ let mut alive = std::collections::HashSet::new();
+
+ for recipe in recipes {
+ let id = {
+ let mut hasher = Hasher::default();
+ recipe.hash(&mut hasher);
+
+ hasher.finish()
+ };
+
+ let _ = alive.insert(id);
+
+ if self.subscriptions.contains_key(&id) {
+ continue;
+ }
+
+ let (cancel, cancelled) = futures::channel::oneshot::channel();
+
+ // TODO: Use bus if/when it supports async
+ let (event_sender, event_receiver) =
+ futures::channel::mpsc::channel(100);
+
+ let stream = recipe.stream(event_receiver.boxed());
+
+ let future = futures::future::select(
+ cancelled,
+ stream.map(Ok).forward(receiver.clone()),
+ )
+ .map(|_| ());
+
+ let _ = self.subscriptions.insert(
+ id,
+ Execution {
+ _cancel: cancel,
+ listener: if event_sender.is_closed() {
+ None
+ } else {
+ Some(event_sender)
+ },
+ },
+ );
+
+ futures.push(future.boxed());
+ }
+
+ self.subscriptions.retain(|id, _| alive.contains(&id));
+
+ futures
+ }
+
+ pub fn broadcast(&mut self, event: Event) {
+ self.subscriptions
+ .values_mut()
+ .filter_map(|connection| connection.listener.as_mut())
+ .for_each(|listener| {
+ if let Err(error) = listener.try_send(event.clone()) {
+ log::error!(
+ "Error sending event to subscription: {:?}",
+ error
+ );
+ }
+ });
+ }
+}