diff options
Diffstat (limited to '')
| -rw-r--r-- | futures/src/command.rs (renamed from core/src/command.rs) | 0 | ||||
| -rw-r--r-- | futures/src/executor.rs | 55 | ||||
| -rw-r--r-- | futures/src/executor/async_std.rs | 17 | ||||
| -rw-r--r-- | futures/src/executor/null.rs | 15 | ||||
| -rw-r--r-- | futures/src/executor/thread_pool.rs | 16 | ||||
| -rw-r--r-- | futures/src/executor/tokio.rs | 20 | ||||
| -rw-r--r-- | futures/src/executor/wasm_bindgen.rs | 18 | ||||
| -rw-r--r-- | futures/src/lib.rs | 18 | ||||
| -rw-r--r-- | futures/src/runtime.rs | 119 | ||||
| -rw-r--r-- | futures/src/subscription.rs (renamed from core/src/subscription.rs) | 46 | ||||
| -rw-r--r-- | futures/src/subscription/tracker.rs | 148 | 
11 files changed, 452 insertions, 20 deletions
diff --git a/core/src/command.rs b/futures/src/command.rs index e7885fb8..e7885fb8 100644 --- a/core/src/command.rs +++ b/futures/src/command.rs diff --git a/futures/src/executor.rs b/futures/src/executor.rs new file mode 100644 index 00000000..c2b9cc72 --- /dev/null +++ b/futures/src/executor.rs @@ -0,0 +1,55 @@ +//! Choose your preferred executor to power a runtime. +mod null; + +#[cfg(feature = "thread-pool")] +mod thread_pool; + +#[cfg(feature = "tokio")] +mod tokio; + +#[cfg(feature = "async-std")] +mod async_std; + +#[cfg(target_arch = "wasm32")] +mod wasm_bindgen; + +pub use null::Null; + +#[cfg(feature = "thread-pool")] +pub use thread_pool::ThreadPool; + +#[cfg(feature = "tokio")] +pub use self::tokio::Tokio; + +#[cfg(feature = "async-std")] +pub use self::async_std::AsyncStd; + +#[cfg(target_arch = "wasm32")] +pub use wasm_bindgen::WasmBindgen; + +use futures::Future; + +/// A type that can run futures. +pub trait Executor: Sized { +    /// Creates a new [`Executor`]. +    /// +    /// [`Executor`]: trait.Executor.html +    fn new() -> Result<Self, futures::io::Error> +    where +        Self: Sized; + +    /// Spawns a future in the [`Executor`]. +    /// +    /// [`Executor`]: trait.Executor.html +    fn spawn(&self, future: impl Future<Output = ()> + Send + 'static); + +    /// Runs the given closure inside the [`Executor`]. +    /// +    /// Some executors, like `tokio`, require some global state to be in place +    /// before creating futures. This method can be leveraged to set up this +    /// global state, call a function, restore the state, and obtain the result +    /// of the call. +    fn enter<R>(&self, f: impl FnOnce() -> R) -> R { +        f() +    } +} diff --git a/futures/src/executor/async_std.rs b/futures/src/executor/async_std.rs new file mode 100644 index 00000000..27949e31 --- /dev/null +++ b/futures/src/executor/async_std.rs @@ -0,0 +1,17 @@ +use crate::Executor; + +use futures::Future; + +/// An `async-std` runtime. +#[derive(Debug)] +pub struct AsyncStd; + +impl Executor for AsyncStd { +    fn new() -> Result<Self, futures::io::Error> { +        Ok(Self) +    } + +    fn spawn(&self, future: impl Future<Output = ()> + Send + 'static) { +        let _ = async_std::task::spawn(future); +    } +} diff --git a/futures/src/executor/null.rs b/futures/src/executor/null.rs new file mode 100644 index 00000000..6d5cf982 --- /dev/null +++ b/futures/src/executor/null.rs @@ -0,0 +1,15 @@ +use crate::Executor; + +use futures::Future; + +/// An executor that drops all the futures, instead of spawning them. +#[derive(Debug)] +pub struct Null; + +impl Executor for Null { +    fn new() -> Result<Self, futures::io::Error> { +        Ok(Self) +    } + +    fn spawn(&self, _future: impl Future<Output = ()> + Send + 'static) {} +} diff --git a/futures/src/executor/thread_pool.rs b/futures/src/executor/thread_pool.rs new file mode 100644 index 00000000..1ec5bf69 --- /dev/null +++ b/futures/src/executor/thread_pool.rs @@ -0,0 +1,16 @@ +use crate::Executor; + +use futures::Future; + +/// A thread pool runtime for futures. +pub type ThreadPool = futures::executor::ThreadPool; + +impl Executor for futures::executor::ThreadPool { +    fn new() -> Result<Self, futures::io::Error> { +        futures::executor::ThreadPool::new() +    } + +    fn spawn(&self, future: impl Future<Output = ()> + Send + 'static) { +        self.spawn_ok(future); +    } +} diff --git a/futures/src/executor/tokio.rs b/futures/src/executor/tokio.rs new file mode 100644 index 00000000..20802ceb --- /dev/null +++ b/futures/src/executor/tokio.rs @@ -0,0 +1,20 @@ +use crate::Executor; + +use futures::Future; + +/// A `tokio` runtime. +pub type Tokio = tokio::runtime::Runtime; + +impl Executor for Tokio { +    fn new() -> Result<Self, futures::io::Error> { +        tokio::runtime::Runtime::new() +    } + +    fn spawn(&self, future: impl Future<Output = ()> + Send + 'static) { +        let _ = tokio::runtime::Runtime::spawn(self, future); +    } + +    fn enter<R>(&self, f: impl FnOnce() -> R) -> R { +        tokio::runtime::Runtime::enter(self, f) +    } +} diff --git a/futures/src/executor/wasm_bindgen.rs b/futures/src/executor/wasm_bindgen.rs new file mode 100644 index 00000000..69b7c7e2 --- /dev/null +++ b/futures/src/executor/wasm_bindgen.rs @@ -0,0 +1,18 @@ +use crate::Executor; + +/// A `wasm-bindgen-futures` runtime. +#[derive(Debug)] +pub struct WasmBindgen; + +impl Executor for WasmBindgen { +    fn new() -> Result<Self, futures::io::Error> { +        Ok(Self) +    } + +    fn spawn( +        &self, +        future: impl futures::Future<Output = ()> + Send + 'static, +    ) { +        wasm_bindgen_futures::spawn_local(future); +    } +} diff --git a/futures/src/lib.rs b/futures/src/lib.rs new file mode 100644 index 00000000..4872df10 --- /dev/null +++ b/futures/src/lib.rs @@ -0,0 +1,18 @@ +//! Asynchronous tasks for GUI programming, inspired by Elm. +#![deny(missing_docs)] +#![deny(missing_debug_implementations)] +#![deny(unused_results)] +#![deny(unsafe_code)] +#![deny(rust_2018_idioms)] +pub use futures; + +mod command; +mod runtime; + +pub mod executor; +pub mod subscription; + +pub use command::Command; +pub use executor::Executor; +pub use runtime::Runtime; +pub use subscription::Subscription; diff --git a/futures/src/runtime.rs b/futures/src/runtime.rs new file mode 100644 index 00000000..9fd9899a --- /dev/null +++ b/futures/src/runtime.rs @@ -0,0 +1,119 @@ +//! Run commands and keep track of subscriptions. +use crate::{subscription, Command, Executor, Subscription}; + +use futures::Sink; +use std::marker::PhantomData; + +/// A batteries-included runtime of commands and subscriptions. +/// +/// If you have an [`Executor`], a [`Runtime`] can be leveraged to run any +/// [`Command`] or [`Subscription`] and get notified of the results! +/// +/// [`Runtime`]: struct.Runtime.html +/// [`Executor`]: executor/trait.Executor.html +/// [`Command`]: struct.Command.html +/// [`Subscription`]: subscription/struct.Subscription.html +#[derive(Debug)] +pub struct Runtime<Hasher, Event, Executor, Sender, Message> { +    executor: Executor, +    sender: Sender, +    subscriptions: subscription::Tracker<Hasher, Event>, +    _message: PhantomData<Message>, +} + +impl<Hasher, Event, Executor, Sender, Message> +    Runtime<Hasher, Event, Executor, Sender, Message> +where +    Hasher: std::hash::Hasher + Default, +    Event: Send + Clone + 'static, +    Executor: self::Executor, +    Sender: Sink<Message, Error = core::convert::Infallible> +        + Unpin +        + Send +        + Clone +        + 'static, +    Message: Send + 'static, +{ +    /// Creates a new empty [`Runtime`]. +    /// +    /// You need to provide: +    /// - an [`Executor`] to spawn futures +    /// - a `Sender` implementing `Sink` to receive the results +    /// +    /// [`Runtime`]: struct.Runtime.html +    pub fn new(executor: Executor, sender: Sender) -> Self { +        Self { +            executor, +            sender, +            subscriptions: subscription::Tracker::new(), +            _message: PhantomData, +        } +    } + +    /// Runs the given closure inside the [`Executor`] of the [`Runtime`]. +    /// +    /// See [`Executor::enter`] to learn more. +    /// +    /// [`Executor`]: executor/trait.Executor.html +    /// [`Runtime`]: struct.Runtime.html +    /// [`Executor::enter`]: executor/trait.Executor.html#method.enter +    pub fn enter<R>(&self, f: impl FnOnce() -> R) -> R { +        self.executor.enter(f) +    } + +    /// Spawns a [`Command`] in the [`Runtime`]. +    /// +    /// The resulting `Message` will be forwarded to the `Sender` of the +    /// [`Runtime`]. +    /// +    /// [`Command`]: struct.Command.html +    /// [`Runtime`]: struct.Runtime.html +    pub fn spawn(&mut self, command: Command<Message>) { +        use futures::{FutureExt, SinkExt}; + +        let futures = command.futures(); + +        for future in futures { +            let mut sender = self.sender.clone(); + +            self.executor.spawn(future.then(|message| { +                async move { +                    let _ = sender.send(message).await; + +                    () +                } +            })); +        } +    } + +    /// Tracks a [`Subscription`] in the [`Runtime`]. +    /// +    /// It will spawn new streams or close old ones as necessary! See +    /// [`Tracker::update`] to learn more about this! +    /// +    /// [`Subscription`]: subscription/struct.Subscription.html +    /// [`Runtime`]: struct.Runtime.html +    /// [`Tracker::update`]: subscription/struct.Tracker.html#method.update +    pub fn track( +        &mut self, +        subscription: Subscription<Hasher, Event, Message>, +    ) { +        let futures = +            self.subscriptions.update(subscription, self.sender.clone()); + +        for future in futures { +            self.executor.spawn(future); +        } +    } + +    /// Broadcasts an event to all the subscriptions currently alive in the +    /// [`Runtime`]. +    /// +    /// See [`Tracker::broadcast`] to learn more. +    /// +    /// [`Runtime`]: struct.Runtime.html +    /// [`Tracker::broadcast`]: subscription/struct.Tracker.html#method.broadcast +    pub fn broadcast(&mut self, event: Event) { +        self.subscriptions.broadcast(event); +    } +} diff --git a/core/src/subscription.rs b/futures/src/subscription.rs index d9e7e388..b68444cd 100644 --- a/core/src/subscription.rs +++ b/futures/src/subscription.rs @@ -1,4 +1,9 @@  //! Listen to external events in your application. +mod tracker; + +pub use tracker::Tracker; + +use futures::stream::BoxStream;  /// A request to listen to external events.  /// @@ -11,16 +16,16 @@  /// For instance, you can use a [`Subscription`] to listen to a WebSocket  /// connection, keyboard presses, mouse events, time ticks, etc.  /// -/// This type is normally aliased by runtimes with a specific `Input` and/or +/// This type is normally aliased by runtimes with a specific `Event` and/or  /// `Hasher`.  ///  /// [`Command`]: ../struct.Command.html  /// [`Subscription`]: struct.Subscription.html -pub struct Subscription<Hasher, Input, Output> { -    recipes: Vec<Box<dyn Recipe<Hasher, Input, Output = Output>>>, +pub struct Subscription<Hasher, Event, Output> { +    recipes: Vec<Box<dyn Recipe<Hasher, Event, Output = Output>>>,  } -impl<H, I, O> Subscription<H, I, O> +impl<H, E, O> Subscription<H, E, O>  where      H: std::hash::Hasher,  { @@ -38,7 +43,7 @@ where      /// [`Subscription`]: struct.Subscription.html      /// [`Recipe`]: trait.Recipe.html      pub fn from_recipe( -        recipe: impl Recipe<H, I, Output = O> + 'static, +        recipe: impl Recipe<H, E, Output = O> + 'static,      ) -> Self {          Self {              recipes: vec![Box::new(recipe)], @@ -50,7 +55,7 @@ where      ///      /// [`Subscription`]: struct.Subscription.html      pub fn batch( -        subscriptions: impl IntoIterator<Item = Subscription<H, I, O>>, +        subscriptions: impl IntoIterator<Item = Subscription<H, E, O>>,      ) -> Self {          Self {              recipes: subscriptions @@ -63,7 +68,7 @@ where      /// Returns the different recipes of the [`Subscription`].      ///      /// [`Subscription`]: struct.Subscription.html -    pub fn recipes(self) -> Vec<Box<dyn Recipe<H, I, Output = O>>> { +    pub fn recipes(self) -> Vec<Box<dyn Recipe<H, E, Output = O>>> {          self.recipes      } @@ -73,10 +78,10 @@ where      pub fn map<A>(          mut self,          f: impl Fn(O) -> A + Send + Sync + 'static, -    ) -> Subscription<H, I, A> +    ) -> Subscription<H, E, A>      where          H: 'static, -        I: 'static, +        E: 'static,          O: 'static,          A: 'static,      { @@ -88,7 +93,7 @@ where                  .drain(..)                  .map(|recipe| {                      Box::new(Map::new(recipe, function.clone())) -                        as Box<dyn Recipe<H, I, Output = A>> +                        as Box<dyn Recipe<H, E, Output = A>>                  })                  .collect(),          } @@ -109,7 +114,7 @@ impl<I, O, H> std::fmt::Debug for Subscription<I, O, H> {  ///  /// [`Subscription`]: struct.Subscription.html  /// [`Recipe`]: trait.Recipe.html -pub trait Recipe<Hasher: std::hash::Hasher, Input> { +pub trait Recipe<Hasher: std::hash::Hasher, Event> {      /// The events that will be produced by a [`Subscription`] with this      /// [`Recipe`].      /// @@ -128,31 +133,32 @@ pub trait Recipe<Hasher: std::hash::Hasher, Input> {      /// Executes the [`Recipe`] and produces the stream of events of its      /// [`Subscription`].      /// -    /// It receives some generic `Input`, which is normally defined by runtimes. +    /// It receives some stream of generic events, which is normally defined by +    /// shells.      ///      /// [`Subscription`]: struct.Subscription.html      /// [`Recipe`]: trait.Recipe.html      fn stream(          self: Box<Self>, -        input: Input, -    ) -> futures::stream::BoxStream<'static, Self::Output>; +        input: BoxStream<'static, Event>, +    ) -> BoxStream<'static, Self::Output>;  } -struct Map<Hasher, Input, A, B> { -    recipe: Box<dyn Recipe<Hasher, Input, Output = A>>, +struct Map<Hasher, Event, A, B> { +    recipe: Box<dyn Recipe<Hasher, Event, Output = A>>,      mapper: std::sync::Arc<dyn Fn(A) -> B + Send + Sync>,  } -impl<H, I, A, B> Map<H, I, A, B> { +impl<H, E, A, B> Map<H, E, A, B> {      fn new( -        recipe: Box<dyn Recipe<H, I, Output = A>>, +        recipe: Box<dyn Recipe<H, E, Output = A>>,          mapper: std::sync::Arc<dyn Fn(A) -> B + Send + Sync + 'static>,      ) -> Self {          Map { recipe, mapper }      }  } -impl<H, I, A, B> Recipe<H, I> for Map<H, I, A, B> +impl<H, E, A, B> Recipe<H, E> for Map<H, E, A, B>  where      A: 'static,      B: 'static, @@ -169,7 +175,7 @@ where      fn stream(          self: Box<Self>, -        input: I, +        input: BoxStream<'static, E>,      ) -> futures::stream::BoxStream<'static, Self::Output> {          use futures::StreamExt; diff --git a/futures/src/subscription/tracker.rs b/futures/src/subscription/tracker.rs new file mode 100644 index 00000000..c8a1ee18 --- /dev/null +++ b/futures/src/subscription/tracker.rs @@ -0,0 +1,148 @@ +use crate::Subscription; + +use futures::{future::BoxFuture, sink::Sink}; +use std::collections::HashMap; +use std::marker::PhantomData; + +/// A registry of subscription streams. +/// +/// If you have an application that continuously returns a [`Subscription`], +/// you can use a [`Tracker`] to keep track of the different recipes and keep +/// its executions alive. +#[derive(Debug)] +pub struct Tracker<Hasher, Event> { +    subscriptions: HashMap<u64, Execution<Event>>, +    _hasher: PhantomData<Hasher>, +} + +#[derive(Debug)] +pub struct Execution<Event> { +    _cancel: futures::channel::oneshot::Sender<()>, +    listener: Option<futures::channel::mpsc::Sender<Event>>, +} + +impl<Hasher, Event> Tracker<Hasher, Event> +where +    Hasher: std::hash::Hasher + Default, +    Event: 'static + Send + Clone, +{ +    /// Creates a new empty [`Tracker`]. +    /// +    /// [`Tracker`]: struct.Tracker.html +    pub fn new() -> Self { +        Self { +            subscriptions: HashMap::new(), +            _hasher: PhantomData, +        } +    } + +    /// Updates the [`Tracker`] with the given [`Subscription`]. +    /// +    /// A [`Subscription`] can cause new streams to be spawned or old streams +    /// to be closed. +    /// +    /// The [`Tracker`] keeps track of these streams between calls to this +    /// method: +    /// +    /// - If the provided [`Subscription`] contains a new [`Recipe`] that is +    /// currently not being run, it will spawn a new stream and keep it alive. +    /// - On the other hand, if a [`Recipe`] is currently in execution and the +    /// provided [`Subscription`] does not contain it anymore, then the +    /// [`Tracker`] will close and drop the relevant stream. +    /// +    /// It returns a list of futures that need to be spawned to materialize +    /// the [`Tracker`] changes. +    /// +    /// [`Tracker`]: struct.Tracker.html +    /// [`Subscription`]: struct.Subscription.html +    /// [`Recipe`]: trait.Recipe.html +    pub fn update<Message, Receiver>( +        &mut self, +        subscription: Subscription<Hasher, Event, Message>, +        receiver: Receiver, +    ) -> Vec<BoxFuture<'static, ()>> +    where +        Message: 'static + Send, +        Receiver: 'static +            + Sink<Message, Error = core::convert::Infallible> +            + Unpin +            + Send +            + Clone, +    { +        use futures::{future::FutureExt, stream::StreamExt}; + +        let mut futures = Vec::new(); + +        let recipes = subscription.recipes(); +        let mut alive = std::collections::HashSet::new(); + +        for recipe in recipes { +            let id = { +                let mut hasher = Hasher::default(); +                recipe.hash(&mut hasher); + +                hasher.finish() +            }; + +            let _ = alive.insert(id); + +            if self.subscriptions.contains_key(&id) { +                continue; +            } + +            let (cancel, cancelled) = futures::channel::oneshot::channel(); + +            // TODO: Use bus if/when it supports async +            let (event_sender, event_receiver) = +                futures::channel::mpsc::channel(100); + +            let stream = recipe.stream(event_receiver.boxed()); + +            let future = futures::future::select( +                cancelled, +                stream.map(Ok).forward(receiver.clone()), +            ) +            .map(|_| ()); + +            let _ = self.subscriptions.insert( +                id, +                Execution { +                    _cancel: cancel, +                    listener: if event_sender.is_closed() { +                        None +                    } else { +                        Some(event_sender) +                    }, +                }, +            ); + +            futures.push(future.boxed()); +        } + +        self.subscriptions.retain(|id, _| alive.contains(&id)); + +        futures +    } + +    /// Broadcasts an event to the subscriptions currently alive. +    /// +    /// A subscription's [`Recipe::stream`] always receives a stream of events +    /// as input. This stream can be used by some subscription to listen to +    /// shell events. +    /// +    /// This method publishes the given event to all the subscription streams +    /// currently open. +    pub fn broadcast(&mut self, event: Event) { +        self.subscriptions +            .values_mut() +            .filter_map(|connection| connection.listener.as_mut()) +            .for_each(|listener| { +                if let Err(error) = listener.try_send(event.clone()) { +                    log::error!( +                        "Error sending event to subscription: {:?}", +                        error +                    ); +                } +            }); +    } +}  | 
