summaryrefslogtreecommitdiffstats
path: root/runtime
diff options
context:
space:
mode:
authorLibravatar Héctor Ramón Jiménez <hector@hecrj.dev>2024-06-15 00:43:51 +0200
committerLibravatar Héctor Ramón Jiménez <hector@hecrj.dev>2024-06-15 00:43:51 +0200
commit43033c7f83b13838803bc82a7bbef654a8071892 (patch)
tree96ccfd75c23a2692296406b415b25b534a68311a /runtime
parent7f13fab0582fe681b8546126245512bdc2338ead (diff)
downloadiced-43033c7f83b13838803bc82a7bbef654a8071892.tar.gz
iced-43033c7f83b13838803bc82a7bbef654a8071892.tar.bz2
iced-43033c7f83b13838803bc82a7bbef654a8071892.zip
Implement `Task::collect`
Diffstat (limited to 'runtime')
-rw-r--r--runtime/src/task.rs90
1 files changed, 61 insertions, 29 deletions
diff --git a/runtime/src/task.rs b/runtime/src/task.rs
index db3c0674..51cdf5a8 100644
--- a/runtime/src/task.rs
+++ b/runtime/src/task.rs
@@ -47,6 +47,42 @@ impl<T> Task<T> {
Self(Some(boxed_stream(stream.map(Action::Output))))
}
+ /// Creates a [`Task`] that runs the given [`Future`] to completion and maps its
+ /// output with the given closure.
+ pub fn perform<A>(
+ future: impl Future<Output = A> + MaybeSend + 'static,
+ f: impl Fn(A) -> T + MaybeSend + 'static,
+ ) -> Self
+ where
+ T: MaybeSend + 'static,
+ A: MaybeSend + 'static,
+ {
+ Self::future(future.map(f))
+ }
+
+ /// Creates a [`Task`] that runs the given [`Stream`] to completion and maps each
+ /// item with the given closure.
+ pub fn run<A>(
+ stream: impl Stream<Item = A> + MaybeSend + 'static,
+ f: impl Fn(A) -> T + MaybeSend + 'static,
+ ) -> Self
+ where
+ T: 'static,
+ {
+ Self::stream(stream.map(f))
+ }
+
+ /// Combines the given tasks and produces a single [`Task`] that will run all of them
+ /// in parallel.
+ pub fn batch(tasks: impl IntoIterator<Item = Self>) -> Self
+ where
+ T: 'static,
+ {
+ Self(Some(boxed_stream(stream::select_all(
+ tasks.into_iter().filter_map(|task| task.0),
+ ))))
+ }
+
/// Creates a new [`Task`] that runs the given [`widget::Operation`] and produces
/// its output.
pub fn widget(operation: impl widget::Operation<T> + 'static) -> Task<T>
@@ -163,38 +199,34 @@ impl<T> Task<T> {
}
}
- /// Creates a [`Task`] that runs the given [`Future`] to completion.
- pub fn perform<A>(
- future: impl Future<Output = A> + MaybeSend + 'static,
- f: impl Fn(A) -> T + MaybeSend + 'static,
- ) -> Self
+ /// Creates a new [`Task`] that collects all the output of the current one into a [`Vec`].
+ pub fn collect(self) -> Task<Vec<T>>
where
T: MaybeSend + 'static,
- A: MaybeSend + 'static,
- {
- Self::future(future.map(f))
- }
-
- /// Creates a [`Task`] that runs the given [`Stream`] to completion.
- pub fn run<A>(
- stream: impl Stream<Item = A> + MaybeSend + 'static,
- f: impl Fn(A) -> T + 'static + MaybeSend,
- ) -> Self
- where
- T: 'static,
{
- Self::stream(stream.map(f))
- }
-
- /// Combines the given tasks and produces a single [`Task`] that will run all of them
- /// in parallel.
- pub fn batch(tasks: impl IntoIterator<Item = Self>) -> Self
- where
- T: 'static,
- {
- Self(Some(boxed_stream(stream::select_all(
- tasks.into_iter().filter_map(|task| task.0),
- ))))
+ match self.0 {
+ None => Task::done(Vec::new()),
+ Some(stream) => Task(Some(boxed_stream(
+ stream::unfold(
+ (stream, Vec::new()),
+ |(mut stream, mut outputs)| async move {
+ let action = stream.next().await?;
+
+ match action.output() {
+ Ok(output) => {
+ outputs.push(output);
+
+ Some((None, (stream, outputs)))
+ }
+ Err(action) => {
+ Some((Some(action), (stream, outputs)))
+ }
+ }
+ },
+ )
+ .filter_map(future::ready),
+ ))),
+ }
}
/// Returns the underlying [`Stream`] of the [`Task`].