From b5b17ed4d800c03beb3ad535d1069a7784e8dc1d Mon Sep 17 00:00:00 2001
From: Héctor Ramón Jiménez <hector0193@gmail.com>
Date: Sun, 19 Jan 2020 10:17:08 +0100
Subject: Create `iced_futures` and wire everything up

---
 futures/src/command.rs              | 100 +++++++++++++++++++
 futures/src/lib.rs                  |   8 ++
 futures/src/runtime.rs              |  79 +++++++++++++++
 futures/src/runtime/executor.rs     |  26 +++++
 futures/src/subscription.rs         | 188 ++++++++++++++++++++++++++++++++++++
 futures/src/subscription/tracker.rs | 112 +++++++++++++++++++++
 6 files changed, 513 insertions(+)
 create mode 100644 futures/src/command.rs
 create mode 100644 futures/src/lib.rs
 create mode 100644 futures/src/runtime.rs
 create mode 100644 futures/src/runtime/executor.rs
 create mode 100644 futures/src/subscription.rs
 create mode 100644 futures/src/subscription/tracker.rs

(limited to 'futures/src')

diff --git a/futures/src/command.rs b/futures/src/command.rs
new file mode 100644
index 00000000..e7885fb8
--- /dev/null
+++ b/futures/src/command.rs
@@ -0,0 +1,100 @@
+use futures::future::{BoxFuture, Future, FutureExt};
+
+/// A collection of async operations.
+///
+/// You should be able to turn a future easily into a [`Command`], either by
+/// using the `From` trait or [`Command::perform`].
+///
+/// [`Command`]: struct.Command.html
+pub struct Command<T> {
+    futures: Vec<BoxFuture<'static, T>>,
+}
+
+impl<T> Command<T> {
+    /// Creates an empty [`Command`].
+    ///
+    /// In other words, a [`Command`] that does nothing.
+    ///
+    /// [`Command`]: struct.Command.html
+    pub fn none() -> Self {
+        Self {
+            futures: Vec::new(),
+        }
+    }
+
+    /// Creates a [`Command`] that performs the action of the given future.
+    ///
+    /// [`Command`]: struct.Command.html
+    pub fn perform<A>(
+        future: impl Future<Output = T> + 'static + Send,
+        f: impl Fn(T) -> A + 'static + Send,
+    ) -> Command<A> {
+        Command {
+            futures: vec![future.map(f).boxed()],
+        }
+    }
+
+    /// Applies a transformation to the result of a [`Command`].
+    ///
+    /// [`Command`]: struct.Command.html
+    pub fn map<A>(
+        mut self,
+        f: impl Fn(T) -> A + 'static + Send + Sync,
+    ) -> Command<A>
+    where
+        T: 'static,
+    {
+        let f = std::sync::Arc::new(f);
+
+        Command {
+            futures: self
+                .futures
+                .drain(..)
+                .map(|future| {
+                    let f = f.clone();
+
+                    future.map(move |result| f(result)).boxed()
+                })
+                .collect(),
+        }
+    }
+
+    /// Creates a [`Command`] that performs the actions of all the given
+    /// commands.
+    ///
+    /// Once this command is run, all the commands will be exectued at once.
+    ///
+    /// [`Command`]: struct.Command.html
+    pub fn batch(commands: impl IntoIterator<Item = Command<T>>) -> Self {
+        Self {
+            futures: commands
+                .into_iter()
+                .flat_map(|command| command.futures)
+                .collect(),
+        }
+    }
+
+    /// Converts a [`Command`] into its underlying list of futures.
+    ///
+    /// [`Command`]: struct.Command.html
+    pub fn futures(self) -> Vec<BoxFuture<'static, T>> {
+        self.futures
+    }
+}
+
+impl<T, A> From<A> for Command<T>
+where
+    A: Future<Output = T> + 'static + Send,
+{
+    fn from(future: A) -> Self {
+        Self {
+            futures: vec![future.boxed()],
+        }
+    }
+}
+
+impl<T> std::fmt::Debug for Command<T> {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("Command").finish()
+    }
+}
diff --git a/futures/src/lib.rs b/futures/src/lib.rs
new file mode 100644
index 00000000..f6bcf85a
--- /dev/null
+++ b/futures/src/lib.rs
@@ -0,0 +1,8 @@
+mod command;
+
+pub mod runtime;
+pub mod subscription;
+
+pub use command::Command;
+pub use runtime::Runtime;
+pub use subscription::Subscription;
diff --git a/futures/src/runtime.rs b/futures/src/runtime.rs
new file mode 100644
index 00000000..bc1ad8ac
--- /dev/null
+++ b/futures/src/runtime.rs
@@ -0,0 +1,79 @@
+//! Run commands and subscriptions.
+mod executor;
+
+pub use executor::Executor;
+
+use crate::{subscription, Command, Subscription};
+
+use futures::Sink;
+use std::marker::PhantomData;
+
+#[derive(Debug)]
+pub struct Runtime<Hasher, Event, Executor, Receiver, Message> {
+    executor: Executor,
+    receiver: Receiver,
+    subscriptions: subscription::Tracker<Hasher, Event>,
+    _message: PhantomData<Message>,
+}
+
+impl<Hasher, Event, Executor, Receiver, Message>
+    Runtime<Hasher, Event, Executor, Receiver, Message>
+where
+    Hasher: std::hash::Hasher + Default,
+    Event: Send + Clone + 'static,
+    Executor: self::Executor,
+    Receiver: Sink<Message, Error = core::convert::Infallible>
+        + Unpin
+        + Send
+        + Clone
+        + 'static,
+    Message: Send + 'static,
+{
+    pub fn new(executor: Executor, receiver: Receiver) -> Self {
+        Self {
+            executor,
+            receiver,
+            subscriptions: subscription::Tracker::new(),
+            _message: PhantomData,
+        }
+    }
+
+    pub fn enter<R>(&self, f: impl FnOnce() -> R) -> R {
+        self.executor.enter(f)
+    }
+
+    pub fn spawn(&mut self, command: Command<Message>) {
+        use futures::{FutureExt, SinkExt};
+
+        let futures = command.futures();
+
+        for future in futures {
+            let mut receiver = self.receiver.clone();
+
+            self.executor.spawn(future.then(|message| {
+                async move {
+                    let _ = receiver.send(message).await;
+
+                    ()
+                }
+            }));
+        }
+    }
+
+    pub fn track(
+        &mut self,
+        subscription: Subscription<Hasher, Event, Message>,
+    ) {
+        let futures = self
+            .subscriptions
+            .update(subscription, self.receiver.clone());
+
+        for future in futures {
+            self.executor.spawn(future);
+        }
+    }
+
+    pub fn broadcast(&mut self, event: Event) {
+        self.subscriptions.broadcast(event);
+    }
+}
diff --git a/futures/src/runtime/executor.rs b/futures/src/runtime/executor.rs
new file mode 100644
index 00000000..855aa105
--- /dev/null
+++ b/futures/src/runtime/executor.rs
@@ -0,0 +1,26 @@
+use futures::Future;
+
+pub trait Executor {
+    fn spawn(&self, future: impl Future<Output = ()> + Send + 'static);
+
+    fn enter<R>(&self, f: impl FnOnce() -> R) -> R {
+        f()
+    }
+}
+
+impl Executor for futures::executor::ThreadPool {
+    fn spawn(&self, future: impl Future<Output = ()> + Send + 'static) {
+        self.spawn_ok(future);
+    }
+}
+
+#[cfg(feature = "tokio")]
+impl Executor for tokio::runtime::Runtime {
+    fn spawn(&self, future: impl Future<Output = ()> + Send + 'static) {
+        let _ = tokio::runtime::Runtime::spawn(self, future);
+    }
+
+    fn enter<R>(&self, f: impl FnOnce() -> R) -> R {
+        tokio::runtime::Runtime::enter(self, f)
+    }
+}
diff --git a/futures/src/subscription.rs b/futures/src/subscription.rs
new file mode 100644
index 00000000..87e51e48
--- /dev/null
+++ b/futures/src/subscription.rs
@@ -0,0 +1,188 @@
+//! Listen to external events in your application.
+mod tracker;
+
+pub use tracker::Tracker;
+
+use futures::stream::BoxStream;
+
+/// A request to listen to external events.
+///
+/// Besides performing async actions on demand with [`Command`], most
+/// applications also need to listen to external events passively.
+///
+/// A [`Subscription`] is normally provided to some runtime, like a [`Command`],
+/// and it will generate events as long as the user keeps requesting it.
+///
+/// For instance, you can use a [`Subscription`] to listen to a WebSocket
+/// connection, keyboard presses, mouse events, time ticks, etc.
+///
+/// This type is normally aliased by runtimes with a specific `Input` and/or
+/// `Hasher`.
+///
+/// [`Command`]: ../struct.Command.html
+/// [`Subscription`]: struct.Subscription.html
+pub struct Subscription<Hasher, Input, Output> {
+    recipes: Vec<Box<dyn Recipe<Hasher, Input, Output = Output>>>,
+}
+
+impl<H, I, O> Subscription<H, I, O>
+where
+    H: std::hash::Hasher,
+{
+    /// Returns an empty [`Subscription`] that will not produce any output.
+    ///
+    /// [`Subscription`]: struct.Subscription.html
+    pub fn none() -> Self {
+        Self {
+            recipes: Vec::new(),
+        }
+    }
+
+    /// Creates a [`Subscription`] from a [`Recipe`] describing it.
+    ///
+    /// [`Subscription`]: struct.Subscription.html
+    /// [`Recipe`]: trait.Recipe.html
+    pub fn from_recipe(
+        recipe: impl Recipe<H, I, Output = O> + 'static,
+    ) -> Self {
+        Self {
+            recipes: vec![Box::new(recipe)],
+        }
+    }
+
+    /// Batches all the provided subscriptions and returns the resulting
+    /// [`Subscription`].
+    ///
+    /// [`Subscription`]: struct.Subscription.html
+    pub fn batch(
+        subscriptions: impl IntoIterator<Item = Subscription<H, I, O>>,
+    ) -> Self {
+        Self {
+            recipes: subscriptions
+                .into_iter()
+                .flat_map(|subscription| subscription.recipes)
+                .collect(),
+        }
+    }
+
+    /// Returns the different recipes of the [`Subscription`].
+    ///
+    /// [`Subscription`]: struct.Subscription.html
+    pub fn recipes(self) -> Vec<Box<dyn Recipe<H, I, Output = O>>> {
+        self.recipes
+    }
+
+    /// Transforms the [`Subscription`] output with the given function.
+    ///
+    /// [`Subscription`]: struct.Subscription.html
+    pub fn map<A>(
+        mut self,
+        f: impl Fn(O) -> A + Send + Sync + 'static,
+    ) -> Subscription<H, I, A>
+    where
+        H: 'static,
+        I: 'static,
+        O: 'static,
+        A: 'static,
+    {
+        let function = std::sync::Arc::new(f);
+
+        Subscription {
+            recipes: self
+                .recipes
+                .drain(..)
+                .map(|recipe| {
+                    Box::new(Map::new(recipe, function.clone()))
+                        as Box<dyn Recipe<H, I, Output = A>>
+                })
+                .collect(),
+        }
+    }
+}
+
+impl<I, O, H> std::fmt::Debug for Subscription<I, O, H> {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("Subscription").finish()
+    }
+}
+
+/// The description of a [`Subscription`].
+///
+/// A [`Recipe`] is the internal definition of a [`Subscription`]. It is used
+/// by runtimes to run and identify subscriptions. You can use it to create your
+/// own!
+///
+/// [`Subscription`]: struct.Subscription.html
+/// [`Recipe`]: trait.Recipe.html
+pub trait Recipe<Hasher: std::hash::Hasher, Input> {
+    /// The events that will be produced by a [`Subscription`] with this
+    /// [`Recipe`].
+    ///
+    /// [`Subscription`]: struct.Subscription.html
+    /// [`Recipe`]: trait.Recipe.html
+    type Output;
+
+    /// Hashes the [`Recipe`].
+    ///
+    /// This is used by runtimes to uniquely identify a [`Subscription`].
+    ///
+    /// [`Subscription`]: struct.Subscription.html
+    /// [`Recipe`]: trait.Recipe.html
+    fn hash(&self, state: &mut Hasher);
+
+    /// Executes the [`Recipe`] and produces the stream of events of its
+    /// [`Subscription`].
+    ///
+    /// It receives some generic `Input`, which is normally defined by runtimes.
+    ///
+    /// [`Subscription`]: struct.Subscription.html
+    /// [`Recipe`]: trait.Recipe.html
+    fn stream(
+        self: Box<Self>,
+        input: BoxStream<'static, Input>,
+    ) -> BoxStream<'static, Self::Output>;
+}
+
+struct Map<Hasher, Input, A, B> {
+    recipe: Box<dyn Recipe<Hasher, Input, Output = A>>,
+    mapper: std::sync::Arc<dyn Fn(A) -> B + Send + Sync>,
+}
+
+impl<H, I, A, B> Map<H, I, A, B> {
+    fn new(
+        recipe: Box<dyn Recipe<H, I, Output = A>>,
+        mapper: std::sync::Arc<dyn Fn(A) -> B + Send + Sync + 'static>,
+    ) -> Self {
+        Map { recipe, mapper }
+    }
+}
+
+impl<H, I, A, B> Recipe<H, I> for Map<H, I, A, B>
+where
+    A: 'static,
+    B: 'static,
+    H: std::hash::Hasher,
+{
+    type Output = B;
+
+    fn hash(&self, state: &mut H) {
+        use std::hash::Hash;
+
+        std::any::TypeId::of::<B>().hash(state);
+        self.recipe.hash(state);
+    }
+
+    fn stream(
+        self: Box<Self>,
+        input: BoxStream<'static, I>,
+    ) -> futures::stream::BoxStream<'static, Self::Output> {
+        use futures::StreamExt;
+
+        let mapper = self.mapper;
+
+        self.recipe
+            .stream(input)
+            .map(move |element| mapper(element))
+            .boxed()
+    }
+}
diff --git a/futures/src/subscription/tracker.rs b/futures/src/subscription/tracker.rs
new file mode 100644
index 00000000..a942b619
--- /dev/null
+++ b/futures/src/subscription/tracker.rs
@@ -0,0 +1,112 @@
+use crate::Subscription;
+
+use futures::{future::BoxFuture, sink::Sink};
+use std::collections::HashMap;
+use std::marker::PhantomData;
+
+#[derive(Debug)]
+pub struct Tracker<Hasher, Event> {
+    subscriptions: HashMap<u64, Execution<Event>>,
+    _hasher: PhantomData<Hasher>,
+}
+
+#[derive(Debug)]
+pub struct Execution<Event> {
+    _cancel: futures::channel::oneshot::Sender<()>,
+    listener: Option<futures::channel::mpsc::Sender<Event>>,
+}
+
+impl<Hasher, Event> Tracker<Hasher, Event>
+where
+    Hasher: std::hash::Hasher + Default,
+    Event: 'static + Send + Clone,
+{
+    pub fn new() -> Self {
+        Self {
+            subscriptions: HashMap::new(),
+            _hasher: PhantomData,
+        }
+    }
+
+    pub fn update<Message, Receiver>(
+        &mut self,
+        subscription: Subscription<Hasher, Event, Message>,
+        receiver: Receiver,
+    ) -> Vec<BoxFuture<'static, ()>>
+    where
+        Message: 'static + Send,
+        Receiver: 'static
+            + Sink<Message, Error = core::convert::Infallible>
+            + Unpin
+            + Send
+            + Clone,
+    {
+        use futures::{future::FutureExt, stream::StreamExt};
+
+        let mut futures = Vec::new();
+
+        let recipes = subscription.recipes();
+        let mut alive = std::collections::HashSet::new();
+
+        for recipe in recipes {
+            let id = {
+                let mut hasher = Hasher::default();
+                recipe.hash(&mut hasher);
+
+                hasher.finish()
+            };
+
+            let _ = alive.insert(id);
+
+            if self.subscriptions.contains_key(&id) {
+                continue;
+            }
+
+            let (cancel, cancelled) = futures::channel::oneshot::channel();
+
+            // TODO: Use bus if/when it supports async
+            let (event_sender, event_receiver) =
+                futures::channel::mpsc::channel(100);
+
+            let stream = recipe.stream(event_receiver.boxed());
+
+            let future = futures::future::select(
+                cancelled,
+                stream.map(Ok).forward(receiver.clone()),
+            )
+            .map(|_| ());
+
+            let _ = self.subscriptions.insert(
+                id,
+                Execution {
+                    _cancel: cancel,
+                    listener: if event_sender.is_closed() {
+                        None
+                    } else {
+                        Some(event_sender)
+                    },
+                },
+            );
+
+            futures.push(future.boxed());
+        }
+
+        self.subscriptions.retain(|id, _| alive.contains(&id));
+
+        futures
+    }
+
+    pub fn broadcast(&mut self, event: Event) {
+        self.subscriptions
+            .values_mut()
+            .filter_map(|connection| connection.listener.as_mut())
+            .for_each(|listener| {
+                if let Err(error) = listener.try_send(event.clone()) {
+                    log::error!(
+                        "Error sending event to subscription: {:?}",
+                        error
+                    );
+                }
+            });
+    }
+}
-- 
cgit