From 35e4f307595cbb67687afcbc8d96ad97109210b5 Mon Sep 17 00:00:00 2001 From: Héctor Ramón Jiménez Date: Fri, 14 Jan 2022 19:55:42 +0700 Subject: Implement `subscription::worker` :tada: --- native/src/subscription.rs | 44 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) (limited to 'native') diff --git a/native/src/subscription.rs b/native/src/subscription.rs index 9ff89ccf..85ad96fa 100644 --- a/native/src/subscription.rs +++ b/native/src/subscription.rs @@ -124,6 +124,50 @@ where }) } +/// Returns a [`Subscription`] that will open a channel and asynchronously run a +/// [`Stream`] that will call the provided closure to produce every `Message`. +/// +/// When the [`Subscription`] starts, an `on_ready` message will be produced +/// containing the [`mpsc::Sender`] end of the channel, which can be used by +/// the parent application to send `Input` to the running [`Subscription`]. +/// +/// The provided closure should use the [`mpsc::Receiver`] argument to await for +/// any `Input`. +/// +/// This function is really useful to create asynchronous workers with +/// bidirectional communication with a parent application. +/// +/// The `initial` state will be used to uniquely identify the [`Subscription`]. +pub fn worker( + initial: T, + on_ready: impl FnOnce(mpsc::Sender) -> Message + 'static, + f: impl FnMut(T, &mut mpsc::Receiver) -> Fut + Send + Sync + 'static, +) -> Subscription +where + T: Clone + Hash + Send + 'static, + Fut: Future + Send + 'static, + Message: Send + 'static, + Input: Send + 'static, +{ + use futures::future; + use futures::stream::StreamExt; + + run(initial, move |initial| { + let (sender, receiver) = mpsc::channel(100); + + futures::stream::once(future::ready(on_ready(sender))).chain( + futures::stream::unfold( + (f, initial, receiver), + move |(mut f, state, mut receiver)| async { + let (message, state) = f(state, &mut receiver).await; + + Some((message, (f, state, receiver))) + }, + ), + ) + }) +} + struct Runner where F: FnOnce(T, EventStream) -> S, -- cgit