diff options
author | 2022-01-14 19:55:42 +0700 | |
---|---|---|
committer | 2022-01-14 19:55:50 +0700 | |
commit | 35e4f307595cbb67687afcbc8d96ad97109210b5 (patch) | |
tree | 318cc113655e1aad828f018006c51a0d0c679dca /native | |
parent | 2a3271dc106b702a6b81888506578ec5f845281b (diff) | |
download | iced-35e4f307595cbb67687afcbc8d96ad97109210b5.tar.gz iced-35e4f307595cbb67687afcbc8d96ad97109210b5.tar.bz2 iced-35e4f307595cbb67687afcbc8d96ad97109210b5.zip |
Implement `subscription::worker` :tada:
Diffstat (limited to 'native')
-rw-r--r-- | native/src/subscription.rs | 44 |
1 files changed, 44 insertions, 0 deletions
diff --git a/native/src/subscription.rs b/native/src/subscription.rs index 9ff89ccf..85ad96fa 100644 --- a/native/src/subscription.rs +++ b/native/src/subscription.rs @@ -124,6 +124,50 @@ where }) } +/// Returns a [`Subscription`] that will open a channel and asynchronously run a +/// [`Stream`] that will call the provided closure to produce every `Message`. +/// +/// When the [`Subscription`] starts, an `on_ready` message will be produced +/// containing the [`mpsc::Sender`] end of the channel, which can be used by +/// the parent application to send `Input` to the running [`Subscription`]. +/// +/// The provided closure should use the [`mpsc::Receiver`] argument to await for +/// any `Input`. +/// +/// This function is really useful to create asynchronous workers with +/// bidirectional communication with a parent application. +/// +/// The `initial` state will be used to uniquely identify the [`Subscription`]. +pub fn worker<T, Fut, Message, Input>( + initial: T, + on_ready: impl FnOnce(mpsc::Sender<Input>) -> Message + 'static, + f: impl FnMut(T, &mut mpsc::Receiver<Input>) -> Fut + Send + Sync + 'static, +) -> Subscription<Message> +where + T: Clone + Hash + Send + 'static, + Fut: Future<Output = (Message, T)> + Send + 'static, + Message: Send + 'static, + Input: Send + 'static, +{ + use futures::future; + use futures::stream::StreamExt; + + run(initial, move |initial| { + let (sender, receiver) = mpsc::channel(100); + + futures::stream::once(future::ready(on_ready(sender))).chain( + futures::stream::unfold( + (f, initial, receiver), + move |(mut f, state, mut receiver)| async { + let (message, state) = f(state, &mut receiver).await; + + Some((message, (f, state, receiver))) + }, + ), + ) + }) +} + struct Runner<T, F, S, Message> where F: FnOnce(T, EventStream) -> S, |