diff options
author | 2023-04-17 23:41:12 +0200 | |
---|---|---|
committer | 2023-04-17 23:41:12 +0200 | |
commit | 4bae457c37b499f3cfddbdac9ff37a34cbce61d5 (patch) | |
tree | 79af93b2f7fabca1687900b48b165c5c74dcd26f /futures | |
parent | c0431aedd3bbef4161456f2fa5f29866e8f17fc5 (diff) | |
parent | 4b05f42fd6d18bf572b772dd60d6a4309ea5f343 (diff) | |
download | iced-4bae457c37b499f3cfddbdac9ff37a34cbce61d5.tar.gz iced-4bae457c37b499f3cfddbdac9ff37a34cbce61d5.tar.bz2 iced-4bae457c37b499f3cfddbdac9ff37a34cbce61d5.zip |
Merge branch 'master' into advanced-text
Diffstat (limited to 'futures')
-rw-r--r-- | futures/Cargo.toml | 2 | ||||
-rw-r--r-- | futures/src/subscription.rs | 103 |
2 files changed, 69 insertions, 36 deletions
diff --git a/futures/Cargo.toml b/futures/Cargo.toml index 411e7c2a..f636a304 100644 --- a/futures/Cargo.toml +++ b/futures/Cargo.toml @@ -17,7 +17,7 @@ thread-pool = ["futures/thread-pool"] log = "0.4" [dependencies.iced_core] -version = "0.8" +version = "0.9" path = "../core" [dependencies.futures] diff --git a/futures/src/subscription.rs b/futures/src/subscription.rs index 876f29c2..801c2694 100644 --- a/futures/src/subscription.rs +++ b/futures/src/subscription.rs @@ -9,6 +9,8 @@ use crate::core::Hasher; use crate::futures::{Future, Stream}; use crate::{BoxStream, MaybeSend}; +use futures::channel::mpsc; +use futures::never::Never; use std::hash::Hash; /// A stream of runtime events. @@ -126,9 +128,9 @@ impl<Message> std::fmt::Debug for Subscription<Message> { /// - [`stopwatch`], a watch with start/stop and reset buttons showcasing how /// to listen to time. /// -/// [examples]: https://github.com/iced-rs/iced/tree/0.8/examples -/// [`download_progress`]: https://github.com/iced-rs/iced/tree/0.8/examples/download_progress -/// [`stopwatch`]: https://github.com/iced-rs/iced/tree/0.8/examples/stopwatch +/// [examples]: https://github.com/iced-rs/iced/tree/0.9/examples +/// [`download_progress`]: https://github.com/iced-rs/iced/tree/0.9/examples/download_progress +/// [`stopwatch`]: https://github.com/iced-rs/iced/tree/0.9/examples/stopwatch pub trait Recipe { /// The events that will be produced by a [`Subscription`] with this /// [`Recipe`]. @@ -317,6 +319,27 @@ where /// [`Stream`] that will call the provided closure to produce every `Message`. /// /// The `id` will be used to uniquely identify the [`Subscription`]. +pub fn unfold<I, T, Fut, Message>( + id: I, + initial: T, + mut f: impl FnMut(T) -> Fut + MaybeSend + Sync + 'static, +) -> Subscription<Message> +where + I: Hash + 'static, + T: MaybeSend + 'static, + Fut: Future<Output = (Message, T)> + MaybeSend + 'static, + Message: 'static + MaybeSend, +{ + use futures::future::FutureExt; + + run_with_id( + id, + futures::stream::unfold(initial, move |state| f(state).map(Some)), + ) +} + +/// Creates a [`Subscription`] that publishes the events sent from a [`Future`] +/// to an [`mpsc::Sender`] with the given bounds. /// /// # Creating an asynchronous worker with bidirectional communication /// You can leverage this helper to create a [`Subscription`] that spawns @@ -328,9 +351,8 @@ where /// /// ``` /// use iced_futures::subscription::{self, Subscription}; -/// use iced_futures::futures; -/// -/// use futures::channel::mpsc; +/// use iced_futures::futures::channel::mpsc; +/// use iced_futures::futures::sink::SinkExt; /// /// pub enum Event { /// Ready(mpsc::Sender<Input>), @@ -351,27 +373,35 @@ where /// fn some_worker() -> Subscription<Event> { /// struct SomeWorker; /// -/// subscription::unfold(std::any::TypeId::of::<SomeWorker>(), State::Starting, |state| async move { -/// match state { -/// State::Starting => { -/// // Create channel -/// let (sender, receiver) = mpsc::channel(100); +/// subscription::channel(std::any::TypeId::of::<SomeWorker>(), 100, |mut output| async move { +/// let mut state = State::Starting; /// -/// (Some(Event::Ready(sender)), State::Ready(receiver)) -/// } -/// State::Ready(mut receiver) => { -/// use futures::StreamExt; +/// loop { +/// match &mut state { +/// State::Starting => { +/// // Create channel +/// let (sender, receiver) = mpsc::channel(100); +/// +/// // Send the sender back to the application +/// output.send(Event::Ready(sender)).await; +/// +/// // We are ready to receive messages +/// state = State::Ready(receiver); +/// } +/// State::Ready(receiver) => { +/// use iced_futures::futures::StreamExt; /// -/// // Read next input sent from `Application` -/// let input = receiver.select_next_some().await; +/// // Read next input sent from `Application` +/// let input = receiver.select_next_some().await; /// -/// match input { -/// Input::DoSomeWork => { -/// // Do some async work... +/// match input { +/// Input::DoSomeWork => { +/// // Do some async work... /// -/// // Finally, we can optionally return a message to tell the -/// // `Application` the work is done -/// (Some(Event::WorkFinished), State::Ready(receiver)) +/// // Finally, we can optionally produce a message to tell the +/// // `Application` the work is done +/// output.send(Event::WorkFinished).await; +/// } /// } /// } /// } @@ -383,26 +413,29 @@ where /// Check out the [`websocket`] example, which showcases this pattern to maintain a WebSocket /// connection open. /// -/// [`websocket`]: https://github.com/iced-rs/iced/tree/0.8/examples/websocket -pub fn unfold<I, T, Fut, Message>( +/// [`websocket`]: https://github.com/iced-rs/iced/tree/0.9/examples/websocket +pub fn channel<I, Fut, Message>( id: I, - initial: T, - mut f: impl FnMut(T) -> Fut + MaybeSend + Sync + 'static, + size: usize, + f: impl Fn(mpsc::Sender<Message>) -> Fut + MaybeSend + Sync + 'static, ) -> Subscription<Message> where I: Hash + 'static, - T: MaybeSend + 'static, - Fut: Future<Output = (Option<Message>, T)> + MaybeSend + 'static, + Fut: Future<Output = Never> + MaybeSend + 'static, Message: 'static + MaybeSend, { - use futures::future::{self, FutureExt}; - use futures::stream::StreamExt; + use futures::stream::{self, StreamExt}; - run_with_id( + Subscription::from_recipe(Runner { id, - futures::stream::unfold(initial, move |state| f(state).map(Some)) - .filter_map(future::ready), - ) + spawn: move |_| { + let (sender, receiver) = mpsc::channel(size); + + let runner = stream::once(f(sender)).map(|_| unreachable!()); + + stream::select(receiver, runner) + }, + }) } struct Runner<I, F, S, Message> |