summaryrefslogtreecommitdiffstats
path: root/futures
diff options
context:
space:
mode:
Diffstat (limited to 'futures')
-rw-r--r--futures/Cargo.toml17
-rw-r--r--futures/src/command.rs139
-rw-r--r--futures/src/command/native.rs100
-rw-r--r--futures/src/command/web.rs101
-rw-r--r--futures/src/executor.rs12
-rw-r--r--futures/src/executor/async_std.rs1
-rw-r--r--futures/src/executor/thread_pool.rs1
-rw-r--r--futures/src/executor/tokio.rs4
-rw-r--r--futures/src/executor/tokio_old.rs21
-rw-r--r--futures/src/lib.rs39
-rw-r--r--futures/src/runtime.rs22
-rw-r--r--futures/src/subscription.rs133
-rw-r--r--futures/src/subscription/tracker.rs18
-rw-r--r--futures/src/time.rs74
14 files changed, 390 insertions, 292 deletions
diff --git a/futures/Cargo.toml b/futures/Cargo.toml
index 483e60cb..e8e47c08 100644
--- a/futures/Cargo.toml
+++ b/futures/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "iced_futures"
-version = "0.1.0-alpha"
+version = "0.2.0"
authors = ["Héctor Ramón Jiménez <hector0193@gmail.com>"]
edition = "2018"
description = "Commands, subscriptions, and runtimes for Iced"
@@ -19,14 +19,25 @@ log = "0.4"
[dependencies.futures]
version = "0.3"
-[target.'cfg(not(target_arch = "wasm32"))'.dependencies.tokio]
+[target.'cfg(not(target_arch = "wasm32"))'.dependencies.tokio_old]
+package = "tokio"
version = "0.2"
optional = true
-features = ["rt-core", "rt-threaded"]
+features = ["rt-core", "rt-threaded", "time", "stream"]
+
+[target.'cfg(not(target_arch = "wasm32"))'.dependencies.tokio]
+version = "0.3"
+optional = true
+features = ["rt-multi-thread", "time", "stream"]
[target.'cfg(not(target_arch = "wasm32"))'.dependencies.async-std]
version = "1.0"
optional = true
+features = ["unstable"]
[target.'cfg(target_arch = "wasm32")'.dependencies]
wasm-bindgen-futures = "0.4"
+
+[package.metadata.docs.rs]
+rustdoc-args = ["--cfg", "docsrs"]
+all-features = true
diff --git a/futures/src/command.rs b/futures/src/command.rs
index 26f58fde..b06ab3f8 100644
--- a/futures/src/command.rs
+++ b/futures/src/command.rs
@@ -1,11 +1,138 @@
-#[cfg(not(target_arch = "wasm32"))]
-mod native;
+use crate::BoxFuture;
+use futures::future::{Future, FutureExt};
+
+/// A collection of async operations.
+///
+/// You should be able to turn a future easily into a [`Command`], either by
+/// using the `From` trait or [`Command::perform`].
+pub struct Command<T> {
+ futures: Vec<BoxFuture<T>>,
+}
+
+impl<T> Command<T> {
+ /// Creates an empty [`Command`].
+ ///
+ /// In other words, a [`Command`] that does nothing.
+ pub fn none() -> Self {
+ Self {
+ futures: Vec::new(),
+ }
+ }
+
+ /// Creates a [`Command`] that performs the action of the given future.
+ #[cfg(not(target_arch = "wasm32"))]
+ pub fn perform<A>(
+ future: impl Future<Output = T> + 'static + Send,
+ f: impl Fn(T) -> A + 'static + Send,
+ ) -> Command<A> {
+ Command {
+ futures: vec![Box::pin(future.map(f))],
+ }
+ }
+
+ /// Creates a [`Command`] that performs the action of the given future.
+ #[cfg(target_arch = "wasm32")]
+ pub fn perform<A>(
+ future: impl Future<Output = T> + 'static,
+ f: impl Fn(T) -> A + 'static + Send,
+ ) -> Command<A> {
+ Command {
+ futures: vec![Box::pin(future.map(f))],
+ }
+ }
+
+ /// Applies a transformation to the result of a [`Command`].
+ #[cfg(not(target_arch = "wasm32"))]
+ pub fn map<A>(
+ mut self,
+ f: impl Fn(T) -> A + 'static + Send + Sync,
+ ) -> Command<A>
+ where
+ T: 'static,
+ {
+ let f = std::sync::Arc::new(f);
+
+ Command {
+ futures: self
+ .futures
+ .drain(..)
+ .map(|future| {
+ let f = f.clone();
+
+ Box::pin(future.map(move |result| f(result)))
+ as BoxFuture<A>
+ })
+ .collect(),
+ }
+ }
+
+ /// Applies a transformation to the result of a [`Command`].
+ #[cfg(target_arch = "wasm32")]
+ pub fn map<A>(mut self, f: impl Fn(T) -> A + 'static) -> Command<A>
+ where
+ T: 'static,
+ {
+ let f = std::rc::Rc::new(f);
+
+ Command {
+ futures: self
+ .futures
+ .drain(..)
+ .map(|future| {
+ let f = f.clone();
+
+ Box::pin(future.map(move |result| f(result)))
+ as BoxFuture<A>
+ })
+ .collect(),
+ }
+ }
+
+ /// Creates a [`Command`] that performs the actions of all the given
+ /// commands.
+ ///
+ /// Once this command is run, all the commands will be executed at once.
+ pub fn batch(commands: impl IntoIterator<Item = Command<T>>) -> Self {
+ Self {
+ futures: commands
+ .into_iter()
+ .flat_map(|command| command.futures)
+ .collect(),
+ }
+ }
+
+ /// Converts a [`Command`] into its underlying list of futures.
+ pub fn futures(self) -> Vec<BoxFuture<T>> {
+ self.futures
+ }
+}
#[cfg(not(target_arch = "wasm32"))]
-pub use native::Command;
+impl<T, A> From<A> for Command<T>
+where
+ A: Future<Output = T> + 'static + Send,
+{
+ fn from(future: A) -> Self {
+ Self {
+ futures: vec![future.boxed()],
+ }
+ }
+}
#[cfg(target_arch = "wasm32")]
-mod web;
+impl<T, A> From<A> for Command<T>
+where
+ A: Future<Output = T> + 'static,
+{
+ fn from(future: A) -> Self {
+ Self {
+ futures: vec![future.boxed_local()],
+ }
+ }
+}
-#[cfg(target_arch = "wasm32")]
-pub use web::Command;
+impl<T> std::fmt::Debug for Command<T> {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("Command").finish()
+ }
+}
diff --git a/futures/src/command/native.rs b/futures/src/command/native.rs
deleted file mode 100644
index 38cb4e06..00000000
--- a/futures/src/command/native.rs
+++ /dev/null
@@ -1,100 +0,0 @@
-use futures::future::{BoxFuture, Future, FutureExt};
-
-/// A collection of async operations.
-///
-/// You should be able to turn a future easily into a [`Command`], either by
-/// using the `From` trait or [`Command::perform`].
-///
-/// [`Command`]: struct.Command.html
-pub struct Command<T> {
- futures: Vec<BoxFuture<'static, T>>,
-}
-
-impl<T> Command<T> {
- /// Creates an empty [`Command`].
- ///
- /// In other words, a [`Command`] that does nothing.
- ///
- /// [`Command`]: struct.Command.html
- pub fn none() -> Self {
- Self {
- futures: Vec::new(),
- }
- }
-
- /// Creates a [`Command`] that performs the action of the given future.
- ///
- /// [`Command`]: struct.Command.html
- pub fn perform<A>(
- future: impl Future<Output = T> + 'static + Send,
- f: impl Fn(T) -> A + 'static + Send,
- ) -> Command<A> {
- Command {
- futures: vec![future.map(f).boxed()],
- }
- }
-
- /// Applies a transformation to the result of a [`Command`].
- ///
- /// [`Command`]: struct.Command.html
- pub fn map<A>(
- mut self,
- f: impl Fn(T) -> A + 'static + Send + Sync,
- ) -> Command<A>
- where
- T: 'static,
- {
- let f = std::sync::Arc::new(f);
-
- Command {
- futures: self
- .futures
- .drain(..)
- .map(|future| {
- let f = f.clone();
-
- future.map(move |result| f(result)).boxed()
- })
- .collect(),
- }
- }
-
- /// Creates a [`Command`] that performs the actions of all the given
- /// commands.
- ///
- /// Once this command is run, all the commands will be executed at once.
- ///
- /// [`Command`]: struct.Command.html
- pub fn batch(commands: impl IntoIterator<Item = Command<T>>) -> Self {
- Self {
- futures: commands
- .into_iter()
- .flat_map(|command| command.futures)
- .collect(),
- }
- }
-
- /// Converts a [`Command`] into its underlying list of futures.
- ///
- /// [`Command`]: struct.Command.html
- pub fn futures(self) -> Vec<BoxFuture<'static, T>> {
- self.futures
- }
-}
-
-impl<T, A> From<A> for Command<T>
-where
- A: Future<Output = T> + 'static + Send,
-{
- fn from(future: A) -> Self {
- Self {
- futures: vec![future.boxed()],
- }
- }
-}
-
-impl<T> std::fmt::Debug for Command<T> {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- f.debug_struct("Command").finish()
- }
-}
diff --git a/futures/src/command/web.rs b/futures/src/command/web.rs
deleted file mode 100644
index 11b46b90..00000000
--- a/futures/src/command/web.rs
+++ /dev/null
@@ -1,101 +0,0 @@
-use futures::future::{Future, FutureExt};
-use std::pin::Pin;
-
-/// A collection of async operations.
-///
-/// You should be able to turn a future easily into a [`Command`], either by
-/// using the `From` trait or [`Command::perform`].
-///
-/// [`Command`]: struct.Command.html
-pub struct Command<T> {
- futures: Vec<Pin<Box<dyn Future<Output = T> + 'static>>>,
-}
-
-impl<T> Command<T> {
- /// Creates an empty [`Command`].
- ///
- /// In other words, a [`Command`] that does nothing.
- ///
- /// [`Command`]: struct.Command.html
- pub fn none() -> Self {
- Self {
- futures: Vec::new(),
- }
- }
-
- /// Creates a [`Command`] that performs the action of the given future.
- ///
- /// [`Command`]: struct.Command.html
- pub fn perform<A>(
- future: impl Future<Output = T> + 'static,
- f: impl Fn(T) -> A + 'static,
- ) -> Command<A> {
- Command {
- futures: vec![future.map(f).boxed_local()],
- }
- }
-
- /// Applies a transformation to the result of a [`Command`].
- ///
- /// [`Command`]: struct.Command.html
- pub fn map<A>(
- mut self,
- f: impl Fn(T) -> A + 'static + Send + Sync + Unpin,
- ) -> Command<A>
- where
- T: 'static,
- {
- let f = std::sync::Arc::new(f);
-
- Command {
- futures: self
- .futures
- .drain(..)
- .map(|future| {
- let f = f.clone();
-
- future.map(move |result| f(result)).boxed_local()
- })
- .collect(),
- }
- }
-
- /// Creates a [`Command`] that performs the actions of all the given
- /// commands.
- ///
- /// Once this command is run, all the commands will be executed at once.
- ///
- /// [`Command`]: struct.Command.html
- pub fn batch(commands: impl IntoIterator<Item = Command<T>>) -> Self {
- Self {
- futures: commands
- .into_iter()
- .flat_map(|command| command.futures)
- .collect(),
- }
- }
-
- /// Converts a [`Command`] into its underlying list of futures.
- ///
- /// [`Command`]: struct.Command.html
- pub fn futures(self) -> Vec<Pin<Box<dyn Future<Output = T> + 'static>>> {
- self.futures
- }
-}
-
-impl<T, A> From<A> for Command<T>
-where
- A: Future<Output = T> + 'static,
-{
- fn from(future: A) -> Self {
- Self {
- futures: vec![future.boxed_local()],
- }
- }
-}
-
-impl<T> std::fmt::Debug for Command<T> {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- f.debug_struct("Command").finish()
- }
-}
diff --git a/futures/src/executor.rs b/futures/src/executor.rs
index 5378c0b3..fa87216a 100644
--- a/futures/src/executor.rs
+++ b/futures/src/executor.rs
@@ -7,6 +7,9 @@ mod thread_pool;
#[cfg(all(not(target_arch = "wasm32"), feature = "tokio"))]
mod tokio;
+#[cfg(all(not(target_arch = "wasm32"), feature = "tokio_old"))]
+mod tokio_old;
+
#[cfg(all(not(target_arch = "wasm32"), feature = "async-std"))]
mod async_std;
@@ -21,6 +24,9 @@ pub use thread_pool::ThreadPool;
#[cfg(all(not(target_arch = "wasm32"), feature = "tokio"))]
pub use self::tokio::Tokio;
+#[cfg(all(not(target_arch = "wasm32"), feature = "tokio_old"))]
+pub use self::tokio_old::TokioOld;
+
#[cfg(all(not(target_arch = "wasm32"), feature = "async-std"))]
pub use self::async_std::AsyncStd;
@@ -32,21 +38,15 @@ use futures::Future;
/// A type that can run futures.
pub trait Executor: Sized {
/// Creates a new [`Executor`].
- ///
- /// [`Executor`]: trait.Executor.html
fn new() -> Result<Self, futures::io::Error>
where
Self: Sized;
/// Spawns a future in the [`Executor`].
- ///
- /// [`Executor`]: trait.Executor.html
#[cfg(not(target_arch = "wasm32"))]
fn spawn(&self, future: impl Future<Output = ()> + Send + 'static);
/// Spawns a local future in the [`Executor`].
- ///
- /// [`Executor`]: trait.Executor.html
#[cfg(target_arch = "wasm32")]
fn spawn(&self, future: impl Future<Output = ()> + 'static);
diff --git a/futures/src/executor/async_std.rs b/futures/src/executor/async_std.rs
index 27949e31..471be369 100644
--- a/futures/src/executor/async_std.rs
+++ b/futures/src/executor/async_std.rs
@@ -3,6 +3,7 @@ use crate::Executor;
use futures::Future;
/// An `async-std` runtime.
+#[cfg_attr(docsrs, doc(cfg(feature = "async-std")))]
#[derive(Debug)]
pub struct AsyncStd;
diff --git a/futures/src/executor/thread_pool.rs b/futures/src/executor/thread_pool.rs
index 1ec5bf69..a6c6168e 100644
--- a/futures/src/executor/thread_pool.rs
+++ b/futures/src/executor/thread_pool.rs
@@ -3,6 +3,7 @@ use crate::Executor;
use futures::Future;
/// A thread pool runtime for futures.
+#[cfg_attr(docsrs, doc(cfg(feature = "thread-pool")))]
pub type ThreadPool = futures::executor::ThreadPool;
impl Executor for futures::executor::ThreadPool {
diff --git a/futures/src/executor/tokio.rs b/futures/src/executor/tokio.rs
index 20802ceb..c6a21cec 100644
--- a/futures/src/executor/tokio.rs
+++ b/futures/src/executor/tokio.rs
@@ -3,6 +3,7 @@ use crate::Executor;
use futures::Future;
/// A `tokio` runtime.
+#[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
pub type Tokio = tokio::runtime::Runtime;
impl Executor for Tokio {
@@ -15,6 +16,7 @@ impl Executor for Tokio {
}
fn enter<R>(&self, f: impl FnOnce() -> R) -> R {
- tokio::runtime::Runtime::enter(self, f)
+ let _guard = tokio::runtime::Runtime::enter(self);
+ f()
}
}
diff --git a/futures/src/executor/tokio_old.rs b/futures/src/executor/tokio_old.rs
new file mode 100644
index 00000000..d64729fa
--- /dev/null
+++ b/futures/src/executor/tokio_old.rs
@@ -0,0 +1,21 @@
+use crate::Executor;
+
+use futures::Future;
+
+/// An old `tokio` runtime.
+#[cfg_attr(docsrs, doc(cfg(feature = "tokio_old")))]
+pub type TokioOld = tokio_old::runtime::Runtime;
+
+impl Executor for TokioOld {
+ fn new() -> Result<Self, futures::io::Error> {
+ tokio_old::runtime::Runtime::new()
+ }
+
+ fn spawn(&self, future: impl Future<Output = ()> + Send + 'static) {
+ let _ = tokio_old::runtime::Runtime::spawn(self, future);
+ }
+
+ fn enter<R>(&self, f: impl FnOnce() -> R) -> R {
+ tokio_old::runtime::Runtime::enter(self, f)
+ }
+}
diff --git a/futures/src/lib.rs b/futures/src/lib.rs
index c25c0853..c7c6fd3a 100644
--- a/futures/src/lib.rs
+++ b/futures/src/lib.rs
@@ -1,9 +1,13 @@
//! Asynchronous tasks for GUI programming, inspired by Elm.
+//!
+//! ![The foundations of the Iced ecosystem](https://github.com/hecrj/iced/blob/0525d76ff94e828b7b21634fa94a747022001c83/docs/graphs/foundations.png?raw=true)
#![deny(missing_docs)]
#![deny(missing_debug_implementations)]
#![deny(unused_results)]
#![forbid(unsafe_code)]
#![forbid(rust_2018_idioms)]
+#![cfg_attr(docsrs, feature(doc_cfg))]
+
pub use futures;
mod command;
@@ -12,7 +16,42 @@ mod runtime;
pub mod executor;
pub mod subscription;
+#[cfg(all(
+ any(feature = "tokio", feature = "tokio_old", feature = "async-std"),
+ not(target_arch = "wasm32")
+))]
+#[cfg_attr(docsrs, doc(cfg(any(feature = "tokio", feature = "async-std"))))]
+pub mod time;
+
pub use command::Command;
pub use executor::Executor;
pub use runtime::Runtime;
pub use subscription::Subscription;
+
+/// A boxed static future.
+///
+/// - On native platforms, it needs a `Send` requirement.
+/// - On the Web platform, it does not need a `Send` requirement.
+#[cfg(not(target_arch = "wasm32"))]
+pub type BoxFuture<T> = futures::future::BoxFuture<'static, T>;
+
+/// A boxed static future.
+///
+/// - On native platforms, it needs a `Send` requirement.
+/// - On the Web platform, it does not need a `Send` requirement.
+#[cfg(target_arch = "wasm32")]
+pub type BoxFuture<T> = futures::future::LocalBoxFuture<'static, T>;
+
+/// A boxed static stream.
+///
+/// - On native platforms, it needs a `Send` requirement.
+/// - On the Web platform, it does not need a `Send` requirement.
+#[cfg(not(target_arch = "wasm32"))]
+pub type BoxStream<T> = futures::stream::BoxStream<'static, T>;
+
+/// A boxed static stream.
+///
+/// - On native platforms, it needs a `Send` requirement.
+/// - On the Web platform, it does not need a `Send` requirement.
+#[cfg(target_arch = "wasm32")]
+pub type BoxStream<T> = futures::stream::LocalBoxStream<'static, T>;
diff --git a/futures/src/runtime.rs b/futures/src/runtime.rs
index d204670b..e56a4eb0 100644
--- a/futures/src/runtime.rs
+++ b/futures/src/runtime.rs
@@ -8,11 +8,6 @@ use std::marker::PhantomData;
///
/// If you have an [`Executor`], a [`Runtime`] can be leveraged to run any
/// [`Command`] or [`Subscription`] and get notified of the results!
-///
-/// [`Runtime`]: struct.Runtime.html
-/// [`Executor`]: executor/trait.Executor.html
-/// [`Command`]: struct.Command.html
-/// [`Subscription`]: subscription/struct.Subscription.html
#[derive(Debug)]
pub struct Runtime<Hasher, Event, Executor, Sender, Message> {
executor: Executor,
@@ -36,8 +31,6 @@ where
/// You need to provide:
/// - an [`Executor`] to spawn futures
/// - a `Sender` implementing `Sink` to receive the results
- ///
- /// [`Runtime`]: struct.Runtime.html
pub fn new(executor: Executor, sender: Sender) -> Self {
Self {
executor,
@@ -50,10 +43,6 @@ where
/// Runs the given closure inside the [`Executor`] of the [`Runtime`].
///
/// See [`Executor::enter`] to learn more.
- ///
- /// [`Executor`]: executor/trait.Executor.html
- /// [`Runtime`]: struct.Runtime.html
- /// [`Executor::enter`]: executor/trait.Executor.html#method.enter
pub fn enter<R>(&self, f: impl FnOnce() -> R) -> R {
self.executor.enter(f)
}
@@ -62,9 +51,6 @@ where
///
/// The resulting `Message` will be forwarded to the `Sender` of the
/// [`Runtime`].
- ///
- /// [`Command`]: struct.Command.html
- /// [`Runtime`]: struct.Runtime.html
pub fn spawn(&mut self, command: Command<Message>) {
use futures::{FutureExt, SinkExt};
@@ -88,9 +74,7 @@ where
/// It will spawn new streams or close old ones as necessary! See
/// [`Tracker::update`] to learn more about this!
///
- /// [`Subscription`]: subscription/struct.Subscription.html
- /// [`Runtime`]: struct.Runtime.html
- /// [`Tracker::update`]: subscription/struct.Tracker.html#method.update
+ /// [`Tracker::update`]: subscription::Tracker::update
pub fn track(
&mut self,
subscription: Subscription<Hasher, Event, Message>,
@@ -115,9 +99,7 @@ where
///
/// See [`Tracker::broadcast`] to learn more.
///
- /// [`Runtime`]: struct.Runtime.html
- /// [`Tracker::broadcast`]:
- /// subscription/struct.Tracker.html#method.broadcast
+ /// [`Tracker::broadcast`]: subscription::Tracker::broadcast
pub fn broadcast(&mut self, event: Event) {
self.subscriptions.broadcast(event);
}
diff --git a/futures/src/subscription.rs b/futures/src/subscription.rs
index b68444cd..27d2d295 100644
--- a/futures/src/subscription.rs
+++ b/futures/src/subscription.rs
@@ -3,7 +3,7 @@ mod tracker;
pub use tracker::Tracker;
-use futures::stream::BoxStream;
+use crate::BoxStream;
/// A request to listen to external events.
///
@@ -19,8 +19,7 @@ use futures::stream::BoxStream;
/// This type is normally aliased by runtimes with a specific `Event` and/or
/// `Hasher`.
///
-/// [`Command`]: ../struct.Command.html
-/// [`Subscription`]: struct.Subscription.html
+/// [`Command`]: crate::Command
pub struct Subscription<Hasher, Event, Output> {
recipes: Vec<Box<dyn Recipe<Hasher, Event, Output = Output>>>,
}
@@ -30,8 +29,6 @@ where
H: std::hash::Hasher,
{
/// Returns an empty [`Subscription`] that will not produce any output.
- ///
- /// [`Subscription`]: struct.Subscription.html
pub fn none() -> Self {
Self {
recipes: Vec::new(),
@@ -39,9 +36,6 @@ where
}
/// Creates a [`Subscription`] from a [`Recipe`] describing it.
- ///
- /// [`Subscription`]: struct.Subscription.html
- /// [`Recipe`]: trait.Recipe.html
pub fn from_recipe(
recipe: impl Recipe<H, E, Output = O> + 'static,
) -> Self {
@@ -52,8 +46,6 @@ where
/// Batches all the provided subscriptions and returns the resulting
/// [`Subscription`].
- ///
- /// [`Subscription`]: struct.Subscription.html
pub fn batch(
subscriptions: impl IntoIterator<Item = Subscription<H, E, O>>,
) -> Self {
@@ -66,33 +58,46 @@ where
}
/// Returns the different recipes of the [`Subscription`].
- ///
- /// [`Subscription`]: struct.Subscription.html
pub fn recipes(self) -> Vec<Box<dyn Recipe<H, E, Output = O>>> {
self.recipes
}
- /// Transforms the [`Subscription`] output with the given function.
+ /// Adds a value to the [`Subscription`] context.
///
- /// [`Subscription`]: struct.Subscription.html
- pub fn map<A>(
- mut self,
- f: impl Fn(O) -> A + Send + Sync + 'static,
- ) -> Subscription<H, E, A>
+ /// The value will be part of the identity of a [`Subscription`].
+ pub fn with<T>(mut self, value: T) -> Subscription<H, E, (T, O)>
where
H: 'static,
E: 'static,
O: 'static,
- A: 'static,
+ T: std::hash::Hash + Clone + Send + Sync + 'static,
{
- let function = std::sync::Arc::new(f);
+ Subscription {
+ recipes: self
+ .recipes
+ .drain(..)
+ .map(|recipe| {
+ Box::new(With::new(recipe, value.clone()))
+ as Box<dyn Recipe<H, E, Output = (T, O)>>
+ })
+ .collect(),
+ }
+ }
+ /// Transforms the [`Subscription`] output with the given function.
+ pub fn map<A>(mut self, f: fn(O) -> A) -> Subscription<H, E, A>
+ where
+ H: 'static,
+ E: 'static,
+ O: 'static,
+ A: 'static,
+ {
Subscription {
recipes: self
.recipes
.drain(..)
.map(|recipe| {
- Box::new(Map::new(recipe, function.clone()))
+ Box::new(Map::new(recipe, f))
as Box<dyn Recipe<H, E, Output = A>>
})
.collect(),
@@ -112,22 +117,25 @@ impl<I, O, H> std::fmt::Debug for Subscription<I, O, H> {
/// by runtimes to run and identify subscriptions. You can use it to create your
/// own!
///
-/// [`Subscription`]: struct.Subscription.html
-/// [`Recipe`]: trait.Recipe.html
+/// # Examples
+/// The repository has a couple of [examples] that use a custom [`Recipe`]:
+///
+/// - [`download_progress`], a basic application that asynchronously downloads
+/// a dummy file of 100 MB and tracks the download progress.
+/// - [`stopwatch`], a watch with start/stop and reset buttons showcasing how
+/// to listen to time.
+///
+/// [examples]: https://github.com/hecrj/iced/tree/0.2/examples
+/// [`download_progress`]: https://github.com/hecrj/iced/tree/0.2/examples/download_progress
+/// [`stopwatch`]: https://github.com/hecrj/iced/tree/0.2/examples/stopwatch
pub trait Recipe<Hasher: std::hash::Hasher, Event> {
/// The events that will be produced by a [`Subscription`] with this
/// [`Recipe`].
- ///
- /// [`Subscription`]: struct.Subscription.html
- /// [`Recipe`]: trait.Recipe.html
type Output;
/// Hashes the [`Recipe`].
///
/// This is used by runtimes to uniquely identify a [`Subscription`].
- ///
- /// [`Subscription`]: struct.Subscription.html
- /// [`Recipe`]: trait.Recipe.html
fn hash(&self, state: &mut Hasher);
/// Executes the [`Recipe`] and produces the stream of events of its
@@ -135,24 +143,21 @@ pub trait Recipe<Hasher: std::hash::Hasher, Event> {
///
/// It receives some stream of generic events, which is normally defined by
/// shells.
- ///
- /// [`Subscription`]: struct.Subscription.html
- /// [`Recipe`]: trait.Recipe.html
fn stream(
self: Box<Self>,
- input: BoxStream<'static, Event>,
- ) -> BoxStream<'static, Self::Output>;
+ input: BoxStream<Event>,
+ ) -> BoxStream<Self::Output>;
}
struct Map<Hasher, Event, A, B> {
recipe: Box<dyn Recipe<Hasher, Event, Output = A>>,
- mapper: std::sync::Arc<dyn Fn(A) -> B + Send + Sync>,
+ mapper: fn(A) -> B,
}
impl<H, E, A, B> Map<H, E, A, B> {
fn new(
recipe: Box<dyn Recipe<H, E, Output = A>>,
- mapper: std::sync::Arc<dyn Fn(A) -> B + Send + Sync + 'static>,
+ mapper: fn(A) -> B,
) -> Self {
Map { recipe, mapper }
}
@@ -169,21 +174,59 @@ where
fn hash(&self, state: &mut H) {
use std::hash::Hash;
- std::any::TypeId::of::<B>().hash(state);
self.recipe.hash(state);
+ self.mapper.hash(state);
}
- fn stream(
- self: Box<Self>,
- input: BoxStream<'static, E>,
- ) -> futures::stream::BoxStream<'static, Self::Output> {
+ fn stream(self: Box<Self>, input: BoxStream<E>) -> BoxStream<Self::Output> {
use futures::StreamExt;
let mapper = self.mapper;
- self.recipe
- .stream(input)
- .map(move |element| mapper(element))
- .boxed()
+ Box::pin(
+ self.recipe
+ .stream(input)
+ .map(move |element| mapper(element)),
+ )
+ }
+}
+
+struct With<Hasher, Event, A, B> {
+ recipe: Box<dyn Recipe<Hasher, Event, Output = A>>,
+ value: B,
+}
+
+impl<H, E, A, B> With<H, E, A, B> {
+ fn new(recipe: Box<dyn Recipe<H, E, Output = A>>, value: B) -> Self {
+ With { recipe, value }
+ }
+}
+
+impl<H, E, A, B> Recipe<H, E> for With<H, E, A, B>
+where
+ A: 'static,
+ B: 'static + std::hash::Hash + Clone + Send + Sync,
+ H: std::hash::Hasher,
+{
+ type Output = (B, A);
+
+ fn hash(&self, state: &mut H) {
+ use std::hash::Hash;
+
+ std::any::TypeId::of::<B>().hash(state);
+ self.value.hash(state);
+ self.recipe.hash(state);
+ }
+
+ fn stream(self: Box<Self>, input: BoxStream<E>) -> BoxStream<Self::Output> {
+ use futures::StreamExt;
+
+ let value = self.value;
+
+ Box::pin(
+ self.recipe
+ .stream(input)
+ .map(move |element| (value.clone(), element)),
+ )
}
}
diff --git a/futures/src/subscription/tracker.rs b/futures/src/subscription/tracker.rs
index cfa36170..43222b5b 100644
--- a/futures/src/subscription/tracker.rs
+++ b/futures/src/subscription/tracker.rs
@@ -1,6 +1,6 @@
-use crate::Subscription;
+use crate::{BoxFuture, Subscription};
-use futures::{channel::mpsc, future::BoxFuture, sink::Sink};
+use futures::{channel::mpsc, sink::Sink};
use std::{collections::HashMap, marker::PhantomData};
/// A registry of subscription streams.
@@ -26,8 +26,6 @@ where
Event: 'static + Send + Clone,
{
/// Creates a new empty [`Tracker`].
- ///
- /// [`Tracker`]: struct.Tracker.html
pub fn new() -> Self {
Self {
subscriptions: HashMap::new(),
@@ -52,14 +50,12 @@ where
/// It returns a list of futures that need to be spawned to materialize
/// the [`Tracker`] changes.
///
- /// [`Tracker`]: struct.Tracker.html
- /// [`Subscription`]: struct.Subscription.html
- /// [`Recipe`]: trait.Recipe.html
+ /// [`Recipe`]: crate::subscription::Recipe
pub fn update<Message, Receiver>(
&mut self,
subscription: Subscription<Hasher, Event, Message>,
receiver: Receiver,
- ) -> Vec<BoxFuture<'static, ()>>
+ ) -> Vec<BoxFuture<()>>
where
Message: 'static + Send,
Receiver: 'static
@@ -70,7 +66,7 @@ where
{
use futures::{future::FutureExt, stream::StreamExt};
- let mut futures = Vec::new();
+ let mut futures: Vec<BoxFuture<()>> = Vec::new();
let recipes = subscription.recipes();
let mut alive = std::collections::HashSet::new();
@@ -115,7 +111,7 @@ where
},
);
- futures.push(future.boxed());
+ futures.push(Box::pin(future));
}
self.subscriptions.retain(|id, _| alive.contains(&id));
@@ -131,6 +127,8 @@ where
///
/// This method publishes the given event to all the subscription streams
/// currently open.
+ ///
+ /// [`Recipe::stream`]: crate::subscription::Recipe::stream
pub fn broadcast(&mut self, event: Event) {
self.subscriptions
.values_mut()
diff --git a/futures/src/time.rs b/futures/src/time.rs
new file mode 100644
index 00000000..5e9ea436
--- /dev/null
+++ b/futures/src/time.rs
@@ -0,0 +1,74 @@
+//! Listen and react to time.
+use crate::subscription::{self, Subscription};
+
+/// Returns a [`Subscription`] that produces messages at a set interval.
+///
+/// The first message is produced after a `duration`, and then continues to
+/// produce more messages every `duration` after that.
+pub fn every<H: std::hash::Hasher, E>(
+ duration: std::time::Duration,
+) -> Subscription<H, E, std::time::Instant> {
+ Subscription::from_recipe(Every(duration))
+}
+
+struct Every(std::time::Duration);
+
+#[cfg(feature = "async-std")]
+impl<H, E> subscription::Recipe<H, E> for Every
+where
+ H: std::hash::Hasher,
+{
+ type Output = std::time::Instant;
+
+ fn hash(&self, state: &mut H) {
+ use std::hash::Hash;
+
+ std::any::TypeId::of::<Self>().hash(state);
+ self.0.hash(state);
+ }
+
+ fn stream(
+ self: Box<Self>,
+ _input: futures::stream::BoxStream<'static, E>,
+ ) -> futures::stream::BoxStream<'static, Self::Output> {
+ use futures::stream::StreamExt;
+
+ async_std::stream::interval(self.0)
+ .map(|_| std::time::Instant::now())
+ .boxed()
+ }
+}
+
+#[cfg(all(
+ any(feature = "tokio", feature = "tokio_old"),
+ not(feature = "async-std")
+))]
+impl<H, E> subscription::Recipe<H, E> for Every
+where
+ H: std::hash::Hasher,
+{
+ type Output = std::time::Instant;
+
+ fn hash(&self, state: &mut H) {
+ use std::hash::Hash;
+
+ std::any::TypeId::of::<Self>().hash(state);
+ self.0.hash(state);
+ }
+
+ fn stream(
+ self: Box<Self>,
+ _input: futures::stream::BoxStream<'static, E>,
+ ) -> futures::stream::BoxStream<'static, Self::Output> {
+ use futures::stream::StreamExt;
+
+ #[cfg(feature = "tokio_old")]
+ use tokio_old as tokio;
+
+ let start = tokio::time::Instant::now() + self.0;
+
+ tokio::time::interval_at(start, self.0)
+ .map(|_| std::time::Instant::now())
+ .boxed()
+ }
+}