diff options
Diffstat (limited to '')
| -rw-r--r-- | futures/src/backend/native/async_std.rs | 2 | ||||
| -rw-r--r-- | futures/src/backend/native/smol.rs | 2 | ||||
| -rw-r--r-- | futures/src/backend/native/tokio.rs | 2 | ||||
| -rw-r--r-- | futures/src/backend/wasm/wasm_bindgen.rs | 2 | ||||
| -rw-r--r-- | futures/src/lib.rs | 1 | ||||
| -rw-r--r-- | futures/src/stream.rs | 26 | ||||
| -rw-r--r-- | futures/src/subscription.rs | 349 | 
7 files changed, 206 insertions, 178 deletions
| diff --git a/futures/src/backend/native/async_std.rs b/futures/src/backend/native/async_std.rs index b7da5e90..86714f45 100644 --- a/futures/src/backend/native/async_std.rs +++ b/futures/src/backend/native/async_std.rs @@ -27,7 +27,7 @@ pub mod time {      pub fn every(          duration: std::time::Duration,      ) -> Subscription<std::time::Instant> { -        Subscription::from_recipe(Every(duration)) +        subscription::from_recipe(Every(duration))      }      #[derive(Debug)] diff --git a/futures/src/backend/native/smol.rs b/futures/src/backend/native/smol.rs index aaf1518c..8d448e7f 100644 --- a/futures/src/backend/native/smol.rs +++ b/futures/src/backend/native/smol.rs @@ -26,7 +26,7 @@ pub mod time {      pub fn every(          duration: std::time::Duration,      ) -> Subscription<std::time::Instant> { -        Subscription::from_recipe(Every(duration)) +        subscription::from_recipe(Every(duration))      }      #[derive(Debug)] diff --git a/futures/src/backend/native/tokio.rs b/futures/src/backend/native/tokio.rs index df91d798..9dc3593d 100644 --- a/futures/src/backend/native/tokio.rs +++ b/futures/src/backend/native/tokio.rs @@ -31,7 +31,7 @@ pub mod time {      pub fn every(          duration: std::time::Duration,      ) -> Subscription<std::time::Instant> { -        Subscription::from_recipe(Every(duration)) +        subscription::from_recipe(Every(duration))      }      #[derive(Debug)] diff --git a/futures/src/backend/wasm/wasm_bindgen.rs b/futures/src/backend/wasm/wasm_bindgen.rs index 3228dd18..f7846c01 100644 --- a/futures/src/backend/wasm/wasm_bindgen.rs +++ b/futures/src/backend/wasm/wasm_bindgen.rs @@ -26,7 +26,7 @@ pub mod time {      pub fn every(          duration: std::time::Duration,      ) -> Subscription<wasm_timer::Instant> { -        Subscription::from_recipe(Every(duration)) +        subscription::from_recipe(Every(duration))      }      #[derive(Debug)] diff --git a/futures/src/lib.rs b/futures/src/lib.rs index a874a618..31738823 100644 --- a/futures/src/lib.rs +++ b/futures/src/lib.rs @@ -15,6 +15,7 @@ pub mod backend;  pub mod event;  pub mod executor;  pub mod keyboard; +pub mod stream;  pub mod subscription;  pub use executor::Executor; diff --git a/futures/src/stream.rs b/futures/src/stream.rs new file mode 100644 index 00000000..2ec505f1 --- /dev/null +++ b/futures/src/stream.rs @@ -0,0 +1,26 @@ +//! Create asynchronous streams of data. +use futures::channel::mpsc; +use futures::never::Never; +use futures::stream::{self, Stream, StreamExt}; + +use std::future::Future; + +/// Creates a new [`Stream`] that produces the items sent from a [`Future`] +/// to the [`mpsc::Sender`] provided to the closure. +/// +/// This is a more ergonomic [`stream::unfold`], which allows you to go +/// from the "world of futures" to the "world of streams" by simply looping +/// and publishing to an async channel from inside a [`Future`]. +pub fn channel<T, F>( +    size: usize, +    f: impl FnOnce(mpsc::Sender<T>) -> F, +) -> impl Stream<Item = T> +where +    F: Future<Output = Never>, +{ +    let (sender, receiver) = mpsc::channel(size); + +    let runner = stream::once(f(sender)).map(|_| unreachable!()); + +    stream::select(receiver, runner) +} diff --git a/futures/src/subscription.rs b/futures/src/subscription.rs index 5ec39582..1a0d454d 100644 --- a/futures/src/subscription.rs +++ b/futures/src/subscription.rs @@ -5,11 +5,9 @@ pub use tracker::Tracker;  use crate::core::event;  use crate::core::window; -use crate::futures::{Future, Stream}; +use crate::futures::Stream;  use crate::{BoxStream, MaybeSend}; -use futures::channel::mpsc; -use futures::never::Never;  use std::any::TypeId;  use std::hash::Hash; @@ -61,20 +59,66 @@ pub type Hasher = rustc_hash::FxHasher;  /// A request to listen to external events.  /// -/// Besides performing async actions on demand with `Command`, most +/// Besides performing async actions on demand with `Task`, most  /// applications also need to listen to external events passively.  /// -/// A [`Subscription`] is normally provided to some runtime, like a `Command`, +/// A [`Subscription`] is normally provided to some runtime, like a `Task`,  /// and it will generate events as long as the user keeps requesting it.  ///  /// For instance, you can use a [`Subscription`] to listen to a `WebSocket`  /// connection, keyboard presses, mouse events, time ticks, etc. +/// +/// # The Lifetime of a [`Subscription`] +/// Much like a [`Future`] or a [`Stream`], a [`Subscription`] does not produce any effects +/// on its own. For a [`Subscription`] to run, it must be returned to the iced runtime—normally +/// in the `subscription` function of an `application` or a `daemon`. +/// +/// When a [`Subscription`] is provided to the runtime for the first time, the runtime will +/// start running it asynchronously. Running a [`Subscription`] consists in building its underlying +/// [`Stream`] and executing it in an async runtime. +/// +/// Therefore, you can think of a [`Subscription`] as a "stream builder". It simply represents a way +/// to build a certain [`Stream`] together with some way to _identify_ it. +/// +/// Identification is important because when a specific [`Subscription`] stops being returned to the +/// iced runtime, the runtime will kill its associated [`Stream`]. The runtime uses the identity of a +/// [`Subscription`] to keep track of it. +/// +/// This way, iced allows you to declaratively __subscribe__ to particular streams of data temporarily +/// and whenever necessary. +/// +/// ``` +/// # mod iced { +/// #     pub mod time { +/// #         pub use iced_futures::backend::default::time::every; +/// #         pub use std::time::{Duration, Instant}; +/// #     } +/// # +/// #     pub use iced_futures::Subscription; +/// # } +/// use iced::time::{self, Duration, Instant}; +/// use iced::Subscription; +/// +/// struct State { +///     timer_enabled: bool, +/// } +/// +/// fn subscription(state: &State) -> Subscription<Instant> { +///     if state.timer_enabled { +///         time::every(Duration::from_secs(1)) +///     } else { +///         Subscription::none() +///     } +/// } +/// ``` +/// +/// [`Future`]: std::future::Future  #[must_use = "`Subscription` must be returned to runtime to take effect"] -pub struct Subscription<Message> { -    recipes: Vec<Box<dyn Recipe<Output = Message>>>, +pub struct Subscription<T> { +    recipes: Vec<Box<dyn Recipe<Output = T>>>,  } -impl<Message> Subscription<Message> { +impl<T> Subscription<T> {      /// Returns an empty [`Subscription`] that will not produce any output.      pub fn none() -> Self {          Self { @@ -82,19 +126,102 @@ impl<Message> Subscription<Message> {          }      } -    /// Creates a [`Subscription`] from a [`Recipe`] describing it. -    pub fn from_recipe( -        recipe: impl Recipe<Output = Message> + 'static, -    ) -> Self { -        Self { -            recipes: vec![Box::new(recipe)], -        } +    /// Returns a [`Subscription`] that will call the given function to create and +    /// asynchronously run the given [`Stream`]. +    /// +    /// # Creating an asynchronous worker with bidirectional communication +    /// You can leverage this helper to create a [`Subscription`] that spawns +    /// an asynchronous worker in the background and establish a channel of +    /// communication with an `iced` application. +    /// +    /// You can achieve this by creating an `mpsc` channel inside the closure +    /// and returning the `Sender` as a `Message` for the `Application`: +    /// +    /// ``` +    /// use iced_futures::subscription::{self, Subscription}; +    /// use iced_futures::stream; +    /// use iced_futures::futures::channel::mpsc; +    /// use iced_futures::futures::sink::SinkExt; +    /// use iced_futures::futures::Stream; +    /// +    /// pub enum Event { +    ///     Ready(mpsc::Sender<Input>), +    ///     WorkFinished, +    ///     // ... +    /// } +    /// +    /// enum Input { +    ///     DoSomeWork, +    ///     // ... +    /// } +    /// +    /// fn some_worker() -> impl Stream<Item = Event> { +    ///     stream::channel(100, |mut output| async move { +    ///         // Create channel +    ///         let (sender, mut receiver) = mpsc::channel(100); +    /// +    ///         // Send the sender back to the application +    ///         output.send(Event::Ready(sender)).await; +    /// +    ///         loop { +    ///             use iced_futures::futures::StreamExt; +    /// +    ///             // Read next input sent from `Application` +    ///             let input = receiver.select_next_some().await; +    /// +    ///             match input { +    ///                 Input::DoSomeWork => { +    ///                     // Do some async work... +    /// +    ///                     // Finally, we can optionally produce a message to tell the +    ///                     // `Application` the work is done +    ///                     output.send(Event::WorkFinished).await; +    ///                 } +    ///             } +    ///         } +    ///     }) +    /// } +    /// +    /// fn subscription() -> Subscription<Event> { +    ///     Subscription::run(some_worker) +    /// } +    /// ``` +    /// +    /// Check out the [`websocket`] example, which showcases this pattern to maintain a `WebSocket` +    /// connection open. +    /// +    /// [`websocket`]: https://github.com/iced-rs/iced/tree/0.12/examples/websocket +    pub fn run<S>(builder: fn() -> S) -> Self +    where +        S: Stream<Item = T> + MaybeSend + 'static, +        T: 'static, +    { +        from_recipe(Runner { +            id: builder, +            spawn: move |_| builder(), +        }) +    } + +    /// Returns a [`Subscription`] that will create and asynchronously run the +    /// given [`Stream`]. +    /// +    /// The `id` will be used to uniquely identify the [`Subscription`]. +    pub fn run_with_id<I, S>(id: I, stream: S) -> Subscription<T> +    where +        I: Hash + 'static, +        S: Stream<Item = T> + MaybeSend + 'static, +        T: 'static, +    { +        from_recipe(Runner { +            id, +            spawn: move |_| stream, +        })      }      /// Batches all the provided subscriptions and returns the resulting      /// [`Subscription`].      pub fn batch( -        subscriptions: impl IntoIterator<Item = Subscription<Message>>, +        subscriptions: impl IntoIterator<Item = Subscription<T>>,      ) -> Self {          Self {              recipes: subscriptions @@ -104,18 +231,13 @@ impl<Message> Subscription<Message> {          }      } -    /// Returns the different recipes of the [`Subscription`]. -    pub fn into_recipes(self) -> Vec<Box<dyn Recipe<Output = Message>>> { -        self.recipes -    } -      /// Adds a value to the [`Subscription`] context.      ///      /// The value will be part of the identity of a [`Subscription`]. -    pub fn with<T>(mut self, value: T) -> Subscription<(T, Message)> +    pub fn with<A>(mut self, value: A) -> Subscription<(A, T)>      where -        Message: 'static, -        T: std::hash::Hash + Clone + Send + Sync + 'static, +        T: 'static, +        A: std::hash::Hash + Clone + Send + Sync + 'static,      {          Subscription {              recipes: self @@ -123,7 +245,7 @@ impl<Message> Subscription<Message> {                  .drain(..)                  .map(|recipe| {                      Box::new(With::new(recipe, value.clone())) -                        as Box<dyn Recipe<Output = (T, Message)>> +                        as Box<dyn Recipe<Output = (A, T)>>                  })                  .collect(),          } @@ -136,8 +258,8 @@ impl<Message> Subscription<Message> {      /// will panic in debug mode otherwise.      pub fn map<F, A>(mut self, f: F) -> Subscription<A>      where -        Message: 'static, -        F: Fn(Message) -> A + MaybeSend + Clone + 'static, +        T: 'static, +        F: Fn(T) -> A + MaybeSend + Clone + 'static,          A: 'static,      {          debug_assert!( @@ -159,7 +281,23 @@ impl<Message> Subscription<Message> {      }  } -impl<Message> std::fmt::Debug for Subscription<Message> { +/// Creates a [`Subscription`] from a [`Recipe`] describing it. +pub fn from_recipe<T>( +    recipe: impl Recipe<Output = T> + 'static, +) -> Subscription<T> { +    Subscription { +        recipes: vec![Box::new(recipe)], +    } +} + +/// Returns the different recipes of the [`Subscription`]. +pub fn into_recipes<T>( +    subscription: Subscription<T>, +) -> Vec<Box<dyn Recipe<Output = T>>> { +    subscription.recipes +} + +impl<T> std::fmt::Debug for Subscription<T> {      fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {          f.debug_struct("Subscription").finish()      } @@ -273,65 +411,13 @@ where      }  } -/// Returns a [`Subscription`] that will call the given function to create and -/// asynchronously run the given [`Stream`]. -pub fn run<S, Message>(builder: fn() -> S) -> Subscription<Message> -where -    S: Stream<Item = Message> + MaybeSend + 'static, -    Message: 'static, -{ -    Subscription::from_recipe(Runner { -        id: builder, -        spawn: move |_| builder(), -    }) -} - -/// Returns a [`Subscription`] that will create and asynchronously run the -/// given [`Stream`]. -/// -/// The `id` will be used to uniquely identify the [`Subscription`]. -pub fn run_with_id<I, S, Message>(id: I, stream: S) -> Subscription<Message> -where -    I: Hash + 'static, -    S: Stream<Item = Message> + MaybeSend + 'static, -    Message: 'static, -{ -    Subscription::from_recipe(Runner { -        id, -        spawn: move |_| stream, -    }) -} - -/// Returns a [`Subscription`] that will create and asynchronously run a -/// [`Stream`] that will call the provided closure to produce every `Message`. -/// -/// The `id` will be used to uniquely identify the [`Subscription`]. -pub fn unfold<I, T, Fut, Message>( -    id: I, -    initial: T, -    mut f: impl FnMut(T) -> Fut + MaybeSend + Sync + 'static, -) -> Subscription<Message> -where -    I: Hash + 'static, -    T: MaybeSend + 'static, -    Fut: Future<Output = (Message, T)> + MaybeSend + 'static, -    Message: 'static + MaybeSend, -{ -    use futures::future::FutureExt; - -    run_with_id( -        id, -        futures::stream::unfold(initial, move |state| f(state).map(Some)), -    ) -} - -pub(crate) fn filter_map<I, F, Message>(id: I, f: F) -> Subscription<Message> +pub(crate) fn filter_map<I, F, T>(id: I, f: F) -> Subscription<T>  where      I: Hash + 'static, -    F: Fn(Event) -> Option<Message> + MaybeSend + 'static, -    Message: 'static + MaybeSend, +    F: Fn(Event) -> Option<T> + MaybeSend + 'static, +    T: 'static + MaybeSend,  { -    Subscription::from_recipe(Runner { +    from_recipe(Runner {          id,          spawn: |events| {              use futures::future; @@ -342,107 +428,22 @@ where      })  } -/// Creates a [`Subscription`] that publishes the events sent from a [`Future`] -/// to an [`mpsc::Sender`] with the given bounds. -/// -/// # Creating an asynchronous worker with bidirectional communication -/// You can leverage this helper to create a [`Subscription`] that spawns -/// an asynchronous worker in the background and establish a channel of -/// communication with an `iced` application. -/// -/// You can achieve this by creating an `mpsc` channel inside the closure -/// and returning the `Sender` as a `Message` for the `Application`: -/// -/// ``` -/// use iced_futures::subscription::{self, Subscription}; -/// use iced_futures::futures::channel::mpsc; -/// use iced_futures::futures::sink::SinkExt; -/// -/// pub enum Event { -///     Ready(mpsc::Sender<Input>), -///     WorkFinished, -///     // ... -/// } -/// -/// enum Input { -///     DoSomeWork, -///     // ... -/// } -/// -/// fn some_worker() -> Subscription<Event> { -///     struct SomeWorker; -/// -///     subscription::channel(std::any::TypeId::of::<SomeWorker>(), 100, |mut output| async move { -///         // Create channel -///         let (sender, mut receiver) = mpsc::channel(100); -/// -///         // Send the sender back to the application -///         output.send(Event::Ready(sender)).await; -/// -///         loop { -///             use iced_futures::futures::StreamExt; -/// -///             // Read next input sent from `Application` -///             let input = receiver.select_next_some().await; -/// -///             match input { -///                 Input::DoSomeWork => { -///                     // Do some async work... -/// -///                     // Finally, we can optionally produce a message to tell the -///                     // `Application` the work is done -///                     output.send(Event::WorkFinished).await; -///                 } -///             } -///         } -///     }) -/// } -/// ``` -/// -/// Check out the [`websocket`] example, which showcases this pattern to maintain a `WebSocket` -/// connection open. -/// -/// [`websocket`]: https://github.com/iced-rs/iced/tree/0.12/examples/websocket -pub fn channel<I, Fut, Message>( -    id: I, -    size: usize, -    f: impl FnOnce(mpsc::Sender<Message>) -> Fut + MaybeSend + 'static, -) -> Subscription<Message> -where -    I: Hash + 'static, -    Fut: Future<Output = Never> + MaybeSend + 'static, -    Message: 'static + MaybeSend, -{ -    use futures::stream::{self, StreamExt}; - -    Subscription::from_recipe(Runner { -        id, -        spawn: move |_| { -            let (sender, receiver) = mpsc::channel(size); - -            let runner = stream::once(f(sender)).map(|_| unreachable!()); - -            stream::select(receiver, runner) -        }, -    }) -} - -struct Runner<I, F, S, Message> +struct Runner<I, F, S, T>  where      F: FnOnce(EventStream) -> S, -    S: Stream<Item = Message>, +    S: Stream<Item = T>,  {      id: I,      spawn: F,  } -impl<I, S, F, Message> Recipe for Runner<I, F, S, Message> +impl<I, F, S, T> Recipe for Runner<I, F, S, T>  where      I: Hash + 'static,      F: FnOnce(EventStream) -> S, -    S: Stream<Item = Message> + MaybeSend + 'static, +    S: Stream<Item = T> + MaybeSend + 'static,  { -    type Output = Message; +    type Output = T;      fn hash(&self, state: &mut Hasher) {          std::any::TypeId::of::<I>().hash(state); | 
