diff options
author | 2024-07-05 02:15:13 +0200 | |
---|---|---|
committer | 2024-07-05 02:16:45 +0200 | |
commit | 8bc49cd88653309f5abe8a38d5a4af36fcfea933 (patch) | |
tree | 6c205ff9964cec1d48934b7ff166532491a349c5 | |
parent | e50aa03edc858d561992d8ca441aa063f273eeac (diff) | |
download | iced-8bc49cd88653309f5abe8a38d5a4af36fcfea933.tar.gz iced-8bc49cd88653309f5abe8a38d5a4af36fcfea933.tar.bz2 iced-8bc49cd88653309f5abe8a38d5a4af36fcfea933.zip |
Hide `Subscription` internals
.. and introduce `stream::channel` helper
-rw-r--r-- | examples/download_progress/src/download.rs | 14 | ||||
-rw-r--r-- | examples/websocket/src/echo.rs | 102 | ||||
-rw-r--r-- | examples/websocket/src/main.rs | 2 | ||||
-rw-r--r-- | futures/src/backend/native/async_std.rs | 2 | ||||
-rw-r--r-- | futures/src/backend/native/smol.rs | 2 | ||||
-rw-r--r-- | futures/src/backend/native/tokio.rs | 2 | ||||
-rw-r--r-- | futures/src/lib.rs | 1 | ||||
-rw-r--r-- | futures/src/stream.rs | 26 | ||||
-rw-r--r-- | futures/src/subscription.rs | 349 | ||||
-rw-r--r-- | src/lib.rs | 10 | ||||
-rw-r--r-- | winit/src/program.rs | 6 |
11 files changed, 269 insertions, 247 deletions
diff --git a/examples/download_progress/src/download.rs b/examples/download_progress/src/download.rs index d6cc1e24..bdf57290 100644 --- a/examples/download_progress/src/download.rs +++ b/examples/download_progress/src/download.rs @@ -1,4 +1,5 @@ -use iced::subscription; +use iced::futures; +use iced::Subscription; use std::hash::Hash; @@ -7,9 +8,14 @@ pub fn file<I: 'static + Hash + Copy + Send + Sync, T: ToString>( id: I, url: T, ) -> iced::Subscription<(I, Progress)> { - subscription::unfold(id, State::Ready(url.to_string()), move |state| { - download(id, state) - }) + Subscription::run_with_id( + id, + futures::stream::unfold(State::Ready(url.to_string()), move |state| { + use iced::futures::FutureExt; + + download(id, state).map(Some) + }), + ) } async fn download<I: Copy>(id: I, state: State) -> ((I, Progress), State) { diff --git a/examples/websocket/src/echo.rs b/examples/websocket/src/echo.rs index cd32cb66..14652936 100644 --- a/examples/websocket/src/echo.rs +++ b/examples/websocket/src/echo.rs @@ -1,87 +1,79 @@ pub mod server; use iced::futures; -use iced::subscription::{self, Subscription}; +use iced::stream; use iced::widget::text; use futures::channel::mpsc; use futures::sink::SinkExt; -use futures::stream::StreamExt; +use futures::stream::{Stream, StreamExt}; use async_tungstenite::tungstenite; use std::fmt; -pub fn connect() -> Subscription<Event> { - struct Connect; +pub fn connect() -> impl Stream<Item = Event> { + stream::channel(100, |mut output| async move { + let mut state = State::Disconnected; - subscription::channel( - std::any::TypeId::of::<Connect>(), - 100, - |mut output| async move { - let mut state = State::Disconnected; + loop { + match &mut state { + State::Disconnected => { + const ECHO_SERVER: &str = "ws://127.0.0.1:3030"; - loop { - match &mut state { - State::Disconnected => { - const ECHO_SERVER: &str = "ws://127.0.0.1:3030"; - - match async_tungstenite::tokio::connect_async( - ECHO_SERVER, - ) + match async_tungstenite::tokio::connect_async(ECHO_SERVER) .await - { - Ok((websocket, _)) => { - let (sender, receiver) = mpsc::channel(100); - - let _ = output - .send(Event::Connected(Connection(sender))) - .await; + { + Ok((websocket, _)) => { + let (sender, receiver) = mpsc::channel(100); - state = State::Connected(websocket, receiver); - } - Err(_) => { - tokio::time::sleep( - tokio::time::Duration::from_secs(1), - ) + let _ = output + .send(Event::Connected(Connection(sender))) .await; - let _ = output.send(Event::Disconnected).await; - } + state = State::Connected(websocket, receiver); + } + Err(_) => { + tokio::time::sleep( + tokio::time::Duration::from_secs(1), + ) + .await; + + let _ = output.send(Event::Disconnected).await; } } - State::Connected(websocket, input) => { - let mut fused_websocket = websocket.by_ref().fuse(); - - futures::select! { - received = fused_websocket.select_next_some() => { - match received { - Ok(tungstenite::Message::Text(message)) => { - let _ = output.send(Event::MessageReceived(Message::User(message))).await; - } - Err(_) => { - let _ = output.send(Event::Disconnected).await; - - state = State::Disconnected; - } - Ok(_) => continue, + } + State::Connected(websocket, input) => { + let mut fused_websocket = websocket.by_ref().fuse(); + + futures::select! { + received = fused_websocket.select_next_some() => { + match received { + Ok(tungstenite::Message::Text(message)) => { + let _ = output.send(Event::MessageReceived(Message::User(message))).await; } - } - - message = input.select_next_some() => { - let result = websocket.send(tungstenite::Message::Text(message.to_string())).await; - - if result.is_err() { + Err(_) => { let _ = output.send(Event::Disconnected).await; state = State::Disconnected; } + Ok(_) => continue, + } + } + + message = input.select_next_some() => { + let result = websocket.send(tungstenite::Message::Text(message.to_string())).await; + + if result.is_err() { + let _ = output.send(Event::Disconnected).await; + + state = State::Disconnected; } } } } } - }, - ) + } + }) } #[derive(Debug)] diff --git a/examples/websocket/src/main.rs b/examples/websocket/src/main.rs index 8422ce16..95a14fd9 100644 --- a/examples/websocket/src/main.rs +++ b/examples/websocket/src/main.rs @@ -83,7 +83,7 @@ impl WebSocket { } fn subscription(&self) -> Subscription<Message> { - echo::connect().map(Message::Echo) + Subscription::run(echo::connect).map(Message::Echo) } fn view(&self) -> Element<Message> { diff --git a/futures/src/backend/native/async_std.rs b/futures/src/backend/native/async_std.rs index b7da5e90..86714f45 100644 --- a/futures/src/backend/native/async_std.rs +++ b/futures/src/backend/native/async_std.rs @@ -27,7 +27,7 @@ pub mod time { pub fn every( duration: std::time::Duration, ) -> Subscription<std::time::Instant> { - Subscription::from_recipe(Every(duration)) + subscription::from_recipe(Every(duration)) } #[derive(Debug)] diff --git a/futures/src/backend/native/smol.rs b/futures/src/backend/native/smol.rs index aaf1518c..8d448e7f 100644 --- a/futures/src/backend/native/smol.rs +++ b/futures/src/backend/native/smol.rs @@ -26,7 +26,7 @@ pub mod time { pub fn every( duration: std::time::Duration, ) -> Subscription<std::time::Instant> { - Subscription::from_recipe(Every(duration)) + subscription::from_recipe(Every(duration)) } #[derive(Debug)] diff --git a/futures/src/backend/native/tokio.rs b/futures/src/backend/native/tokio.rs index df91d798..9dc3593d 100644 --- a/futures/src/backend/native/tokio.rs +++ b/futures/src/backend/native/tokio.rs @@ -31,7 +31,7 @@ pub mod time { pub fn every( duration: std::time::Duration, ) -> Subscription<std::time::Instant> { - Subscription::from_recipe(Every(duration)) + subscription::from_recipe(Every(duration)) } #[derive(Debug)] diff --git a/futures/src/lib.rs b/futures/src/lib.rs index a874a618..31738823 100644 --- a/futures/src/lib.rs +++ b/futures/src/lib.rs @@ -15,6 +15,7 @@ pub mod backend; pub mod event; pub mod executor; pub mod keyboard; +pub mod stream; pub mod subscription; pub use executor::Executor; diff --git a/futures/src/stream.rs b/futures/src/stream.rs new file mode 100644 index 00000000..2ec505f1 --- /dev/null +++ b/futures/src/stream.rs @@ -0,0 +1,26 @@ +//! Create asynchronous streams of data. +use futures::channel::mpsc; +use futures::never::Never; +use futures::stream::{self, Stream, StreamExt}; + +use std::future::Future; + +/// Creates a new [`Stream`] that produces the items sent from a [`Future`] +/// to the [`mpsc::Sender`] provided to the closure. +/// +/// This is a more ergonomic [`stream::unfold`], which allows you to go +/// from the "world of futures" to the "world of streams" by simply looping +/// and publishing to an async channel from inside a [`Future`]. +pub fn channel<T, F>( + size: usize, + f: impl FnOnce(mpsc::Sender<T>) -> F, +) -> impl Stream<Item = T> +where + F: Future<Output = Never>, +{ + let (sender, receiver) = mpsc::channel(size); + + let runner = stream::once(f(sender)).map(|_| unreachable!()); + + stream::select(receiver, runner) +} diff --git a/futures/src/subscription.rs b/futures/src/subscription.rs index 5ec39582..1a0d454d 100644 --- a/futures/src/subscription.rs +++ b/futures/src/subscription.rs @@ -5,11 +5,9 @@ pub use tracker::Tracker; use crate::core::event; use crate::core::window; -use crate::futures::{Future, Stream}; +use crate::futures::Stream; use crate::{BoxStream, MaybeSend}; -use futures::channel::mpsc; -use futures::never::Never; use std::any::TypeId; use std::hash::Hash; @@ -61,20 +59,66 @@ pub type Hasher = rustc_hash::FxHasher; /// A request to listen to external events. /// -/// Besides performing async actions on demand with `Command`, most +/// Besides performing async actions on demand with `Task`, most /// applications also need to listen to external events passively. /// -/// A [`Subscription`] is normally provided to some runtime, like a `Command`, +/// A [`Subscription`] is normally provided to some runtime, like a `Task`, /// and it will generate events as long as the user keeps requesting it. /// /// For instance, you can use a [`Subscription`] to listen to a `WebSocket` /// connection, keyboard presses, mouse events, time ticks, etc. +/// +/// # The Lifetime of a [`Subscription`] +/// Much like a [`Future`] or a [`Stream`], a [`Subscription`] does not produce any effects +/// on its own. For a [`Subscription`] to run, it must be returned to the iced runtime—normally +/// in the `subscription` function of an `application` or a `daemon`. +/// +/// When a [`Subscription`] is provided to the runtime for the first time, the runtime will +/// start running it asynchronously. Running a [`Subscription`] consists in building its underlying +/// [`Stream`] and executing it in an async runtime. +/// +/// Therefore, you can think of a [`Subscription`] as a "stream builder". It simply represents a way +/// to build a certain [`Stream`] together with some way to _identify_ it. +/// +/// Identification is important because when a specific [`Subscription`] stops being returned to the +/// iced runtime, the runtime will kill its associated [`Stream`]. The runtime uses the identity of a +/// [`Subscription`] to keep track of it. +/// +/// This way, iced allows you to declaratively __subscribe__ to particular streams of data temporarily +/// and whenever necessary. +/// +/// ``` +/// # mod iced { +/// # pub mod time { +/// # pub use iced_futures::backend::default::time::every; +/// # pub use std::time::{Duration, Instant}; +/// # } +/// # +/// # pub use iced_futures::Subscription; +/// # } +/// use iced::time::{self, Duration, Instant}; +/// use iced::Subscription; +/// +/// struct State { +/// timer_enabled: bool, +/// } +/// +/// fn subscription(state: &State) -> Subscription<Instant> { +/// if state.timer_enabled { +/// time::every(Duration::from_secs(1)) +/// } else { +/// Subscription::none() +/// } +/// } +/// ``` +/// +/// [`Future`]: std::future::Future #[must_use = "`Subscription` must be returned to runtime to take effect"] -pub struct Subscription<Message> { - recipes: Vec<Box<dyn Recipe<Output = Message>>>, +pub struct Subscription<T> { + recipes: Vec<Box<dyn Recipe<Output = T>>>, } -impl<Message> Subscription<Message> { +impl<T> Subscription<T> { /// Returns an empty [`Subscription`] that will not produce any output. pub fn none() -> Self { Self { @@ -82,19 +126,102 @@ impl<Message> Subscription<Message> { } } - /// Creates a [`Subscription`] from a [`Recipe`] describing it. - pub fn from_recipe( - recipe: impl Recipe<Output = Message> + 'static, - ) -> Self { - Self { - recipes: vec![Box::new(recipe)], - } + /// Returns a [`Subscription`] that will call the given function to create and + /// asynchronously run the given [`Stream`]. + /// + /// # Creating an asynchronous worker with bidirectional communication + /// You can leverage this helper to create a [`Subscription`] that spawns + /// an asynchronous worker in the background and establish a channel of + /// communication with an `iced` application. + /// + /// You can achieve this by creating an `mpsc` channel inside the closure + /// and returning the `Sender` as a `Message` for the `Application`: + /// + /// ``` + /// use iced_futures::subscription::{self, Subscription}; + /// use iced_futures::stream; + /// use iced_futures::futures::channel::mpsc; + /// use iced_futures::futures::sink::SinkExt; + /// use iced_futures::futures::Stream; + /// + /// pub enum Event { + /// Ready(mpsc::Sender<Input>), + /// WorkFinished, + /// // ... + /// } + /// + /// enum Input { + /// DoSomeWork, + /// // ... + /// } + /// + /// fn some_worker() -> impl Stream<Item = Event> { + /// stream::channel(100, |mut output| async move { + /// // Create channel + /// let (sender, mut receiver) = mpsc::channel(100); + /// + /// // Send the sender back to the application + /// output.send(Event::Ready(sender)).await; + /// + /// loop { + /// use iced_futures::futures::StreamExt; + /// + /// // Read next input sent from `Application` + /// let input = receiver.select_next_some().await; + /// + /// match input { + /// Input::DoSomeWork => { + /// // Do some async work... + /// + /// // Finally, we can optionally produce a message to tell the + /// // `Application` the work is done + /// output.send(Event::WorkFinished).await; + /// } + /// } + /// } + /// }) + /// } + /// + /// fn subscription() -> Subscription<Event> { + /// Subscription::run(some_worker) + /// } + /// ``` + /// + /// Check out the [`websocket`] example, which showcases this pattern to maintain a `WebSocket` + /// connection open. + /// + /// [`websocket`]: https://github.com/iced-rs/iced/tree/0.12/examples/websocket + pub fn run<S>(builder: fn() -> S) -> Self + where + S: Stream<Item = T> + MaybeSend + 'static, + T: 'static, + { + from_recipe(Runner { + id: builder, + spawn: move |_| builder(), + }) + } + + /// Returns a [`Subscription`] that will create and asynchronously run the + /// given [`Stream`]. + /// + /// The `id` will be used to uniquely identify the [`Subscription`]. + pub fn run_with_id<I, S>(id: I, stream: S) -> Subscription<T> + where + I: Hash + 'static, + S: Stream<Item = T> + MaybeSend + 'static, + T: 'static, + { + from_recipe(Runner { + id, + spawn: move |_| stream, + }) } /// Batches all the provided subscriptions and returns the resulting /// [`Subscription`]. pub fn batch( - subscriptions: impl IntoIterator<Item = Subscription<Message>>, + subscriptions: impl IntoIterator<Item = Subscription<T>>, ) -> Self { Self { recipes: subscriptions @@ -104,18 +231,13 @@ impl<Message> Subscription<Message> { } } - /// Returns the different recipes of the [`Subscription`]. - pub fn into_recipes(self) -> Vec<Box<dyn Recipe<Output = Message>>> { - self.recipes - } - /// Adds a value to the [`Subscription`] context. /// /// The value will be part of the identity of a [`Subscription`]. - pub fn with<T>(mut self, value: T) -> Subscription<(T, Message)> + pub fn with<A>(mut self, value: A) -> Subscription<(A, T)> where - Message: 'static, - T: std::hash::Hash + Clone + Send + Sync + 'static, + T: 'static, + A: std::hash::Hash + Clone + Send + Sync + 'static, { Subscription { recipes: self @@ -123,7 +245,7 @@ impl<Message> Subscription<Message> { .drain(..) .map(|recipe| { Box::new(With::new(recipe, value.clone())) - as Box<dyn Recipe<Output = (T, Message)>> + as Box<dyn Recipe<Output = (A, T)>> }) .collect(), } @@ -136,8 +258,8 @@ impl<Message> Subscription<Message> { /// will panic in debug mode otherwise. pub fn map<F, A>(mut self, f: F) -> Subscription<A> where - Message: 'static, - F: Fn(Message) -> A + MaybeSend + Clone + 'static, + T: 'static, + F: Fn(T) -> A + MaybeSend + Clone + 'static, A: 'static, { debug_assert!( @@ -159,7 +281,23 @@ impl<Message> Subscription<Message> { } } -impl<Message> std::fmt::Debug for Subscription<Message> { +/// Creates a [`Subscription`] from a [`Recipe`] describing it. +pub fn from_recipe<T>( + recipe: impl Recipe<Output = T> + 'static, +) -> Subscription<T> { + Subscription { + recipes: vec![Box::new(recipe)], + } +} + +/// Returns the different recipes of the [`Subscription`]. +pub fn into_recipes<T>( + subscription: Subscription<T>, +) -> Vec<Box<dyn Recipe<Output = T>>> { + subscription.recipes +} + +impl<T> std::fmt::Debug for Subscription<T> { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("Subscription").finish() } @@ -273,65 +411,13 @@ where } } -/// Returns a [`Subscription`] that will call the given function to create and -/// asynchronously run the given [`Stream`]. -pub fn run<S, Message>(builder: fn() -> S) -> Subscription<Message> -where - S: Stream<Item = Message> + MaybeSend + 'static, - Message: 'static, -{ - Subscription::from_recipe(Runner { - id: builder, - spawn: move |_| builder(), - }) -} - -/// Returns a [`Subscription`] that will create and asynchronously run the -/// given [`Stream`]. -/// -/// The `id` will be used to uniquely identify the [`Subscription`]. -pub fn run_with_id<I, S, Message>(id: I, stream: S) -> Subscription<Message> -where - I: Hash + 'static, - S: Stream<Item = Message> + MaybeSend + 'static, - Message: 'static, -{ - Subscription::from_recipe(Runner { - id, - spawn: move |_| stream, - }) -} - -/// Returns a [`Subscription`] that will create and asynchronously run a -/// [`Stream`] that will call the provided closure to produce every `Message`. -/// -/// The `id` will be used to uniquely identify the [`Subscription`]. -pub fn unfold<I, T, Fut, Message>( - id: I, - initial: T, - mut f: impl FnMut(T) -> Fut + MaybeSend + Sync + 'static, -) -> Subscription<Message> -where - I: Hash + 'static, - T: MaybeSend + 'static, - Fut: Future<Output = (Message, T)> + MaybeSend + 'static, - Message: 'static + MaybeSend, -{ - use futures::future::FutureExt; - - run_with_id( - id, - futures::stream::unfold(initial, move |state| f(state).map(Some)), - ) -} - -pub(crate) fn filter_map<I, F, Message>(id: I, f: F) -> Subscription<Message> +pub(crate) fn filter_map<I, F, T>(id: I, f: F) -> Subscription<T> where I: Hash + 'static, - F: Fn(Event) -> Option<Message> + MaybeSend + 'static, - Message: 'static + MaybeSend, + F: Fn(Event) -> Option<T> + MaybeSend + 'static, + T: 'static + MaybeSend, { - Subscription::from_recipe(Runner { + from_recipe(Runner { id, spawn: |events| { use futures::future; @@ -342,107 +428,22 @@ where }) } -/// Creates a [`Subscription`] that publishes the events sent from a [`Future`] -/// to an [`mpsc::Sender`] with the given bounds. -/// -/// # Creating an asynchronous worker with bidirectional communication -/// You can leverage this helper to create a [`Subscription`] that spawns -/// an asynchronous worker in the background and establish a channel of -/// communication with an `iced` application. -/// -/// You can achieve this by creating an `mpsc` channel inside the closure -/// and returning the `Sender` as a `Message` for the `Application`: -/// -/// ``` -/// use iced_futures::subscription::{self, Subscription}; -/// use iced_futures::futures::channel::mpsc; -/// use iced_futures::futures::sink::SinkExt; -/// -/// pub enum Event { -/// Ready(mpsc::Sender<Input>), -/// WorkFinished, -/// // ... -/// } -/// -/// enum Input { -/// DoSomeWork, -/// // ... -/// } -/// -/// fn some_worker() -> Subscription<Event> { -/// struct SomeWorker; -/// -/// subscription::channel(std::any::TypeId::of::<SomeWorker>(), 100, |mut output| async move { -/// // Create channel -/// let (sender, mut receiver) = mpsc::channel(100); -/// -/// // Send the sender back to the application -/// output.send(Event::Ready(sender)).await; -/// -/// loop { -/// use iced_futures::futures::StreamExt; -/// -/// // Read next input sent from `Application` -/// let input = receiver.select_next_some().await; -/// -/// match input { -/// Input::DoSomeWork => { -/// // Do some async work... -/// -/// // Finally, we can optionally produce a message to tell the -/// // `Application` the work is done -/// output.send(Event::WorkFinished).await; -/// } -/// } -/// } -/// }) -/// } -/// ``` -/// -/// Check out the [`websocket`] example, which showcases this pattern to maintain a `WebSocket` -/// connection open. -/// -/// [`websocket`]: https://github.com/iced-rs/iced/tree/0.12/examples/websocket -pub fn channel<I, Fut, Message>( - id: I, - size: usize, - f: impl FnOnce(mpsc::Sender<Message>) -> Fut + MaybeSend + 'static, -) -> Subscription<Message> -where - I: Hash + 'static, - Fut: Future<Output = Never> + MaybeSend + 'static, - Message: 'static + MaybeSend, -{ - use futures::stream::{self, StreamExt}; - - Subscription::from_recipe(Runner { - id, - spawn: move |_| { - let (sender, receiver) = mpsc::channel(size); - - let runner = stream::once(f(sender)).map(|_| unreachable!()); - - stream::select(receiver, runner) - }, - }) -} - -struct Runner<I, F, S, Message> +struct Runner<I, F, S, T> where F: FnOnce(EventStream) -> S, - S: Stream<Item = Message>, + S: Stream<Item = T>, { id: I, spawn: F, } -impl<I, S, F, Message> Recipe for Runner<I, F, S, Message> +impl<I, F, S, T> Recipe for Runner<I, F, S, T> where I: Hash + 'static, F: FnOnce(EventStream) -> S, - S: Stream<Item = Message> + MaybeSend + 'static, + S: Stream<Item = T> + MaybeSend + 'static, { - type Output = Message; + type Output = T; fn hash(&self, state: &mut Hasher) { std::any::TypeId::of::<I>().hash(state); @@ -175,6 +175,7 @@ use iced_winit::core; use iced_winit::runtime; pub use iced_futures::futures; +pub use iced_futures::stream; #[cfg(feature = "highlighter")] pub use iced_highlighter as highlighter; @@ -202,6 +203,7 @@ pub use crate::core::{ Theme, Transformation, Vector, }; pub use crate::runtime::{exit, Task}; +pub use iced_futures::Subscription; pub mod clipboard { //! Access the clipboard. @@ -255,13 +257,6 @@ pub mod mouse { }; } -pub mod subscription { - //! Listen to external events in your application. - pub use iced_futures::subscription::{ - channel, run, run_with_id, unfold, Subscription, - }; -} - #[cfg(feature = "system")] pub mod system { //! Retrieve system information. @@ -314,7 +309,6 @@ pub use executor::Executor; pub use font::Font; pub use renderer::Renderer; pub use settings::Settings; -pub use subscription::Subscription; #[doc(inline)] pub use application::application; diff --git a/winit/src/program.rs b/winit/src/program.rs index e1693196..3a4e2e48 100644 --- a/winit/src/program.rs +++ b/winit/src/program.rs @@ -207,7 +207,9 @@ where runtime.run(stream); } - runtime.track(program.subscription().map(Action::Output).into_recipes()); + runtime.track(subscription::into_recipes( + program.subscription().map(Action::Output), + )); let (boot_sender, boot_receiver) = oneshot::channel(); let (event_sender, event_receiver) = mpsc::unbounded(); @@ -1120,7 +1122,7 @@ fn update<P: Program, E: Executor>( } let subscription = program.subscription(); - runtime.track(subscription.map(Action::Output).into_recipes()); + runtime.track(subscription::into_recipes(subscription.map(Action::Output))); } fn run_action<P, C>( |