summaryrefslogtreecommitdiffstats
path: root/futures
diff options
context:
space:
mode:
authorLibravatar Héctor Ramón Jiménez <hector0193@gmail.com>2023-04-17 23:41:12 +0200
committerLibravatar Héctor Ramón Jiménez <hector0193@gmail.com>2023-04-17 23:41:12 +0200
commit4bae457c37b499f3cfddbdac9ff37a34cbce61d5 (patch)
tree79af93b2f7fabca1687900b48b165c5c74dcd26f /futures
parentc0431aedd3bbef4161456f2fa5f29866e8f17fc5 (diff)
parent4b05f42fd6d18bf572b772dd60d6a4309ea5f343 (diff)
downloadiced-4bae457c37b499f3cfddbdac9ff37a34cbce61d5.tar.gz
iced-4bae457c37b499f3cfddbdac9ff37a34cbce61d5.tar.bz2
iced-4bae457c37b499f3cfddbdac9ff37a34cbce61d5.zip
Merge branch 'master' into advanced-text
Diffstat (limited to 'futures')
-rw-r--r--futures/Cargo.toml2
-rw-r--r--futures/src/subscription.rs103
2 files changed, 69 insertions, 36 deletions
diff --git a/futures/Cargo.toml b/futures/Cargo.toml
index 411e7c2a..f636a304 100644
--- a/futures/Cargo.toml
+++ b/futures/Cargo.toml
@@ -17,7 +17,7 @@ thread-pool = ["futures/thread-pool"]
log = "0.4"
[dependencies.iced_core]
-version = "0.8"
+version = "0.9"
path = "../core"
[dependencies.futures]
diff --git a/futures/src/subscription.rs b/futures/src/subscription.rs
index 876f29c2..801c2694 100644
--- a/futures/src/subscription.rs
+++ b/futures/src/subscription.rs
@@ -9,6 +9,8 @@ use crate::core::Hasher;
use crate::futures::{Future, Stream};
use crate::{BoxStream, MaybeSend};
+use futures::channel::mpsc;
+use futures::never::Never;
use std::hash::Hash;
/// A stream of runtime events.
@@ -126,9 +128,9 @@ impl<Message> std::fmt::Debug for Subscription<Message> {
/// - [`stopwatch`], a watch with start/stop and reset buttons showcasing how
/// to listen to time.
///
-/// [examples]: https://github.com/iced-rs/iced/tree/0.8/examples
-/// [`download_progress`]: https://github.com/iced-rs/iced/tree/0.8/examples/download_progress
-/// [`stopwatch`]: https://github.com/iced-rs/iced/tree/0.8/examples/stopwatch
+/// [examples]: https://github.com/iced-rs/iced/tree/0.9/examples
+/// [`download_progress`]: https://github.com/iced-rs/iced/tree/0.9/examples/download_progress
+/// [`stopwatch`]: https://github.com/iced-rs/iced/tree/0.9/examples/stopwatch
pub trait Recipe {
/// The events that will be produced by a [`Subscription`] with this
/// [`Recipe`].
@@ -317,6 +319,27 @@ where
/// [`Stream`] that will call the provided closure to produce every `Message`.
///
/// The `id` will be used to uniquely identify the [`Subscription`].
+pub fn unfold<I, T, Fut, Message>(
+ id: I,
+ initial: T,
+ mut f: impl FnMut(T) -> Fut + MaybeSend + Sync + 'static,
+) -> Subscription<Message>
+where
+ I: Hash + 'static,
+ T: MaybeSend + 'static,
+ Fut: Future<Output = (Message, T)> + MaybeSend + 'static,
+ Message: 'static + MaybeSend,
+{
+ use futures::future::FutureExt;
+
+ run_with_id(
+ id,
+ futures::stream::unfold(initial, move |state| f(state).map(Some)),
+ )
+}
+
+/// Creates a [`Subscription`] that publishes the events sent from a [`Future`]
+/// to an [`mpsc::Sender`] with the given bounds.
///
/// # Creating an asynchronous worker with bidirectional communication
/// You can leverage this helper to create a [`Subscription`] that spawns
@@ -328,9 +351,8 @@ where
///
/// ```
/// use iced_futures::subscription::{self, Subscription};
-/// use iced_futures::futures;
-///
-/// use futures::channel::mpsc;
+/// use iced_futures::futures::channel::mpsc;
+/// use iced_futures::futures::sink::SinkExt;
///
/// pub enum Event {
/// Ready(mpsc::Sender<Input>),
@@ -351,27 +373,35 @@ where
/// fn some_worker() -> Subscription<Event> {
/// struct SomeWorker;
///
-/// subscription::unfold(std::any::TypeId::of::<SomeWorker>(), State::Starting, |state| async move {
-/// match state {
-/// State::Starting => {
-/// // Create channel
-/// let (sender, receiver) = mpsc::channel(100);
+/// subscription::channel(std::any::TypeId::of::<SomeWorker>(), 100, |mut output| async move {
+/// let mut state = State::Starting;
///
-/// (Some(Event::Ready(sender)), State::Ready(receiver))
-/// }
-/// State::Ready(mut receiver) => {
-/// use futures::StreamExt;
+/// loop {
+/// match &mut state {
+/// State::Starting => {
+/// // Create channel
+/// let (sender, receiver) = mpsc::channel(100);
+///
+/// // Send the sender back to the application
+/// output.send(Event::Ready(sender)).await;
+///
+/// // We are ready to receive messages
+/// state = State::Ready(receiver);
+/// }
+/// State::Ready(receiver) => {
+/// use iced_futures::futures::StreamExt;
///
-/// // Read next input sent from `Application`
-/// let input = receiver.select_next_some().await;
+/// // Read next input sent from `Application`
+/// let input = receiver.select_next_some().await;
///
-/// match input {
-/// Input::DoSomeWork => {
-/// // Do some async work...
+/// match input {
+/// Input::DoSomeWork => {
+/// // Do some async work...
///
-/// // Finally, we can optionally return a message to tell the
-/// // `Application` the work is done
-/// (Some(Event::WorkFinished), State::Ready(receiver))
+/// // Finally, we can optionally produce a message to tell the
+/// // `Application` the work is done
+/// output.send(Event::WorkFinished).await;
+/// }
/// }
/// }
/// }
@@ -383,26 +413,29 @@ where
/// Check out the [`websocket`] example, which showcases this pattern to maintain a WebSocket
/// connection open.
///
-/// [`websocket`]: https://github.com/iced-rs/iced/tree/0.8/examples/websocket
-pub fn unfold<I, T, Fut, Message>(
+/// [`websocket`]: https://github.com/iced-rs/iced/tree/0.9/examples/websocket
+pub fn channel<I, Fut, Message>(
id: I,
- initial: T,
- mut f: impl FnMut(T) -> Fut + MaybeSend + Sync + 'static,
+ size: usize,
+ f: impl Fn(mpsc::Sender<Message>) -> Fut + MaybeSend + Sync + 'static,
) -> Subscription<Message>
where
I: Hash + 'static,
- T: MaybeSend + 'static,
- Fut: Future<Output = (Option<Message>, T)> + MaybeSend + 'static,
+ Fut: Future<Output = Never> + MaybeSend + 'static,
Message: 'static + MaybeSend,
{
- use futures::future::{self, FutureExt};
- use futures::stream::StreamExt;
+ use futures::stream::{self, StreamExt};
- run_with_id(
+ Subscription::from_recipe(Runner {
id,
- futures::stream::unfold(initial, move |state| f(state).map(Some))
- .filter_map(future::ready),
- )
+ spawn: move |_| {
+ let (sender, receiver) = mpsc::channel(size);
+
+ let runner = stream::once(f(sender)).map(|_| unreachable!());
+
+ stream::select(receiver, runner)
+ },
+ })
}
struct Runner<I, F, S, Message>