diff options
Diffstat (limited to 'futures')
-rw-r--r-- | futures/Cargo.toml | 17 | ||||
-rw-r--r-- | futures/src/command.rs | 139 | ||||
-rw-r--r-- | futures/src/command/native.rs | 100 | ||||
-rw-r--r-- | futures/src/command/web.rs | 101 | ||||
-rw-r--r-- | futures/src/executor.rs | 12 | ||||
-rw-r--r-- | futures/src/executor/async_std.rs | 1 | ||||
-rw-r--r-- | futures/src/executor/thread_pool.rs | 1 | ||||
-rw-r--r-- | futures/src/executor/tokio.rs | 4 | ||||
-rw-r--r-- | futures/src/executor/tokio_old.rs | 21 | ||||
-rw-r--r-- | futures/src/lib.rs | 39 | ||||
-rw-r--r-- | futures/src/runtime.rs | 22 | ||||
-rw-r--r-- | futures/src/subscription.rs | 133 | ||||
-rw-r--r-- | futures/src/subscription/tracker.rs | 18 | ||||
-rw-r--r-- | futures/src/time.rs | 74 |
14 files changed, 390 insertions, 292 deletions
diff --git a/futures/Cargo.toml b/futures/Cargo.toml index 483e60cb..e8e47c08 100644 --- a/futures/Cargo.toml +++ b/futures/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "iced_futures" -version = "0.1.0-alpha" +version = "0.2.0" authors = ["Héctor Ramón Jiménez <hector0193@gmail.com>"] edition = "2018" description = "Commands, subscriptions, and runtimes for Iced" @@ -19,14 +19,25 @@ log = "0.4" [dependencies.futures] version = "0.3" -[target.'cfg(not(target_arch = "wasm32"))'.dependencies.tokio] +[target.'cfg(not(target_arch = "wasm32"))'.dependencies.tokio_old] +package = "tokio" version = "0.2" optional = true -features = ["rt-core", "rt-threaded"] +features = ["rt-core", "rt-threaded", "time", "stream"] + +[target.'cfg(not(target_arch = "wasm32"))'.dependencies.tokio] +version = "0.3" +optional = true +features = ["rt-multi-thread", "time", "stream"] [target.'cfg(not(target_arch = "wasm32"))'.dependencies.async-std] version = "1.0" optional = true +features = ["unstable"] [target.'cfg(target_arch = "wasm32")'.dependencies] wasm-bindgen-futures = "0.4" + +[package.metadata.docs.rs] +rustdoc-args = ["--cfg", "docsrs"] +all-features = true diff --git a/futures/src/command.rs b/futures/src/command.rs index 26f58fde..b06ab3f8 100644 --- a/futures/src/command.rs +++ b/futures/src/command.rs @@ -1,11 +1,138 @@ -#[cfg(not(target_arch = "wasm32"))] -mod native; +use crate::BoxFuture; +use futures::future::{Future, FutureExt}; + +/// A collection of async operations. +/// +/// You should be able to turn a future easily into a [`Command`], either by +/// using the `From` trait or [`Command::perform`]. +pub struct Command<T> { + futures: Vec<BoxFuture<T>>, +} + +impl<T> Command<T> { + /// Creates an empty [`Command`]. + /// + /// In other words, a [`Command`] that does nothing. + pub fn none() -> Self { + Self { + futures: Vec::new(), + } + } + + /// Creates a [`Command`] that performs the action of the given future. + #[cfg(not(target_arch = "wasm32"))] + pub fn perform<A>( + future: impl Future<Output = T> + 'static + Send, + f: impl Fn(T) -> A + 'static + Send, + ) -> Command<A> { + Command { + futures: vec![Box::pin(future.map(f))], + } + } + + /// Creates a [`Command`] that performs the action of the given future. + #[cfg(target_arch = "wasm32")] + pub fn perform<A>( + future: impl Future<Output = T> + 'static, + f: impl Fn(T) -> A + 'static + Send, + ) -> Command<A> { + Command { + futures: vec![Box::pin(future.map(f))], + } + } + + /// Applies a transformation to the result of a [`Command`]. + #[cfg(not(target_arch = "wasm32"))] + pub fn map<A>( + mut self, + f: impl Fn(T) -> A + 'static + Send + Sync, + ) -> Command<A> + where + T: 'static, + { + let f = std::sync::Arc::new(f); + + Command { + futures: self + .futures + .drain(..) + .map(|future| { + let f = f.clone(); + + Box::pin(future.map(move |result| f(result))) + as BoxFuture<A> + }) + .collect(), + } + } + + /// Applies a transformation to the result of a [`Command`]. + #[cfg(target_arch = "wasm32")] + pub fn map<A>(mut self, f: impl Fn(T) -> A + 'static) -> Command<A> + where + T: 'static, + { + let f = std::rc::Rc::new(f); + + Command { + futures: self + .futures + .drain(..) + .map(|future| { + let f = f.clone(); + + Box::pin(future.map(move |result| f(result))) + as BoxFuture<A> + }) + .collect(), + } + } + + /// Creates a [`Command`] that performs the actions of all the given + /// commands. + /// + /// Once this command is run, all the commands will be executed at once. + pub fn batch(commands: impl IntoIterator<Item = Command<T>>) -> Self { + Self { + futures: commands + .into_iter() + .flat_map(|command| command.futures) + .collect(), + } + } + + /// Converts a [`Command`] into its underlying list of futures. + pub fn futures(self) -> Vec<BoxFuture<T>> { + self.futures + } +} #[cfg(not(target_arch = "wasm32"))] -pub use native::Command; +impl<T, A> From<A> for Command<T> +where + A: Future<Output = T> + 'static + Send, +{ + fn from(future: A) -> Self { + Self { + futures: vec![future.boxed()], + } + } +} #[cfg(target_arch = "wasm32")] -mod web; +impl<T, A> From<A> for Command<T> +where + A: Future<Output = T> + 'static, +{ + fn from(future: A) -> Self { + Self { + futures: vec![future.boxed_local()], + } + } +} -#[cfg(target_arch = "wasm32")] -pub use web::Command; +impl<T> std::fmt::Debug for Command<T> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Command").finish() + } +} diff --git a/futures/src/command/native.rs b/futures/src/command/native.rs deleted file mode 100644 index 38cb4e06..00000000 --- a/futures/src/command/native.rs +++ /dev/null @@ -1,100 +0,0 @@ -use futures::future::{BoxFuture, Future, FutureExt}; - -/// A collection of async operations. -/// -/// You should be able to turn a future easily into a [`Command`], either by -/// using the `From` trait or [`Command::perform`]. -/// -/// [`Command`]: struct.Command.html -pub struct Command<T> { - futures: Vec<BoxFuture<'static, T>>, -} - -impl<T> Command<T> { - /// Creates an empty [`Command`]. - /// - /// In other words, a [`Command`] that does nothing. - /// - /// [`Command`]: struct.Command.html - pub fn none() -> Self { - Self { - futures: Vec::new(), - } - } - - /// Creates a [`Command`] that performs the action of the given future. - /// - /// [`Command`]: struct.Command.html - pub fn perform<A>( - future: impl Future<Output = T> + 'static + Send, - f: impl Fn(T) -> A + 'static + Send, - ) -> Command<A> { - Command { - futures: vec![future.map(f).boxed()], - } - } - - /// Applies a transformation to the result of a [`Command`]. - /// - /// [`Command`]: struct.Command.html - pub fn map<A>( - mut self, - f: impl Fn(T) -> A + 'static + Send + Sync, - ) -> Command<A> - where - T: 'static, - { - let f = std::sync::Arc::new(f); - - Command { - futures: self - .futures - .drain(..) - .map(|future| { - let f = f.clone(); - - future.map(move |result| f(result)).boxed() - }) - .collect(), - } - } - - /// Creates a [`Command`] that performs the actions of all the given - /// commands. - /// - /// Once this command is run, all the commands will be executed at once. - /// - /// [`Command`]: struct.Command.html - pub fn batch(commands: impl IntoIterator<Item = Command<T>>) -> Self { - Self { - futures: commands - .into_iter() - .flat_map(|command| command.futures) - .collect(), - } - } - - /// Converts a [`Command`] into its underlying list of futures. - /// - /// [`Command`]: struct.Command.html - pub fn futures(self) -> Vec<BoxFuture<'static, T>> { - self.futures - } -} - -impl<T, A> From<A> for Command<T> -where - A: Future<Output = T> + 'static + Send, -{ - fn from(future: A) -> Self { - Self { - futures: vec![future.boxed()], - } - } -} - -impl<T> std::fmt::Debug for Command<T> { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("Command").finish() - } -} diff --git a/futures/src/command/web.rs b/futures/src/command/web.rs deleted file mode 100644 index 11b46b90..00000000 --- a/futures/src/command/web.rs +++ /dev/null @@ -1,101 +0,0 @@ -use futures::future::{Future, FutureExt}; -use std::pin::Pin; - -/// A collection of async operations. -/// -/// You should be able to turn a future easily into a [`Command`], either by -/// using the `From` trait or [`Command::perform`]. -/// -/// [`Command`]: struct.Command.html -pub struct Command<T> { - futures: Vec<Pin<Box<dyn Future<Output = T> + 'static>>>, -} - -impl<T> Command<T> { - /// Creates an empty [`Command`]. - /// - /// In other words, a [`Command`] that does nothing. - /// - /// [`Command`]: struct.Command.html - pub fn none() -> Self { - Self { - futures: Vec::new(), - } - } - - /// Creates a [`Command`] that performs the action of the given future. - /// - /// [`Command`]: struct.Command.html - pub fn perform<A>( - future: impl Future<Output = T> + 'static, - f: impl Fn(T) -> A + 'static, - ) -> Command<A> { - Command { - futures: vec![future.map(f).boxed_local()], - } - } - - /// Applies a transformation to the result of a [`Command`]. - /// - /// [`Command`]: struct.Command.html - pub fn map<A>( - mut self, - f: impl Fn(T) -> A + 'static + Send + Sync + Unpin, - ) -> Command<A> - where - T: 'static, - { - let f = std::sync::Arc::new(f); - - Command { - futures: self - .futures - .drain(..) - .map(|future| { - let f = f.clone(); - - future.map(move |result| f(result)).boxed_local() - }) - .collect(), - } - } - - /// Creates a [`Command`] that performs the actions of all the given - /// commands. - /// - /// Once this command is run, all the commands will be executed at once. - /// - /// [`Command`]: struct.Command.html - pub fn batch(commands: impl IntoIterator<Item = Command<T>>) -> Self { - Self { - futures: commands - .into_iter() - .flat_map(|command| command.futures) - .collect(), - } - } - - /// Converts a [`Command`] into its underlying list of futures. - /// - /// [`Command`]: struct.Command.html - pub fn futures(self) -> Vec<Pin<Box<dyn Future<Output = T> + 'static>>> { - self.futures - } -} - -impl<T, A> From<A> for Command<T> -where - A: Future<Output = T> + 'static, -{ - fn from(future: A) -> Self { - Self { - futures: vec![future.boxed_local()], - } - } -} - -impl<T> std::fmt::Debug for Command<T> { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("Command").finish() - } -} diff --git a/futures/src/executor.rs b/futures/src/executor.rs index 5378c0b3..fa87216a 100644 --- a/futures/src/executor.rs +++ b/futures/src/executor.rs @@ -7,6 +7,9 @@ mod thread_pool; #[cfg(all(not(target_arch = "wasm32"), feature = "tokio"))] mod tokio; +#[cfg(all(not(target_arch = "wasm32"), feature = "tokio_old"))] +mod tokio_old; + #[cfg(all(not(target_arch = "wasm32"), feature = "async-std"))] mod async_std; @@ -21,6 +24,9 @@ pub use thread_pool::ThreadPool; #[cfg(all(not(target_arch = "wasm32"), feature = "tokio"))] pub use self::tokio::Tokio; +#[cfg(all(not(target_arch = "wasm32"), feature = "tokio_old"))] +pub use self::tokio_old::TokioOld; + #[cfg(all(not(target_arch = "wasm32"), feature = "async-std"))] pub use self::async_std::AsyncStd; @@ -32,21 +38,15 @@ use futures::Future; /// A type that can run futures. pub trait Executor: Sized { /// Creates a new [`Executor`]. - /// - /// [`Executor`]: trait.Executor.html fn new() -> Result<Self, futures::io::Error> where Self: Sized; /// Spawns a future in the [`Executor`]. - /// - /// [`Executor`]: trait.Executor.html #[cfg(not(target_arch = "wasm32"))] fn spawn(&self, future: impl Future<Output = ()> + Send + 'static); /// Spawns a local future in the [`Executor`]. - /// - /// [`Executor`]: trait.Executor.html #[cfg(target_arch = "wasm32")] fn spawn(&self, future: impl Future<Output = ()> + 'static); diff --git a/futures/src/executor/async_std.rs b/futures/src/executor/async_std.rs index 27949e31..471be369 100644 --- a/futures/src/executor/async_std.rs +++ b/futures/src/executor/async_std.rs @@ -3,6 +3,7 @@ use crate::Executor; use futures::Future; /// An `async-std` runtime. +#[cfg_attr(docsrs, doc(cfg(feature = "async-std")))] #[derive(Debug)] pub struct AsyncStd; diff --git a/futures/src/executor/thread_pool.rs b/futures/src/executor/thread_pool.rs index 1ec5bf69..a6c6168e 100644 --- a/futures/src/executor/thread_pool.rs +++ b/futures/src/executor/thread_pool.rs @@ -3,6 +3,7 @@ use crate::Executor; use futures::Future; /// A thread pool runtime for futures. +#[cfg_attr(docsrs, doc(cfg(feature = "thread-pool")))] pub type ThreadPool = futures::executor::ThreadPool; impl Executor for futures::executor::ThreadPool { diff --git a/futures/src/executor/tokio.rs b/futures/src/executor/tokio.rs index 20802ceb..c6a21cec 100644 --- a/futures/src/executor/tokio.rs +++ b/futures/src/executor/tokio.rs @@ -3,6 +3,7 @@ use crate::Executor; use futures::Future; /// A `tokio` runtime. +#[cfg_attr(docsrs, doc(cfg(feature = "tokio")))] pub type Tokio = tokio::runtime::Runtime; impl Executor for Tokio { @@ -15,6 +16,7 @@ impl Executor for Tokio { } fn enter<R>(&self, f: impl FnOnce() -> R) -> R { - tokio::runtime::Runtime::enter(self, f) + let _guard = tokio::runtime::Runtime::enter(self); + f() } } diff --git a/futures/src/executor/tokio_old.rs b/futures/src/executor/tokio_old.rs new file mode 100644 index 00000000..d64729fa --- /dev/null +++ b/futures/src/executor/tokio_old.rs @@ -0,0 +1,21 @@ +use crate::Executor; + +use futures::Future; + +/// An old `tokio` runtime. +#[cfg_attr(docsrs, doc(cfg(feature = "tokio_old")))] +pub type TokioOld = tokio_old::runtime::Runtime; + +impl Executor for TokioOld { + fn new() -> Result<Self, futures::io::Error> { + tokio_old::runtime::Runtime::new() + } + + fn spawn(&self, future: impl Future<Output = ()> + Send + 'static) { + let _ = tokio_old::runtime::Runtime::spawn(self, future); + } + + fn enter<R>(&self, f: impl FnOnce() -> R) -> R { + tokio_old::runtime::Runtime::enter(self, f) + } +} diff --git a/futures/src/lib.rs b/futures/src/lib.rs index c25c0853..c7c6fd3a 100644 --- a/futures/src/lib.rs +++ b/futures/src/lib.rs @@ -1,9 +1,13 @@ //! Asynchronous tasks for GUI programming, inspired by Elm. +//! +//!  #![deny(missing_docs)] #![deny(missing_debug_implementations)] #![deny(unused_results)] #![forbid(unsafe_code)] #![forbid(rust_2018_idioms)] +#![cfg_attr(docsrs, feature(doc_cfg))] + pub use futures; mod command; @@ -12,7 +16,42 @@ mod runtime; pub mod executor; pub mod subscription; +#[cfg(all( + any(feature = "tokio", feature = "tokio_old", feature = "async-std"), + not(target_arch = "wasm32") +))] +#[cfg_attr(docsrs, doc(cfg(any(feature = "tokio", feature = "async-std"))))] +pub mod time; + pub use command::Command; pub use executor::Executor; pub use runtime::Runtime; pub use subscription::Subscription; + +/// A boxed static future. +/// +/// - On native platforms, it needs a `Send` requirement. +/// - On the Web platform, it does not need a `Send` requirement. +#[cfg(not(target_arch = "wasm32"))] +pub type BoxFuture<T> = futures::future::BoxFuture<'static, T>; + +/// A boxed static future. +/// +/// - On native platforms, it needs a `Send` requirement. +/// - On the Web platform, it does not need a `Send` requirement. +#[cfg(target_arch = "wasm32")] +pub type BoxFuture<T> = futures::future::LocalBoxFuture<'static, T>; + +/// A boxed static stream. +/// +/// - On native platforms, it needs a `Send` requirement. +/// - On the Web platform, it does not need a `Send` requirement. +#[cfg(not(target_arch = "wasm32"))] +pub type BoxStream<T> = futures::stream::BoxStream<'static, T>; + +/// A boxed static stream. +/// +/// - On native platforms, it needs a `Send` requirement. +/// - On the Web platform, it does not need a `Send` requirement. +#[cfg(target_arch = "wasm32")] +pub type BoxStream<T> = futures::stream::LocalBoxStream<'static, T>; diff --git a/futures/src/runtime.rs b/futures/src/runtime.rs index d204670b..e56a4eb0 100644 --- a/futures/src/runtime.rs +++ b/futures/src/runtime.rs @@ -8,11 +8,6 @@ use std::marker::PhantomData; /// /// If you have an [`Executor`], a [`Runtime`] can be leveraged to run any /// [`Command`] or [`Subscription`] and get notified of the results! -/// -/// [`Runtime`]: struct.Runtime.html -/// [`Executor`]: executor/trait.Executor.html -/// [`Command`]: struct.Command.html -/// [`Subscription`]: subscription/struct.Subscription.html #[derive(Debug)] pub struct Runtime<Hasher, Event, Executor, Sender, Message> { executor: Executor, @@ -36,8 +31,6 @@ where /// You need to provide: /// - an [`Executor`] to spawn futures /// - a `Sender` implementing `Sink` to receive the results - /// - /// [`Runtime`]: struct.Runtime.html pub fn new(executor: Executor, sender: Sender) -> Self { Self { executor, @@ -50,10 +43,6 @@ where /// Runs the given closure inside the [`Executor`] of the [`Runtime`]. /// /// See [`Executor::enter`] to learn more. - /// - /// [`Executor`]: executor/trait.Executor.html - /// [`Runtime`]: struct.Runtime.html - /// [`Executor::enter`]: executor/trait.Executor.html#method.enter pub fn enter<R>(&self, f: impl FnOnce() -> R) -> R { self.executor.enter(f) } @@ -62,9 +51,6 @@ where /// /// The resulting `Message` will be forwarded to the `Sender` of the /// [`Runtime`]. - /// - /// [`Command`]: struct.Command.html - /// [`Runtime`]: struct.Runtime.html pub fn spawn(&mut self, command: Command<Message>) { use futures::{FutureExt, SinkExt}; @@ -88,9 +74,7 @@ where /// It will spawn new streams or close old ones as necessary! See /// [`Tracker::update`] to learn more about this! /// - /// [`Subscription`]: subscription/struct.Subscription.html - /// [`Runtime`]: struct.Runtime.html - /// [`Tracker::update`]: subscription/struct.Tracker.html#method.update + /// [`Tracker::update`]: subscription::Tracker::update pub fn track( &mut self, subscription: Subscription<Hasher, Event, Message>, @@ -115,9 +99,7 @@ where /// /// See [`Tracker::broadcast`] to learn more. /// - /// [`Runtime`]: struct.Runtime.html - /// [`Tracker::broadcast`]: - /// subscription/struct.Tracker.html#method.broadcast + /// [`Tracker::broadcast`]: subscription::Tracker::broadcast pub fn broadcast(&mut self, event: Event) { self.subscriptions.broadcast(event); } diff --git a/futures/src/subscription.rs b/futures/src/subscription.rs index b68444cd..27d2d295 100644 --- a/futures/src/subscription.rs +++ b/futures/src/subscription.rs @@ -3,7 +3,7 @@ mod tracker; pub use tracker::Tracker; -use futures::stream::BoxStream; +use crate::BoxStream; /// A request to listen to external events. /// @@ -19,8 +19,7 @@ use futures::stream::BoxStream; /// This type is normally aliased by runtimes with a specific `Event` and/or /// `Hasher`. /// -/// [`Command`]: ../struct.Command.html -/// [`Subscription`]: struct.Subscription.html +/// [`Command`]: crate::Command pub struct Subscription<Hasher, Event, Output> { recipes: Vec<Box<dyn Recipe<Hasher, Event, Output = Output>>>, } @@ -30,8 +29,6 @@ where H: std::hash::Hasher, { /// Returns an empty [`Subscription`] that will not produce any output. - /// - /// [`Subscription`]: struct.Subscription.html pub fn none() -> Self { Self { recipes: Vec::new(), @@ -39,9 +36,6 @@ where } /// Creates a [`Subscription`] from a [`Recipe`] describing it. - /// - /// [`Subscription`]: struct.Subscription.html - /// [`Recipe`]: trait.Recipe.html pub fn from_recipe( recipe: impl Recipe<H, E, Output = O> + 'static, ) -> Self { @@ -52,8 +46,6 @@ where /// Batches all the provided subscriptions and returns the resulting /// [`Subscription`]. - /// - /// [`Subscription`]: struct.Subscription.html pub fn batch( subscriptions: impl IntoIterator<Item = Subscription<H, E, O>>, ) -> Self { @@ -66,33 +58,46 @@ where } /// Returns the different recipes of the [`Subscription`]. - /// - /// [`Subscription`]: struct.Subscription.html pub fn recipes(self) -> Vec<Box<dyn Recipe<H, E, Output = O>>> { self.recipes } - /// Transforms the [`Subscription`] output with the given function. + /// Adds a value to the [`Subscription`] context. /// - /// [`Subscription`]: struct.Subscription.html - pub fn map<A>( - mut self, - f: impl Fn(O) -> A + Send + Sync + 'static, - ) -> Subscription<H, E, A> + /// The value will be part of the identity of a [`Subscription`]. + pub fn with<T>(mut self, value: T) -> Subscription<H, E, (T, O)> where H: 'static, E: 'static, O: 'static, - A: 'static, + T: std::hash::Hash + Clone + Send + Sync + 'static, { - let function = std::sync::Arc::new(f); + Subscription { + recipes: self + .recipes + .drain(..) + .map(|recipe| { + Box::new(With::new(recipe, value.clone())) + as Box<dyn Recipe<H, E, Output = (T, O)>> + }) + .collect(), + } + } + /// Transforms the [`Subscription`] output with the given function. + pub fn map<A>(mut self, f: fn(O) -> A) -> Subscription<H, E, A> + where + H: 'static, + E: 'static, + O: 'static, + A: 'static, + { Subscription { recipes: self .recipes .drain(..) .map(|recipe| { - Box::new(Map::new(recipe, function.clone())) + Box::new(Map::new(recipe, f)) as Box<dyn Recipe<H, E, Output = A>> }) .collect(), @@ -112,22 +117,25 @@ impl<I, O, H> std::fmt::Debug for Subscription<I, O, H> { /// by runtimes to run and identify subscriptions. You can use it to create your /// own! /// -/// [`Subscription`]: struct.Subscription.html -/// [`Recipe`]: trait.Recipe.html +/// # Examples +/// The repository has a couple of [examples] that use a custom [`Recipe`]: +/// +/// - [`download_progress`], a basic application that asynchronously downloads +/// a dummy file of 100 MB and tracks the download progress. +/// - [`stopwatch`], a watch with start/stop and reset buttons showcasing how +/// to listen to time. +/// +/// [examples]: https://github.com/hecrj/iced/tree/0.2/examples +/// [`download_progress`]: https://github.com/hecrj/iced/tree/0.2/examples/download_progress +/// [`stopwatch`]: https://github.com/hecrj/iced/tree/0.2/examples/stopwatch pub trait Recipe<Hasher: std::hash::Hasher, Event> { /// The events that will be produced by a [`Subscription`] with this /// [`Recipe`]. - /// - /// [`Subscription`]: struct.Subscription.html - /// [`Recipe`]: trait.Recipe.html type Output; /// Hashes the [`Recipe`]. /// /// This is used by runtimes to uniquely identify a [`Subscription`]. - /// - /// [`Subscription`]: struct.Subscription.html - /// [`Recipe`]: trait.Recipe.html fn hash(&self, state: &mut Hasher); /// Executes the [`Recipe`] and produces the stream of events of its @@ -135,24 +143,21 @@ pub trait Recipe<Hasher: std::hash::Hasher, Event> { /// /// It receives some stream of generic events, which is normally defined by /// shells. - /// - /// [`Subscription`]: struct.Subscription.html - /// [`Recipe`]: trait.Recipe.html fn stream( self: Box<Self>, - input: BoxStream<'static, Event>, - ) -> BoxStream<'static, Self::Output>; + input: BoxStream<Event>, + ) -> BoxStream<Self::Output>; } struct Map<Hasher, Event, A, B> { recipe: Box<dyn Recipe<Hasher, Event, Output = A>>, - mapper: std::sync::Arc<dyn Fn(A) -> B + Send + Sync>, + mapper: fn(A) -> B, } impl<H, E, A, B> Map<H, E, A, B> { fn new( recipe: Box<dyn Recipe<H, E, Output = A>>, - mapper: std::sync::Arc<dyn Fn(A) -> B + Send + Sync + 'static>, + mapper: fn(A) -> B, ) -> Self { Map { recipe, mapper } } @@ -169,21 +174,59 @@ where fn hash(&self, state: &mut H) { use std::hash::Hash; - std::any::TypeId::of::<B>().hash(state); self.recipe.hash(state); + self.mapper.hash(state); } - fn stream( - self: Box<Self>, - input: BoxStream<'static, E>, - ) -> futures::stream::BoxStream<'static, Self::Output> { + fn stream(self: Box<Self>, input: BoxStream<E>) -> BoxStream<Self::Output> { use futures::StreamExt; let mapper = self.mapper; - self.recipe - .stream(input) - .map(move |element| mapper(element)) - .boxed() + Box::pin( + self.recipe + .stream(input) + .map(move |element| mapper(element)), + ) + } +} + +struct With<Hasher, Event, A, B> { + recipe: Box<dyn Recipe<Hasher, Event, Output = A>>, + value: B, +} + +impl<H, E, A, B> With<H, E, A, B> { + fn new(recipe: Box<dyn Recipe<H, E, Output = A>>, value: B) -> Self { + With { recipe, value } + } +} + +impl<H, E, A, B> Recipe<H, E> for With<H, E, A, B> +where + A: 'static, + B: 'static + std::hash::Hash + Clone + Send + Sync, + H: std::hash::Hasher, +{ + type Output = (B, A); + + fn hash(&self, state: &mut H) { + use std::hash::Hash; + + std::any::TypeId::of::<B>().hash(state); + self.value.hash(state); + self.recipe.hash(state); + } + + fn stream(self: Box<Self>, input: BoxStream<E>) -> BoxStream<Self::Output> { + use futures::StreamExt; + + let value = self.value; + + Box::pin( + self.recipe + .stream(input) + .map(move |element| (value.clone(), element)), + ) } } diff --git a/futures/src/subscription/tracker.rs b/futures/src/subscription/tracker.rs index cfa36170..43222b5b 100644 --- a/futures/src/subscription/tracker.rs +++ b/futures/src/subscription/tracker.rs @@ -1,6 +1,6 @@ -use crate::Subscription; +use crate::{BoxFuture, Subscription}; -use futures::{channel::mpsc, future::BoxFuture, sink::Sink}; +use futures::{channel::mpsc, sink::Sink}; use std::{collections::HashMap, marker::PhantomData}; /// A registry of subscription streams. @@ -26,8 +26,6 @@ where Event: 'static + Send + Clone, { /// Creates a new empty [`Tracker`]. - /// - /// [`Tracker`]: struct.Tracker.html pub fn new() -> Self { Self { subscriptions: HashMap::new(), @@ -52,14 +50,12 @@ where /// It returns a list of futures that need to be spawned to materialize /// the [`Tracker`] changes. /// - /// [`Tracker`]: struct.Tracker.html - /// [`Subscription`]: struct.Subscription.html - /// [`Recipe`]: trait.Recipe.html + /// [`Recipe`]: crate::subscription::Recipe pub fn update<Message, Receiver>( &mut self, subscription: Subscription<Hasher, Event, Message>, receiver: Receiver, - ) -> Vec<BoxFuture<'static, ()>> + ) -> Vec<BoxFuture<()>> where Message: 'static + Send, Receiver: 'static @@ -70,7 +66,7 @@ where { use futures::{future::FutureExt, stream::StreamExt}; - let mut futures = Vec::new(); + let mut futures: Vec<BoxFuture<()>> = Vec::new(); let recipes = subscription.recipes(); let mut alive = std::collections::HashSet::new(); @@ -115,7 +111,7 @@ where }, ); - futures.push(future.boxed()); + futures.push(Box::pin(future)); } self.subscriptions.retain(|id, _| alive.contains(&id)); @@ -131,6 +127,8 @@ where /// /// This method publishes the given event to all the subscription streams /// currently open. + /// + /// [`Recipe::stream`]: crate::subscription::Recipe::stream pub fn broadcast(&mut self, event: Event) { self.subscriptions .values_mut() diff --git a/futures/src/time.rs b/futures/src/time.rs new file mode 100644 index 00000000..5e9ea436 --- /dev/null +++ b/futures/src/time.rs @@ -0,0 +1,74 @@ +//! Listen and react to time. +use crate::subscription::{self, Subscription}; + +/// Returns a [`Subscription`] that produces messages at a set interval. +/// +/// The first message is produced after a `duration`, and then continues to +/// produce more messages every `duration` after that. +pub fn every<H: std::hash::Hasher, E>( + duration: std::time::Duration, +) -> Subscription<H, E, std::time::Instant> { + Subscription::from_recipe(Every(duration)) +} + +struct Every(std::time::Duration); + +#[cfg(feature = "async-std")] +impl<H, E> subscription::Recipe<H, E> for Every +where + H: std::hash::Hasher, +{ + type Output = std::time::Instant; + + fn hash(&self, state: &mut H) { + use std::hash::Hash; + + std::any::TypeId::of::<Self>().hash(state); + self.0.hash(state); + } + + fn stream( + self: Box<Self>, + _input: futures::stream::BoxStream<'static, E>, + ) -> futures::stream::BoxStream<'static, Self::Output> { + use futures::stream::StreamExt; + + async_std::stream::interval(self.0) + .map(|_| std::time::Instant::now()) + .boxed() + } +} + +#[cfg(all( + any(feature = "tokio", feature = "tokio_old"), + not(feature = "async-std") +))] +impl<H, E> subscription::Recipe<H, E> for Every +where + H: std::hash::Hasher, +{ + type Output = std::time::Instant; + + fn hash(&self, state: &mut H) { + use std::hash::Hash; + + std::any::TypeId::of::<Self>().hash(state); + self.0.hash(state); + } + + fn stream( + self: Box<Self>, + _input: futures::stream::BoxStream<'static, E>, + ) -> futures::stream::BoxStream<'static, Self::Output> { + use futures::stream::StreamExt; + + #[cfg(feature = "tokio_old")] + use tokio_old as tokio; + + let start = tokio::time::Instant::now() + self.0; + + tokio::time::interval_at(start, self.0) + .map(|_| std::time::Instant::now()) + .boxed() + } +} |