diff options
Diffstat (limited to 'runtime/src')
| -rw-r--r-- | runtime/src/task.rs | 90 | 
1 files changed, 61 insertions, 29 deletions
| diff --git a/runtime/src/task.rs b/runtime/src/task.rs index db3c0674..51cdf5a8 100644 --- a/runtime/src/task.rs +++ b/runtime/src/task.rs @@ -47,6 +47,42 @@ impl<T> Task<T> {          Self(Some(boxed_stream(stream.map(Action::Output))))      } +    /// Creates a [`Task`] that runs the given [`Future`] to completion and maps its +    /// output with the given closure. +    pub fn perform<A>( +        future: impl Future<Output = A> + MaybeSend + 'static, +        f: impl Fn(A) -> T + MaybeSend + 'static, +    ) -> Self +    where +        T: MaybeSend + 'static, +        A: MaybeSend + 'static, +    { +        Self::future(future.map(f)) +    } + +    /// Creates a [`Task`] that runs the given [`Stream`] to completion and maps each +    /// item with the given closure. +    pub fn run<A>( +        stream: impl Stream<Item = A> + MaybeSend + 'static, +        f: impl Fn(A) -> T + MaybeSend + 'static, +    ) -> Self +    where +        T: 'static, +    { +        Self::stream(stream.map(f)) +    } + +    /// Combines the given tasks and produces a single [`Task`] that will run all of them +    /// in parallel. +    pub fn batch(tasks: impl IntoIterator<Item = Self>) -> Self +    where +        T: 'static, +    { +        Self(Some(boxed_stream(stream::select_all( +            tasks.into_iter().filter_map(|task| task.0), +        )))) +    } +      /// Creates a new [`Task`] that runs the given [`widget::Operation`] and produces      /// its output.      pub fn widget(operation: impl widget::Operation<T> + 'static) -> Task<T> @@ -163,38 +199,34 @@ impl<T> Task<T> {          }      } -    /// Creates a [`Task`] that runs the given [`Future`] to completion. -    pub fn perform<A>( -        future: impl Future<Output = A> + MaybeSend + 'static, -        f: impl Fn(A) -> T + MaybeSend + 'static, -    ) -> Self +    /// Creates a new [`Task`] that collects all the output of the current one into a [`Vec`]. +    pub fn collect(self) -> Task<Vec<T>>      where          T: MaybeSend + 'static, -        A: MaybeSend + 'static, -    { -        Self::future(future.map(f)) -    } - -    /// Creates a [`Task`] that runs the given [`Stream`] to completion. -    pub fn run<A>( -        stream: impl Stream<Item = A> + MaybeSend + 'static, -        f: impl Fn(A) -> T + 'static + MaybeSend, -    ) -> Self -    where -        T: 'static,      { -        Self::stream(stream.map(f)) -    } - -    /// Combines the given tasks and produces a single [`Task`] that will run all of them -    /// in parallel. -    pub fn batch(tasks: impl IntoIterator<Item = Self>) -> Self -    where -        T: 'static, -    { -        Self(Some(boxed_stream(stream::select_all( -            tasks.into_iter().filter_map(|task| task.0), -        )))) +        match self.0 { +            None => Task::done(Vec::new()), +            Some(stream) => Task(Some(boxed_stream( +                stream::unfold( +                    (stream, Vec::new()), +                    |(mut stream, mut outputs)| async move { +                        let action = stream.next().await?; + +                        match action.output() { +                            Ok(output) => { +                                outputs.push(output); + +                                Some((None, (stream, outputs))) +                            } +                            Err(action) => { +                                Some((Some(action), (stream, outputs))) +                            } +                        } +                    }, +                ) +                .filter_map(future::ready), +            ))), +        }      }      /// Returns the underlying [`Stream`] of the [`Task`]. | 
