diff options
author | 2024-06-15 00:43:51 +0200 | |
---|---|---|
committer | 2024-06-15 00:43:51 +0200 | |
commit | 43033c7f83b13838803bc82a7bbef654a8071892 (patch) | |
tree | 96ccfd75c23a2692296406b415b25b534a68311a /runtime | |
parent | 7f13fab0582fe681b8546126245512bdc2338ead (diff) | |
download | iced-43033c7f83b13838803bc82a7bbef654a8071892.tar.gz iced-43033c7f83b13838803bc82a7bbef654a8071892.tar.bz2 iced-43033c7f83b13838803bc82a7bbef654a8071892.zip |
Implement `Task::collect`
Diffstat (limited to 'runtime')
-rw-r--r-- | runtime/src/task.rs | 90 |
1 files changed, 61 insertions, 29 deletions
diff --git a/runtime/src/task.rs b/runtime/src/task.rs index db3c0674..51cdf5a8 100644 --- a/runtime/src/task.rs +++ b/runtime/src/task.rs @@ -47,6 +47,42 @@ impl<T> Task<T> { Self(Some(boxed_stream(stream.map(Action::Output)))) } + /// Creates a [`Task`] that runs the given [`Future`] to completion and maps its + /// output with the given closure. + pub fn perform<A>( + future: impl Future<Output = A> + MaybeSend + 'static, + f: impl Fn(A) -> T + MaybeSend + 'static, + ) -> Self + where + T: MaybeSend + 'static, + A: MaybeSend + 'static, + { + Self::future(future.map(f)) + } + + /// Creates a [`Task`] that runs the given [`Stream`] to completion and maps each + /// item with the given closure. + pub fn run<A>( + stream: impl Stream<Item = A> + MaybeSend + 'static, + f: impl Fn(A) -> T + MaybeSend + 'static, + ) -> Self + where + T: 'static, + { + Self::stream(stream.map(f)) + } + + /// Combines the given tasks and produces a single [`Task`] that will run all of them + /// in parallel. + pub fn batch(tasks: impl IntoIterator<Item = Self>) -> Self + where + T: 'static, + { + Self(Some(boxed_stream(stream::select_all( + tasks.into_iter().filter_map(|task| task.0), + )))) + } + /// Creates a new [`Task`] that runs the given [`widget::Operation`] and produces /// its output. pub fn widget(operation: impl widget::Operation<T> + 'static) -> Task<T> @@ -163,38 +199,34 @@ impl<T> Task<T> { } } - /// Creates a [`Task`] that runs the given [`Future`] to completion. - pub fn perform<A>( - future: impl Future<Output = A> + MaybeSend + 'static, - f: impl Fn(A) -> T + MaybeSend + 'static, - ) -> Self + /// Creates a new [`Task`] that collects all the output of the current one into a [`Vec`]. + pub fn collect(self) -> Task<Vec<T>> where T: MaybeSend + 'static, - A: MaybeSend + 'static, - { - Self::future(future.map(f)) - } - - /// Creates a [`Task`] that runs the given [`Stream`] to completion. - pub fn run<A>( - stream: impl Stream<Item = A> + MaybeSend + 'static, - f: impl Fn(A) -> T + 'static + MaybeSend, - ) -> Self - where - T: 'static, { - Self::stream(stream.map(f)) - } - - /// Combines the given tasks and produces a single [`Task`] that will run all of them - /// in parallel. - pub fn batch(tasks: impl IntoIterator<Item = Self>) -> Self - where - T: 'static, - { - Self(Some(boxed_stream(stream::select_all( - tasks.into_iter().filter_map(|task| task.0), - )))) + match self.0 { + None => Task::done(Vec::new()), + Some(stream) => Task(Some(boxed_stream( + stream::unfold( + (stream, Vec::new()), + |(mut stream, mut outputs)| async move { + let action = stream.next().await?; + + match action.output() { + Ok(output) => { + outputs.push(output); + + Some((None, (stream, outputs))) + } + Err(action) => { + Some((Some(action), (stream, outputs))) + } + } + }, + ) + .filter_map(future::ready), + ))), + } } /// Returns the underlying [`Stream`] of the [`Task`]. |