diff options
Diffstat (limited to 'native/src/subscription.rs')
-rw-r--r-- | native/src/subscription.rs | 44 |
1 files changed, 44 insertions, 0 deletions
diff --git a/native/src/subscription.rs b/native/src/subscription.rs index 9ff89ccf..85ad96fa 100644 --- a/native/src/subscription.rs +++ b/native/src/subscription.rs @@ -124,6 +124,50 @@ where }) } +/// Returns a [`Subscription`] that will open a channel and asynchronously run a +/// [`Stream`] that will call the provided closure to produce every `Message`. +/// +/// When the [`Subscription`] starts, an `on_ready` message will be produced +/// containing the [`mpsc::Sender`] end of the channel, which can be used by +/// the parent application to send `Input` to the running [`Subscription`]. +/// +/// The provided closure should use the [`mpsc::Receiver`] argument to await for +/// any `Input`. +/// +/// This function is really useful to create asynchronous workers with +/// bidirectional communication with a parent application. +/// +/// The `initial` state will be used to uniquely identify the [`Subscription`]. +pub fn worker<T, Fut, Message, Input>( + initial: T, + on_ready: impl FnOnce(mpsc::Sender<Input>) -> Message + 'static, + f: impl FnMut(T, &mut mpsc::Receiver<Input>) -> Fut + Send + Sync + 'static, +) -> Subscription<Message> +where + T: Clone + Hash + Send + 'static, + Fut: Future<Output = (Message, T)> + Send + 'static, + Message: Send + 'static, + Input: Send + 'static, +{ + use futures::future; + use futures::stream::StreamExt; + + run(initial, move |initial| { + let (sender, receiver) = mpsc::channel(100); + + futures::stream::once(future::ready(on_ready(sender))).chain( + futures::stream::unfold( + (f, initial, receiver), + move |(mut f, state, mut receiver)| async { + let (message, state) = f(state, &mut receiver).await; + + Some((message, (f, state, receiver))) + }, + ), + ) + }) +} + struct Runner<T, F, S, Message> where F: FnOnce(T, EventStream) -> S, |