From 43033c7f83b13838803bc82a7bbef654a8071892 Mon Sep 17 00:00:00 2001 From: Héctor Ramón Jiménez Date: Sat, 15 Jun 2024 00:43:51 +0200 Subject: Implement `Task::collect` --- runtime/src/task.rs | 90 ++++++++++++++++++++++++++++++++++++----------------- 1 file changed, 61 insertions(+), 29 deletions(-) (limited to 'runtime') diff --git a/runtime/src/task.rs b/runtime/src/task.rs index db3c0674..51cdf5a8 100644 --- a/runtime/src/task.rs +++ b/runtime/src/task.rs @@ -47,6 +47,42 @@ impl Task { Self(Some(boxed_stream(stream.map(Action::Output)))) } + /// Creates a [`Task`] that runs the given [`Future`] to completion and maps its + /// output with the given closure. + pub fn perform( + future: impl Future + MaybeSend + 'static, + f: impl Fn(A) -> T + MaybeSend + 'static, + ) -> Self + where + T: MaybeSend + 'static, + A: MaybeSend + 'static, + { + Self::future(future.map(f)) + } + + /// Creates a [`Task`] that runs the given [`Stream`] to completion and maps each + /// item with the given closure. + pub fn run( + stream: impl Stream + MaybeSend + 'static, + f: impl Fn(A) -> T + MaybeSend + 'static, + ) -> Self + where + T: 'static, + { + Self::stream(stream.map(f)) + } + + /// Combines the given tasks and produces a single [`Task`] that will run all of them + /// in parallel. + pub fn batch(tasks: impl IntoIterator) -> Self + where + T: 'static, + { + Self(Some(boxed_stream(stream::select_all( + tasks.into_iter().filter_map(|task| task.0), + )))) + } + /// Creates a new [`Task`] that runs the given [`widget::Operation`] and produces /// its output. pub fn widget(operation: impl widget::Operation + 'static) -> Task @@ -163,38 +199,34 @@ impl Task { } } - /// Creates a [`Task`] that runs the given [`Future`] to completion. - pub fn perform( - future: impl Future + MaybeSend + 'static, - f: impl Fn(A) -> T + MaybeSend + 'static, - ) -> Self + /// Creates a new [`Task`] that collects all the output of the current one into a [`Vec`]. + pub fn collect(self) -> Task> where T: MaybeSend + 'static, - A: MaybeSend + 'static, - { - Self::future(future.map(f)) - } - - /// Creates a [`Task`] that runs the given [`Stream`] to completion. - pub fn run( - stream: impl Stream + MaybeSend + 'static, - f: impl Fn(A) -> T + 'static + MaybeSend, - ) -> Self - where - T: 'static, { - Self::stream(stream.map(f)) - } - - /// Combines the given tasks and produces a single [`Task`] that will run all of them - /// in parallel. - pub fn batch(tasks: impl IntoIterator) -> Self - where - T: 'static, - { - Self(Some(boxed_stream(stream::select_all( - tasks.into_iter().filter_map(|task| task.0), - )))) + match self.0 { + None => Task::done(Vec::new()), + Some(stream) => Task(Some(boxed_stream( + stream::unfold( + (stream, Vec::new()), + |(mut stream, mut outputs)| async move { + let action = stream.next().await?; + + match action.output() { + Ok(output) => { + outputs.push(output); + + Some((None, (stream, outputs))) + } + Err(action) => { + Some((Some(action), (stream, outputs))) + } + } + }, + ) + .filter_map(future::ready), + ))), + } } /// Returns the underlying [`Stream`] of the [`Task`]. -- cgit