From 810b445f8d2f429e9ad07625f9b67dba09783d7a Mon Sep 17 00:00:00 2001 From: Héctor Ramón Jiménez Date: Fri, 14 Jan 2022 19:43:06 +0700 Subject: Rewrite `events` and `events_with` with a new `Runner` abstraction --- native/src/subscription.rs | 72 +++++++++++++++++++++++++++++++++------ native/src/subscription/events.rs | 35 ------------------- 2 files changed, 62 insertions(+), 45 deletions(-) delete mode 100644 native/src/subscription/events.rs (limited to 'native') diff --git a/native/src/subscription.rs b/native/src/subscription.rs index 2950879c..7a1c5852 100644 --- a/native/src/subscription.rs +++ b/native/src/subscription.rs @@ -1,8 +1,12 @@ //! Listen to external events in your application. use crate::event::{self, Event}; use crate::Hasher; + +use iced_futures::futures::{self, Stream}; use iced_futures::BoxStream; +use std::hash::Hash; + /// A request to listen to external events. /// /// Besides performing async actions on demand with [`Command`], most @@ -29,20 +33,14 @@ pub type Tracker = pub use iced_futures::subscription::Recipe; -mod events; - -use events::Events; - /// Returns a [`Subscription`] to all the runtime events. /// /// This subscription will notify your application of any [`Event`] that was /// not captured by any widget. pub fn events() -> Subscription { - Subscription::from_recipe(Events { - f: |event, status| match status { - event::Status::Ignored => Some(event), - event::Status::Captured => None, - }, + events_with(|event, status| match status { + event::Status::Ignored => Some(event), + event::Status::Captured => None, }) } @@ -60,5 +58,59 @@ pub fn events_with( where Message: 'static + Send, { - Subscription::from_recipe(Events { f }) + #[derive(Debug, Clone, Copy, Hash)] + struct Events(u64); + + let hash = { + use std::hash::Hasher as _; + + let mut hasher = Hasher::default(); + + f.hash(&mut hasher); + + hasher.finish() + }; + + Subscription::from_recipe(Runner { + initial: Events(hash), + spawn: move |_, events| { + use futures::future; + use futures::stream::StreamExt; + + events.filter_map(move |(event, status)| { + future::ready(f(event, status)) + }) + }, + }) +} + +struct Runner +where + F: FnOnce(T, EventStream) -> S, + S: Stream, +{ + initial: T, + spawn: F, +} + +impl Recipe + for Runner +where + T: Clone + Hash + 'static, + F: FnOnce(T, EventStream) -> S, + S: Stream + Send + 'static, +{ + type Output = Message; + + fn hash(&self, state: &mut Hasher) { + std::any::TypeId::of::().hash(state); + + self.initial.hash(state); + } + + fn stream(self: Box, input: EventStream) -> BoxStream { + use futures::stream::StreamExt; + + (self.spawn)(self.initial, input).boxed() + } } diff --git a/native/src/subscription/events.rs b/native/src/subscription/events.rs deleted file mode 100644 index ca143bb3..00000000 --- a/native/src/subscription/events.rs +++ /dev/null @@ -1,35 +0,0 @@ -use crate::event::{self, Event}; -use crate::subscription::{EventStream, Recipe}; -use crate::Hasher; -use iced_futures::futures::future; -use iced_futures::futures::StreamExt; -use iced_futures::BoxStream; - -pub struct Events { - pub(super) f: fn(Event, event::Status) -> Option, -} - -impl Recipe for Events -where - Message: 'static + Send, -{ - type Output = Message; - - fn hash(&self, state: &mut Hasher) { - use std::hash::Hash; - - struct Marker; - std::any::TypeId::of::().hash(state); - self.f.hash(state); - } - - fn stream( - self: Box, - event_stream: EventStream, - ) -> BoxStream { - let stream = event_stream.filter_map(move |(event, status)| { - future::ready((self.f)(event, status)) - }); - iced_futures::boxed_stream(stream) - } -} -- cgit From 7442d0b66f1c7b4c912ad5ac358dafcc8b07824e Mon Sep 17 00:00:00 2001 From: Héctor Ramón Jiménez Date: Fri, 14 Jan 2022 19:43:54 +0700 Subject: Implement `subscription::run` :tada: --- native/src/subscription.rs | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) (limited to 'native') diff --git a/native/src/subscription.rs b/native/src/subscription.rs index 7a1c5852..420797d1 100644 --- a/native/src/subscription.rs +++ b/native/src/subscription.rs @@ -84,6 +84,25 @@ where }) } +/// Returns a [`Subscription`] that will create and asynchronously run the +/// [`Stream`] returned by the provided closure. +/// +/// The `initial` state will be used to uniquely identify the [`Subscription`]. +pub fn run( + initial: T, + f: impl FnOnce(T) -> S + 'static, +) -> Subscription +where + Message: 'static, + T: Clone + Hash + 'static, + S: Stream + Send + 'static, +{ + Subscription::from_recipe(Runner { + initial, + spawn: move |initial, _| f(initial), + }) +} + struct Runner where F: FnOnce(T, EventStream) -> S, -- cgit From 2a3271dc106b702a6b81888506578ec5f845281b Mon Sep 17 00:00:00 2001 From: Héctor Ramón Jiménez Date: Fri, 14 Jan 2022 19:55:27 +0700 Subject: Implement `subscription::unfold` :tada: --- native/src/subscription.rs | 23 ++++++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) (limited to 'native') diff --git a/native/src/subscription.rs b/native/src/subscription.rs index 420797d1..9ff89ccf 100644 --- a/native/src/subscription.rs +++ b/native/src/subscription.rs @@ -2,7 +2,8 @@ use crate::event::{self, Event}; use crate::Hasher; -use iced_futures::futures::{self, Stream}; +use iced_futures::futures::channel::mpsc; +use iced_futures::futures::{self, Future, Stream}; use iced_futures::BoxStream; use std::hash::Hash; @@ -103,6 +104,26 @@ where }) } +/// Returns a [`Subscription`] that will create and asynchronously run a +/// [`Stream`] that will call the provided closure to produce every `Message`. +/// +/// The `initial` state will be used to uniquely identify the [`Subscription`]. +pub fn unfold( + initial: T, + mut f: impl FnMut(T) -> Fut + Send + Sync + 'static, +) -> Subscription +where + Message: 'static, + T: Clone + Hash + Send + 'static, + Fut: Future + Send + 'static, +{ + use futures::future::FutureExt; + + run(initial, move |initial| { + futures::stream::unfold(initial, move |state| f(state).map(Some)) + }) +} + struct Runner where F: FnOnce(T, EventStream) -> S, -- cgit From 35e4f307595cbb67687afcbc8d96ad97109210b5 Mon Sep 17 00:00:00 2001 From: Héctor Ramón Jiménez Date: Fri, 14 Jan 2022 19:55:42 +0700 Subject: Implement `subscription::worker` :tada: --- native/src/subscription.rs | 44 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) (limited to 'native') diff --git a/native/src/subscription.rs b/native/src/subscription.rs index 9ff89ccf..85ad96fa 100644 --- a/native/src/subscription.rs +++ b/native/src/subscription.rs @@ -124,6 +124,50 @@ where }) } +/// Returns a [`Subscription`] that will open a channel and asynchronously run a +/// [`Stream`] that will call the provided closure to produce every `Message`. +/// +/// When the [`Subscription`] starts, an `on_ready` message will be produced +/// containing the [`mpsc::Sender`] end of the channel, which can be used by +/// the parent application to send `Input` to the running [`Subscription`]. +/// +/// The provided closure should use the [`mpsc::Receiver`] argument to await for +/// any `Input`. +/// +/// This function is really useful to create asynchronous workers with +/// bidirectional communication with a parent application. +/// +/// The `initial` state will be used to uniquely identify the [`Subscription`]. +pub fn worker( + initial: T, + on_ready: impl FnOnce(mpsc::Sender) -> Message + 'static, + f: impl FnMut(T, &mut mpsc::Receiver) -> Fut + Send + Sync + 'static, +) -> Subscription +where + T: Clone + Hash + Send + 'static, + Fut: Future + Send + 'static, + Message: Send + 'static, + Input: Send + 'static, +{ + use futures::future; + use futures::stream::StreamExt; + + run(initial, move |initial| { + let (sender, receiver) = mpsc::channel(100); + + futures::stream::once(future::ready(on_ready(sender))).chain( + futures::stream::unfold( + (f, initial, receiver), + move |(mut f, state, mut receiver)| async { + let (message, state) = f(state, &mut receiver).await; + + Some((message, (f, state, receiver))) + }, + ), + ) + }) +} + struct Runner where F: FnOnce(T, EventStream) -> S, -- cgit From dc50a2830ab553dfc5dfc28d4fd1af6b3981c656 Mon Sep 17 00:00:00 2001 From: Héctor Ramón Jiménez Date: Sun, 16 Jan 2022 15:50:19 +0700 Subject: Draft `websocket` example :tada: --- native/src/subscription.rs | 98 ++++++++++++---------------------------------- 1 file changed, 25 insertions(+), 73 deletions(-) (limited to 'native') diff --git a/native/src/subscription.rs b/native/src/subscription.rs index 85ad96fa..70cd269e 100644 --- a/native/src/subscription.rs +++ b/native/src/subscription.rs @@ -2,7 +2,6 @@ use crate::event::{self, Event}; use crate::Hasher; -use iced_futures::futures::channel::mpsc; use iced_futures::futures::{self, Future, Stream}; use iced_futures::BoxStream; @@ -59,21 +58,9 @@ pub fn events_with( where Message: 'static + Send, { - #[derive(Debug, Clone, Copy, Hash)] - struct Events(u64); - - let hash = { - use std::hash::Hasher as _; - - let mut hasher = Hasher::default(); - - f.hash(&mut hasher); - - hasher.finish() - }; - Subscription::from_recipe(Runner { - initial: Events(hash), + id: f, + initial: (), spawn: move |_, events| { use futures::future; use futures::stream::StreamExt; @@ -89,16 +76,19 @@ where /// [`Stream`] returned by the provided closure. /// /// The `initial` state will be used to uniquely identify the [`Subscription`]. -pub fn run( +pub fn run( + id: I, initial: T, f: impl FnOnce(T) -> S + 'static, ) -> Subscription where - Message: 'static, - T: Clone + Hash + 'static, + I: Hash + 'static, + T: 'static, S: Stream + Send + 'static, + Message: 'static, { Subscription::from_recipe(Runner { + id, initial, spawn: move |initial, _| f(initial), }) @@ -108,79 +98,41 @@ where /// [`Stream`] that will call the provided closure to produce every `Message`. /// /// The `initial` state will be used to uniquely identify the [`Subscription`]. -pub fn unfold( +pub fn unfold( + id: I, initial: T, mut f: impl FnMut(T) -> Fut + Send + Sync + 'static, ) -> Subscription where - Message: 'static, - T: Clone + Hash + Send + 'static, - Fut: Future + Send + 'static, -{ - use futures::future::FutureExt; - - run(initial, move |initial| { - futures::stream::unfold(initial, move |state| f(state).map(Some)) - }) -} - -/// Returns a [`Subscription`] that will open a channel and asynchronously run a -/// [`Stream`] that will call the provided closure to produce every `Message`. -/// -/// When the [`Subscription`] starts, an `on_ready` message will be produced -/// containing the [`mpsc::Sender`] end of the channel, which can be used by -/// the parent application to send `Input` to the running [`Subscription`]. -/// -/// The provided closure should use the [`mpsc::Receiver`] argument to await for -/// any `Input`. -/// -/// This function is really useful to create asynchronous workers with -/// bidirectional communication with a parent application. -/// -/// The `initial` state will be used to uniquely identify the [`Subscription`]. -pub fn worker( - initial: T, - on_ready: impl FnOnce(mpsc::Sender) -> Message + 'static, - f: impl FnMut(T, &mut mpsc::Receiver) -> Fut + Send + Sync + 'static, -) -> Subscription -where - T: Clone + Hash + Send + 'static, - Fut: Future + Send + 'static, - Message: Send + 'static, - Input: Send + 'static, + I: Hash + 'static, + T: Send + 'static, + Fut: Future, T)> + Send + 'static, + Message: 'static + Send, { - use futures::future; + use futures::future::{self, FutureExt}; use futures::stream::StreamExt; - run(initial, move |initial| { - let (sender, receiver) = mpsc::channel(100); - - futures::stream::once(future::ready(on_ready(sender))).chain( - futures::stream::unfold( - (f, initial, receiver), - move |(mut f, state, mut receiver)| async { - let (message, state) = f(state, &mut receiver).await; - - Some((message, (f, state, receiver))) - }, - ), - ) + run(id, initial, move |initial| { + futures::stream::unfold(initial, move |state| f(state).map(Some)) + .filter_map(future::ready) }) } -struct Runner +struct Runner where F: FnOnce(T, EventStream) -> S, S: Stream, { + id: I, initial: T, spawn: F, } -impl Recipe - for Runner +impl Recipe + for Runner where - T: Clone + Hash + 'static, + I: Hash + 'static, + T: 'static, F: FnOnce(T, EventStream) -> S, S: Stream + Send + 'static, { @@ -189,7 +141,7 @@ where fn hash(&self, state: &mut Hasher) { std::any::TypeId::of::().hash(state); - self.initial.hash(state); + self.id.hash(state); } fn stream(self: Box, input: EventStream) -> BoxStream { -- cgit From ddbbe7353bce0827160cb8d539a3114c159b3745 Mon Sep 17 00:00:00 2001 From: Héctor Ramón Jiménez Date: Mon, 17 Jan 2022 15:29:41 +0700 Subject: Simplify `run` and `unfold` helpers to build a `Subscription` --- native/src/subscription.rs | 43 +++++++++++++++++-------------------------- 1 file changed, 17 insertions(+), 26 deletions(-) (limited to 'native') diff --git a/native/src/subscription.rs b/native/src/subscription.rs index 70cd269e..ece6556e 100644 --- a/native/src/subscription.rs +++ b/native/src/subscription.rs @@ -60,8 +60,7 @@ where { Subscription::from_recipe(Runner { id: f, - initial: (), - spawn: move |_, events| { + spawn: move |events| { use futures::future; use futures::stream::StreamExt; @@ -73,31 +72,25 @@ where } /// Returns a [`Subscription`] that will create and asynchronously run the -/// [`Stream`] returned by the provided closure. +/// given [`Stream`]. /// -/// The `initial` state will be used to uniquely identify the [`Subscription`]. -pub fn run( - id: I, - initial: T, - f: impl FnOnce(T) -> S + 'static, -) -> Subscription +/// The `id` will be used to uniquely identify the [`Subscription`]. +pub fn run(id: I, stream: S) -> Subscription where I: Hash + 'static, - T: 'static, S: Stream + Send + 'static, Message: 'static, { Subscription::from_recipe(Runner { id, - initial, - spawn: move |initial, _| f(initial), + spawn: move |_| stream, }) } /// Returns a [`Subscription`] that will create and asynchronously run a /// [`Stream`] that will call the provided closure to produce every `Message`. /// -/// The `initial` state will be used to uniquely identify the [`Subscription`]. +/// The `id` will be used to uniquely identify the [`Subscription`]. pub fn unfold( id: I, initial: T, @@ -112,41 +105,39 @@ where use futures::future::{self, FutureExt}; use futures::stream::StreamExt; - run(id, initial, move |initial| { + run( + id, futures::stream::unfold(initial, move |state| f(state).map(Some)) - .filter_map(future::ready) - }) + .filter_map(future::ready), + ) } -struct Runner +struct Runner where - F: FnOnce(T, EventStream) -> S, + F: FnOnce(EventStream) -> S, S: Stream, { id: I, - initial: T, spawn: F, } -impl Recipe - for Runner +impl Recipe + for Runner where I: Hash + 'static, - T: 'static, - F: FnOnce(T, EventStream) -> S, + F: FnOnce(EventStream) -> S, S: Stream + Send + 'static, { type Output = Message; fn hash(&self, state: &mut Hasher) { - std::any::TypeId::of::().hash(state); - + std::any::TypeId::of::().hash(state); self.id.hash(state); } fn stream(self: Box, input: EventStream) -> BoxStream { use futures::stream::StreamExt; - (self.spawn)(self.initial, input).boxed() + (self.spawn)(input).boxed() } } -- cgit From 5ce8653fb51c035e0a6fe1ba7ab363018cdf107b Mon Sep 17 00:00:00 2001 From: Héctor Ramón Jiménez Date: Mon, 17 Jan 2022 15:48:37 +0700 Subject: Add worker example to docs of `subscription::unfold` --- native/src/subscription.rs | 65 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 65 insertions(+) (limited to 'native') diff --git a/native/src/subscription.rs b/native/src/subscription.rs index ece6556e..63834654 100644 --- a/native/src/subscription.rs +++ b/native/src/subscription.rs @@ -91,6 +91,71 @@ where /// [`Stream`] that will call the provided closure to produce every `Message`. /// /// The `id` will be used to uniquely identify the [`Subscription`]. +/// +/// # Creating an asynchronous worker with bidirectional communication +/// You can leverage this helper to create a [`Subscription`] that spawns +/// an asynchronous worker in the background and establish a channel of +/// communication with an `iced` application. +/// +/// You can achieve this by creating an `mpsc` channel inside the closure +/// and returning the `Sender` as a `Message` for the `Application`: +/// +/// ``` +/// use iced_native::subscription::{self, Subscription}; +/// use iced_native::futures::channel::mpsc; +/// +/// pub enum Event { +/// Ready(mpsc::Sender), +/// WorkFinished, +/// // ... +/// } +/// +/// enum Input { +/// DoSomeWork, +/// // ... +/// } +/// +/// enum State { +/// Starting, +/// Ready(mpsc::Receiver), +/// } +/// +/// fn some_worker() -> Subscription { +/// struct SomeWorker; +/// +/// subscription::unfold(std::any::TypeId::of::(), State::Starting, |state| async move { +/// match state { +/// State::Starting => { +/// // Create channel +/// let (sender, receiver) = mpsc::channel(100); +/// +/// (Some(Event::Ready(sender)), State::Ready(receiver)) +/// } +/// State::Ready(mut receiver) => { +/// use iced_native::futures::StreamExt; +/// +/// // Read next input sent from `Application` +/// let input = receiver.select_next_some().await; +/// +/// match input { +/// Input::DoSomeWork => { +/// // Do some async work... +/// +/// // Finally, we can optionally return a message to tell the +/// // `Application` the work is done +/// (Some(Event::WorkFinished), State::Ready(receiver)) +/// } +/// } +/// } +/// } +/// }) +/// } +/// ``` +/// +/// Check out the [`websocket`] example, which showcases this pattern to maintain a WebSocket +/// connection open. +/// +/// [`websocket`]: https://github.com/iced-rs/iced/tree/0.4/examples/websocket pub fn unfold( id: I, initial: T, -- cgit