From f4cf488e0b083b5d7b7612c650917233163ee9cb Mon Sep 17 00:00:00 2001 From: Héctor Ramón Jiménez Date: Sun, 5 Mar 2023 04:15:10 +0100 Subject: Remove generic `Hasher` and `Event` from `subscription::Recipe` --- futures/Cargo.toml | 4 + futures/src/backend/native/async_std.rs | 14 +- futures/src/backend/native/smol.rs | 14 +- futures/src/backend/native/tokio.rs | 14 +- futures/src/lib.rs | 1 + futures/src/runtime.rs | 25 +-- futures/src/subscription.rs | 316 ++++++++++++++++++++++++++------ futures/src/subscription/tracker.rs | 51 ++---- 8 files changed, 314 insertions(+), 125 deletions(-) (limited to 'futures') diff --git a/futures/Cargo.toml b/futures/Cargo.toml index e4d355ee..411e7c2a 100644 --- a/futures/Cargo.toml +++ b/futures/Cargo.toml @@ -16,6 +16,10 @@ thread-pool = ["futures/thread-pool"] [dependencies] log = "0.4" +[dependencies.iced_core] +version = "0.8" +path = "../core" + [dependencies.futures] version = "0.3" diff --git a/futures/src/backend/native/async_std.rs b/futures/src/backend/native/async_std.rs index b324dbf1..52b0e914 100644 --- a/futures/src/backend/native/async_std.rs +++ b/futures/src/backend/native/async_std.rs @@ -18,28 +18,26 @@ impl crate::Executor for Executor { pub mod time { //! Listen and react to time. + use crate::core::Hasher; use crate::subscription::{self, Subscription}; /// Returns a [`Subscription`] that produces messages at a set interval. /// /// The first message is produced after a `duration`, and then continues to /// produce more messages every `duration` after that. - pub fn every( + pub fn every( duration: std::time::Duration, - ) -> Subscription { + ) -> Subscription { Subscription::from_recipe(Every(duration)) } #[derive(Debug)] struct Every(std::time::Duration); - impl subscription::Recipe for Every - where - H: std::hash::Hasher, - { + impl subscription::Recipe for Every { type Output = std::time::Instant; - fn hash(&self, state: &mut H) { + fn hash(&self, state: &mut Hasher) { use std::hash::Hash; std::any::TypeId::of::().hash(state); @@ -48,7 +46,7 @@ pub mod time { fn stream( self: Box, - _input: futures::stream::BoxStream<'static, E>, + _input: subscription::EventStream, ) -> futures::stream::BoxStream<'static, Self::Output> { use futures::stream::StreamExt; diff --git a/futures/src/backend/native/smol.rs b/futures/src/backend/native/smol.rs index d5201cde..30bc8291 100644 --- a/futures/src/backend/native/smol.rs +++ b/futures/src/backend/native/smol.rs @@ -19,28 +19,26 @@ impl crate::Executor for Executor { pub mod time { //! Listen and react to time. + use crate::core::Hasher; use crate::subscription::{self, Subscription}; /// Returns a [`Subscription`] that produces messages at a set interval. /// /// The first message is produced after a `duration`, and then continues to /// produce more messages every `duration` after that. - pub fn every( + pub fn every( duration: std::time::Duration, - ) -> Subscription { + ) -> Subscription { Subscription::from_recipe(Every(duration)) } #[derive(Debug)] struct Every(std::time::Duration); - impl subscription::Recipe for Every - where - H: std::hash::Hasher, - { + impl subscription::Recipe for Every { type Output = std::time::Instant; - fn hash(&self, state: &mut H) { + fn hash(&self, state: &mut Hasher) { use std::hash::Hash; std::any::TypeId::of::().hash(state); @@ -49,7 +47,7 @@ pub mod time { fn stream( self: Box, - _input: futures::stream::BoxStream<'static, E>, + _input: subscription::EventStream, ) -> futures::stream::BoxStream<'static, Self::Output> { use futures::stream::StreamExt; diff --git a/futures/src/backend/native/tokio.rs b/futures/src/backend/native/tokio.rs index dd818bd1..4698a105 100644 --- a/futures/src/backend/native/tokio.rs +++ b/futures/src/backend/native/tokio.rs @@ -22,28 +22,26 @@ impl crate::Executor for Executor { pub mod time { //! Listen and react to time. + use crate::core::Hasher; use crate::subscription::{self, Subscription}; /// Returns a [`Subscription`] that produces messages at a set interval. /// /// The first message is produced after a `duration`, and then continues to /// produce more messages every `duration` after that. - pub fn every( + pub fn every( duration: std::time::Duration, - ) -> Subscription { + ) -> Subscription { Subscription::from_recipe(Every(duration)) } #[derive(Debug)] struct Every(std::time::Duration); - impl subscription::Recipe for Every - where - H: std::hash::Hasher, - { + impl subscription::Recipe for Every { type Output = std::time::Instant; - fn hash(&self, state: &mut H) { + fn hash(&self, state: &mut Hasher) { use std::hash::Hash; std::any::TypeId::of::().hash(state); @@ -52,7 +50,7 @@ pub mod time { fn stream( self: Box, - _input: futures::stream::BoxStream<'static, E>, + _input: subscription::EventStream, ) -> futures::stream::BoxStream<'static, Self::Output> { use futures::stream::StreamExt; diff --git a/futures/src/lib.rs b/futures/src/lib.rs index c0982db7..39137de2 100644 --- a/futures/src/lib.rs +++ b/futures/src/lib.rs @@ -18,6 +18,7 @@ #![allow(clippy::inherent_to_string, clippy::type_complexity)] #![cfg_attr(docsrs, feature(doc_cfg))] pub use futures; +pub use iced_core as core; mod command; mod maybe_send; diff --git a/futures/src/runtime.rs b/futures/src/runtime.rs index 24f9f241..2241a494 100644 --- a/futures/src/runtime.rs +++ b/futures/src/runtime.rs @@ -1,6 +1,7 @@ //! Run commands and keep track of subscriptions. +use crate::core::event::{self, Event}; use crate::subscription; -use crate::{BoxFuture, Executor, MaybeSend, Subscription}; +use crate::{BoxFuture, Executor, MaybeSend}; use futures::{channel::mpsc, Sink}; use std::marker::PhantomData; @@ -12,18 +13,15 @@ use std::marker::PhantomData; /// /// [`Command`]: crate::Command #[derive(Debug)] -pub struct Runtime { +pub struct Runtime { executor: Executor, sender: Sender, - subscriptions: subscription::Tracker, + subscriptions: subscription::Tracker, _message: PhantomData, } -impl - Runtime +impl Runtime where - Hasher: std::hash::Hasher + Default, - Event: Send + Clone + 'static, Executor: self::Executor, Sender: Sink + Unpin @@ -79,7 +77,9 @@ where /// [`Tracker::update`]: subscription::Tracker::update pub fn track( &mut self, - subscription: Subscription, + recipes: impl IntoIterator< + Item = Box>, + >, ) { let Runtime { executor, @@ -88,8 +88,9 @@ where .. } = self; - let futures = executor - .enter(|| subscriptions.update(subscription, sender.clone())); + let futures = executor.enter(|| { + subscriptions.update(recipes.into_iter(), sender.clone()) + }); for future in futures { executor.spawn(future); @@ -102,7 +103,7 @@ where /// See [`Tracker::broadcast`] to learn more. /// /// [`Tracker::broadcast`]: subscription::Tracker::broadcast - pub fn broadcast(&mut self, event: Event) { - self.subscriptions.broadcast(event); + pub fn broadcast(&mut self, event: Event, status: event::Status) { + self.subscriptions.broadcast(event, status); } } diff --git a/futures/src/subscription.rs b/futures/src/subscription.rs index fe53fd7e..876f29c2 100644 --- a/futures/src/subscription.rs +++ b/futures/src/subscription.rs @@ -3,7 +3,18 @@ mod tracker; pub use tracker::Tracker; -use crate::BoxStream; +use crate::core::event::{self, Event}; +use crate::core::window; +use crate::core::Hasher; +use crate::futures::{Future, Stream}; +use crate::{BoxStream, MaybeSend}; + +use std::hash::Hash; + +/// A stream of runtime events. +/// +/// It is the input of a [`Subscription`]. +pub type EventStream = BoxStream<(Event, event::Status)>; /// A request to listen to external events. /// @@ -16,19 +27,13 @@ use crate::BoxStream; /// For instance, you can use a [`Subscription`] to listen to a WebSocket /// connection, keyboard presses, mouse events, time ticks, etc. /// -/// This type is normally aliased by runtimes with a specific `Event` and/or -/// `Hasher`. -/// /// [`Command`]: crate::Command #[must_use = "`Subscription` must be returned to runtime to take effect"] -pub struct Subscription { - recipes: Vec>>, +pub struct Subscription { + recipes: Vec>>, } -impl Subscription -where - H: std::hash::Hasher, -{ +impl Subscription { /// Returns an empty [`Subscription`] that will not produce any output. pub fn none() -> Self { Self { @@ -38,7 +43,7 @@ where /// Creates a [`Subscription`] from a [`Recipe`] describing it. pub fn from_recipe( - recipe: impl Recipe + 'static, + recipe: impl Recipe + 'static, ) -> Self { Self { recipes: vec![Box::new(recipe)], @@ -48,7 +53,7 @@ where /// Batches all the provided subscriptions and returns the resulting /// [`Subscription`]. pub fn batch( - subscriptions: impl IntoIterator>, + subscriptions: impl IntoIterator>, ) -> Self { Self { recipes: subscriptions @@ -59,18 +64,16 @@ where } /// Returns the different recipes of the [`Subscription`]. - pub fn recipes(self) -> Vec>> { + pub fn into_recipes(self) -> Vec>> { self.recipes } /// Adds a value to the [`Subscription`] context. /// /// The value will be part of the identity of a [`Subscription`]. - pub fn with(mut self, value: T) -> Subscription + pub fn with(mut self, value: T) -> Subscription<(T, Message)> where - H: 'static, - E: 'static, - O: 'static, + Message: 'static, T: std::hash::Hash + Clone + Send + Sync + 'static, { Subscription { @@ -79,18 +82,16 @@ where .drain(..) .map(|recipe| { Box::new(With::new(recipe, value.clone())) - as Box> + as Box> }) .collect(), } } /// Transforms the [`Subscription`] output with the given function. - pub fn map(mut self, f: fn(O) -> A) -> Subscription + pub fn map(mut self, f: fn(Message) -> A) -> Subscription where - H: 'static, - E: 'static, - O: 'static, + Message: 'static, A: 'static, { Subscription { @@ -98,15 +99,14 @@ where .recipes .drain(..) .map(|recipe| { - Box::new(Map::new(recipe, f)) - as Box> + Box::new(Map::new(recipe, f)) as Box> }) .collect(), } } } -impl std::fmt::Debug for Subscription { +impl std::fmt::Debug for Subscription { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("Subscription").finish() } @@ -129,7 +129,7 @@ impl std::fmt::Debug for Subscription { /// [examples]: https://github.com/iced-rs/iced/tree/0.8/examples /// [`download_progress`]: https://github.com/iced-rs/iced/tree/0.8/examples/download_progress /// [`stopwatch`]: https://github.com/iced-rs/iced/tree/0.8/examples/stopwatch -pub trait Recipe { +pub trait Recipe { /// The events that will be produced by a [`Subscription`] with this /// [`Recipe`]. type Output; @@ -141,45 +141,33 @@ pub trait Recipe { /// Executes the [`Recipe`] and produces the stream of events of its /// [`Subscription`]. - /// - /// It receives some stream of generic events, which is normally defined by - /// shells. - fn stream( - self: Box, - input: BoxStream, - ) -> BoxStream; + fn stream(self: Box, input: EventStream) -> BoxStream; } -struct Map { - recipe: Box>, +struct Map { + recipe: Box>, mapper: fn(A) -> B, } -impl Map { - fn new( - recipe: Box>, - mapper: fn(A) -> B, - ) -> Self { +impl Map { + fn new(recipe: Box>, mapper: fn(A) -> B) -> Self { Map { recipe, mapper } } } -impl Recipe for Map +impl Recipe for Map where A: 'static, B: 'static, - H: std::hash::Hasher, { type Output = B; - fn hash(&self, state: &mut H) { - use std::hash::Hash; - + fn hash(&self, state: &mut Hasher) { self.recipe.hash(state); self.mapper.hash(state); } - fn stream(self: Box, input: BoxStream) -> BoxStream { + fn stream(self: Box, input: EventStream) -> BoxStream { use futures::StreamExt; let mapper = self.mapper; @@ -188,34 +176,31 @@ where } } -struct With { - recipe: Box>, +struct With { + recipe: Box>, value: B, } -impl With { - fn new(recipe: Box>, value: B) -> Self { +impl With { + fn new(recipe: Box>, value: B) -> Self { With { recipe, value } } } -impl Recipe for With +impl Recipe for With where A: 'static, B: 'static + std::hash::Hash + Clone + Send + Sync, - H: std::hash::Hasher, { type Output = (B, A); - fn hash(&self, state: &mut H) { - use std::hash::Hash; - + fn hash(&self, state: &mut Hasher) { std::any::TypeId::of::().hash(state); self.value.hash(state); self.recipe.hash(state); } - fn stream(self: Box, input: BoxStream) -> BoxStream { + fn stream(self: Box, input: EventStream) -> BoxStream { use futures::StreamExt; let value = self.value; @@ -227,3 +212,222 @@ where ) } } + +/// Returns a [`Subscription`] to all the ignored runtime events. +/// +/// This subscription will notify your application of any [`Event`] that was +/// not captured by any widget. +pub fn events() -> Subscription { + events_with(|event, status| match status { + event::Status::Ignored => Some(event), + event::Status::Captured => None, + }) +} + +/// Returns a [`Subscription`] that filters all the runtime events with the +/// provided function, producing messages accordingly. +/// +/// This subscription will call the provided function for every [`Event`] +/// handled by the runtime. If the function: +/// +/// - Returns `None`, the [`Event`] will be discarded. +/// - Returns `Some` message, the `Message` will be produced. +pub fn events_with( + f: fn(Event, event::Status) -> Option, +) -> Subscription +where + Message: 'static + MaybeSend, +{ + #[derive(Hash)] + struct EventsWith; + + Subscription::from_recipe(Runner { + id: (EventsWith, f), + spawn: move |events| { + use futures::future; + use futures::stream::StreamExt; + + events.filter_map(move |(event, status)| { + future::ready(match event { + Event::Window(window::Event::RedrawRequested(_)) => None, + _ => f(event, status), + }) + }) + }, + }) +} + +/// Returns a [`Subscription`] that produces a message for every runtime event, +/// including the redraw request events. +/// +/// **Warning:** This [`Subscription`], if unfiltered, may produce messages in +/// an infinite loop. +pub fn raw_events( + f: fn(Event, event::Status) -> Option, +) -> Subscription +where + Message: 'static + MaybeSend, +{ + #[derive(Hash)] + struct RawEvents; + + Subscription::from_recipe(Runner { + id: (RawEvents, f), + spawn: move |events| { + use futures::future; + use futures::stream::StreamExt; + + events.filter_map(move |(event, status)| { + future::ready(f(event, status)) + }) + }, + }) +} + +/// Returns a [`Subscription`] that will call the given function to create and +/// asynchronously run the given [`Stream`]. +pub fn run(builder: fn() -> S) -> Subscription +where + S: Stream + MaybeSend + 'static, + Message: 'static, +{ + Subscription::from_recipe(Runner { + id: builder, + spawn: move |_| builder(), + }) +} + +/// Returns a [`Subscription`] that will create and asynchronously run the +/// given [`Stream`]. +/// +/// The `id` will be used to uniquely identify the [`Subscription`]. +pub fn run_with_id(id: I, stream: S) -> Subscription +where + I: Hash + 'static, + S: Stream + MaybeSend + 'static, + Message: 'static, +{ + Subscription::from_recipe(Runner { + id, + spawn: move |_| stream, + }) +} + +/// Returns a [`Subscription`] that will create and asynchronously run a +/// [`Stream`] that will call the provided closure to produce every `Message`. +/// +/// The `id` will be used to uniquely identify the [`Subscription`]. +/// +/// # Creating an asynchronous worker with bidirectional communication +/// You can leverage this helper to create a [`Subscription`] that spawns +/// an asynchronous worker in the background and establish a channel of +/// communication with an `iced` application. +/// +/// You can achieve this by creating an `mpsc` channel inside the closure +/// and returning the `Sender` as a `Message` for the `Application`: +/// +/// ``` +/// use iced_futures::subscription::{self, Subscription}; +/// use iced_futures::futures; +/// +/// use futures::channel::mpsc; +/// +/// pub enum Event { +/// Ready(mpsc::Sender), +/// WorkFinished, +/// // ... +/// } +/// +/// enum Input { +/// DoSomeWork, +/// // ... +/// } +/// +/// enum State { +/// Starting, +/// Ready(mpsc::Receiver), +/// } +/// +/// fn some_worker() -> Subscription { +/// struct SomeWorker; +/// +/// subscription::unfold(std::any::TypeId::of::(), State::Starting, |state| async move { +/// match state { +/// State::Starting => { +/// // Create channel +/// let (sender, receiver) = mpsc::channel(100); +/// +/// (Some(Event::Ready(sender)), State::Ready(receiver)) +/// } +/// State::Ready(mut receiver) => { +/// use futures::StreamExt; +/// +/// // Read next input sent from `Application` +/// let input = receiver.select_next_some().await; +/// +/// match input { +/// Input::DoSomeWork => { +/// // Do some async work... +/// +/// // Finally, we can optionally return a message to tell the +/// // `Application` the work is done +/// (Some(Event::WorkFinished), State::Ready(receiver)) +/// } +/// } +/// } +/// } +/// }) +/// } +/// ``` +/// +/// Check out the [`websocket`] example, which showcases this pattern to maintain a WebSocket +/// connection open. +/// +/// [`websocket`]: https://github.com/iced-rs/iced/tree/0.8/examples/websocket +pub fn unfold( + id: I, + initial: T, + mut f: impl FnMut(T) -> Fut + MaybeSend + Sync + 'static, +) -> Subscription +where + I: Hash + 'static, + T: MaybeSend + 'static, + Fut: Future, T)> + MaybeSend + 'static, + Message: 'static + MaybeSend, +{ + use futures::future::{self, FutureExt}; + use futures::stream::StreamExt; + + run_with_id( + id, + futures::stream::unfold(initial, move |state| f(state).map(Some)) + .filter_map(future::ready), + ) +} + +struct Runner +where + F: FnOnce(EventStream) -> S, + S: Stream, +{ + id: I, + spawn: F, +} + +impl Recipe for Runner +where + I: Hash + 'static, + F: FnOnce(EventStream) -> S, + S: Stream + MaybeSend + 'static, +{ + type Output = Message; + + fn hash(&self, state: &mut Hasher) { + std::any::TypeId::of::().hash(state); + self.id.hash(state); + } + + fn stream(self: Box, input: EventStream) -> BoxStream { + crate::boxed_stream((self.spawn)(input)) + } +} diff --git a/futures/src/subscription/tracker.rs b/futures/src/subscription/tracker.rs index 9fe110b0..ae71cd25 100644 --- a/futures/src/subscription/tracker.rs +++ b/futures/src/subscription/tracker.rs @@ -1,38 +1,35 @@ -use crate::{BoxFuture, MaybeSend, Subscription}; +use crate::core::event::{self, Event}; +use crate::core::Hasher; +use crate::subscription::Recipe; +use crate::{BoxFuture, MaybeSend}; -use futures::{ - channel::mpsc, - sink::{Sink, SinkExt}, -}; -use std::{collections::HashMap, marker::PhantomData}; +use futures::channel::mpsc; +use futures::sink::{Sink, SinkExt}; + +use std::collections::HashMap; +use std::hash::Hasher as _; /// A registry of subscription streams. /// /// If you have an application that continuously returns a [`Subscription`], /// you can use a [`Tracker`] to keep track of the different recipes and keep /// its executions alive. -#[derive(Debug)] -pub struct Tracker { - subscriptions: HashMap>, - _hasher: PhantomData, +#[derive(Debug, Default)] +pub struct Tracker { + subscriptions: HashMap, } #[derive(Debug)] -pub struct Execution { +pub struct Execution { _cancel: futures::channel::oneshot::Sender<()>, - listener: Option>, + listener: Option>, } -impl Tracker -where - Hasher: std::hash::Hasher + Default, - Event: 'static + Send + Clone, -{ +impl Tracker { /// Creates a new empty [`Tracker`]. pub fn new() -> Self { Self { subscriptions: HashMap::new(), - _hasher: PhantomData, } } @@ -56,7 +53,7 @@ where /// [`Recipe`]: crate::subscription::Recipe pub fn update( &mut self, - subscription: Subscription, + recipes: impl Iterator>>, receiver: Receiver, ) -> Vec> where @@ -70,8 +67,6 @@ where use futures::stream::StreamExt; let mut futures: Vec> = Vec::new(); - - let recipes = subscription.recipes(); let mut alive = std::collections::HashSet::new(); for recipe in recipes { @@ -142,12 +137,12 @@ where /// currently open. /// /// [`Recipe::stream`]: crate::subscription::Recipe::stream - pub fn broadcast(&mut self, event: Event) { + pub fn broadcast(&mut self, event: Event, status: event::Status) { self.subscriptions .values_mut() .filter_map(|connection| connection.listener.as_mut()) .for_each(|listener| { - if let Err(error) = listener.try_send(event.clone()) { + if let Err(error) = listener.try_send((event.clone(), status)) { log::warn!( "Error sending event to subscription: {:?}", error @@ -156,13 +151,3 @@ where }); } } - -impl Default for Tracker -where - Hasher: std::hash::Hasher + Default, - Event: 'static + Send + Clone, -{ - fn default() -> Self { - Self::new() - } -} -- cgit