summaryrefslogtreecommitdiffstats
path: root/native
diff options
context:
space:
mode:
authorLibravatar Héctor Ramón Jiménez <hector0193@gmail.com>2022-01-14 19:55:42 +0700
committerLibravatar Héctor Ramón Jiménez <hector0193@gmail.com>2022-01-14 19:55:50 +0700
commit35e4f307595cbb67687afcbc8d96ad97109210b5 (patch)
tree318cc113655e1aad828f018006c51a0d0c679dca /native
parent2a3271dc106b702a6b81888506578ec5f845281b (diff)
downloadiced-35e4f307595cbb67687afcbc8d96ad97109210b5.tar.gz
iced-35e4f307595cbb67687afcbc8d96ad97109210b5.tar.bz2
iced-35e4f307595cbb67687afcbc8d96ad97109210b5.zip
Implement `subscription::worker` :tada:
Diffstat (limited to 'native')
-rw-r--r--native/src/subscription.rs44
1 files changed, 44 insertions, 0 deletions
diff --git a/native/src/subscription.rs b/native/src/subscription.rs
index 9ff89ccf..85ad96fa 100644
--- a/native/src/subscription.rs
+++ b/native/src/subscription.rs
@@ -124,6 +124,50 @@ where
})
}
+/// Returns a [`Subscription`] that will open a channel and asynchronously run a
+/// [`Stream`] that will call the provided closure to produce every `Message`.
+///
+/// When the [`Subscription`] starts, an `on_ready` message will be produced
+/// containing the [`mpsc::Sender`] end of the channel, which can be used by
+/// the parent application to send `Input` to the running [`Subscription`].
+///
+/// The provided closure should use the [`mpsc::Receiver`] argument to await for
+/// any `Input`.
+///
+/// This function is really useful to create asynchronous workers with
+/// bidirectional communication with a parent application.
+///
+/// The `initial` state will be used to uniquely identify the [`Subscription`].
+pub fn worker<T, Fut, Message, Input>(
+ initial: T,
+ on_ready: impl FnOnce(mpsc::Sender<Input>) -> Message + 'static,
+ f: impl FnMut(T, &mut mpsc::Receiver<Input>) -> Fut + Send + Sync + 'static,
+) -> Subscription<Message>
+where
+ T: Clone + Hash + Send + 'static,
+ Fut: Future<Output = (Message, T)> + Send + 'static,
+ Message: Send + 'static,
+ Input: Send + 'static,
+{
+ use futures::future;
+ use futures::stream::StreamExt;
+
+ run(initial, move |initial| {
+ let (sender, receiver) = mpsc::channel(100);
+
+ futures::stream::once(future::ready(on_ready(sender))).chain(
+ futures::stream::unfold(
+ (f, initial, receiver),
+ move |(mut f, state, mut receiver)| async {
+ let (message, state) = f(state, &mut receiver).await;
+
+ Some((message, (f, state, receiver)))
+ },
+ ),
+ )
+ })
+}
+
struct Runner<T, F, S, Message>
where
F: FnOnce(T, EventStream) -> S,