summaryrefslogtreecommitdiffstats
path: root/native/src
diff options
context:
space:
mode:
authorLibravatar Héctor Ramón <hector0193@gmail.com>2023-04-11 20:24:38 +0200
committerLibravatar GitHub <noreply@github.com>2023-04-11 20:24:38 +0200
commitca828f03f5aab9efacc9d63d4149363333035a0c (patch)
tree7503390b65a2956bd82e3c4058cac1cf794a844f /native/src
parentff24f9040c822fa7706087091e92ebee1e5211c5 (diff)
parent0ed54346b0d6271cf5874debef1de57f8322a249 (diff)
downloadiced-ca828f03f5aab9efacc9d63d4149363333035a0c.tar.gz
iced-ca828f03f5aab9efacc9d63d4149363333035a0c.tar.bz2
iced-ca828f03f5aab9efacc9d63d4149363333035a0c.zip
Merge pull request #1786 from iced-rs/feature/subscription-channel
`channel` helper for `subscription`
Diffstat (limited to 'native/src')
-rw-r--r--native/src/subscription.rs91
1 files changed, 63 insertions, 28 deletions
diff --git a/native/src/subscription.rs b/native/src/subscription.rs
index 16e78e82..0ff5e320 100644
--- a/native/src/subscription.rs
+++ b/native/src/subscription.rs
@@ -3,6 +3,8 @@ use crate::event::{self, Event};
use crate::window;
use crate::Hasher;
+use iced_futures::futures::channel::mpsc;
+use iced_futures::futures::never::Never;
use iced_futures::futures::{self, Future, Stream};
use iced_futures::{BoxStream, MaybeSend};
@@ -133,6 +135,27 @@ where
/// [`Stream`] that will call the provided closure to produce every `Message`.
///
/// The `id` will be used to uniquely identify the [`Subscription`].
+pub fn unfold<I, T, Fut, Message>(
+ id: I,
+ initial: T,
+ mut f: impl FnMut(T) -> Fut + MaybeSend + Sync + 'static,
+) -> Subscription<Message>
+where
+ I: Hash + 'static,
+ T: MaybeSend + 'static,
+ Fut: Future<Output = (Message, T)> + MaybeSend + 'static,
+ Message: 'static + MaybeSend,
+{
+ use futures::future::FutureExt;
+
+ run_with_id(
+ id,
+ futures::stream::unfold(initial, move |state| f(state).map(Some)),
+ )
+}
+
+/// Creates a [`Subscription`] that publishes the events sent from a [`Future`]
+/// to an [`mpsc::Sender`] with the given bounds.
///
/// # Creating an asynchronous worker with bidirectional communication
/// You can leverage this helper to create a [`Subscription`] that spawns
@@ -145,6 +168,7 @@ where
/// ```
/// use iced_native::subscription::{self, Subscription};
/// use iced_native::futures::channel::mpsc;
+/// use iced_native::futures::sink::SinkExt;
///
/// pub enum Event {
/// Ready(mpsc::Sender<Input>),
@@ -165,27 +189,35 @@ where
/// fn some_worker() -> Subscription<Event> {
/// struct SomeWorker;
///
-/// subscription::unfold(std::any::TypeId::of::<SomeWorker>(), State::Starting, |state| async move {
-/// match state {
-/// State::Starting => {
-/// // Create channel
-/// let (sender, receiver) = mpsc::channel(100);
+/// subscription::channel(std::any::TypeId::of::<SomeWorker>(), 100, |mut output| async move {
+/// let mut state = State::Starting;
///
-/// (Some(Event::Ready(sender)), State::Ready(receiver))
-/// }
-/// State::Ready(mut receiver) => {
-/// use iced_native::futures::StreamExt;
+/// loop {
+/// match &mut state {
+/// State::Starting => {
+/// // Create channel
+/// let (sender, receiver) = mpsc::channel(100);
+///
+/// // Send the sender back to the application
+/// output.send(Event::Ready(sender)).await;
+///
+/// // We are ready to receive messages
+/// state = State::Ready(receiver);
+/// }
+/// State::Ready(receiver) => {
+/// use iced_native::futures::StreamExt;
///
-/// // Read next input sent from `Application`
-/// let input = receiver.select_next_some().await;
+/// // Read next input sent from `Application`
+/// let input = receiver.select_next_some().await;
///
-/// match input {
-/// Input::DoSomeWork => {
-/// // Do some async work...
+/// match input {
+/// Input::DoSomeWork => {
+/// // Do some async work...
///
-/// // Finally, we can optionally return a message to tell the
-/// // `Application` the work is done
-/// (Some(Event::WorkFinished), State::Ready(receiver))
+/// // Finally, we can optionally produce a message to tell the
+/// // `Application` the work is done
+/// output.send(Event::WorkFinished).await;
+/// }
/// }
/// }
/// }
@@ -198,25 +230,28 @@ where
/// connection open.
///
/// [`websocket`]: https://github.com/iced-rs/iced/tree/0.8/examples/websocket
-pub fn unfold<I, T, Fut, Message>(
+pub fn channel<I, Fut, Message>(
id: I,
- initial: T,
- mut f: impl FnMut(T) -> Fut + MaybeSend + Sync + 'static,
+ size: usize,
+ f: impl Fn(mpsc::Sender<Message>) -> Fut + MaybeSend + Sync + 'static,
) -> Subscription<Message>
where
I: Hash + 'static,
- T: MaybeSend + 'static,
- Fut: Future<Output = (Option<Message>, T)> + MaybeSend + 'static,
+ Fut: Future<Output = Never> + MaybeSend + 'static,
Message: 'static + MaybeSend,
{
- use futures::future::{self, FutureExt};
- use futures::stream::StreamExt;
+ use futures::stream::{self, StreamExt};
- run_with_id(
+ Subscription::from_recipe(Runner {
id,
- futures::stream::unfold(initial, move |state| f(state).map(Some))
- .filter_map(future::ready),
- )
+ spawn: move |_| {
+ let (sender, receiver) = mpsc::channel(size);
+
+ let runner = stream::once(f(sender)).map(|_| unreachable!());
+
+ stream::select(receiver, runner)
+ },
+ })
}
struct Runner<I, F, S, Message>