diff options
Diffstat (limited to 'runtime')
-rw-r--r-- | runtime/src/task.rs | 17 |
1 files changed, 12 insertions, 5 deletions
diff --git a/runtime/src/task.rs b/runtime/src/task.rs index 51cdf5a8..740360ac 100644 --- a/runtime/src/task.rs +++ b/runtime/src/task.rs @@ -208,18 +208,25 @@ impl<T> Task<T> { None => Task::done(Vec::new()), Some(stream) => Task(Some(boxed_stream( stream::unfold( - (stream, Vec::new()), - |(mut stream, mut outputs)| async move { - let action = stream.next().await?; + (stream, Some(Vec::new())), + move |(mut stream, outputs)| async move { + let mut outputs = outputs?; + + let Some(action) = stream.next().await else { + return Some(( + Some(Action::Output(outputs)), + (stream, None), + )); + }; match action.output() { Ok(output) => { outputs.push(output); - Some((None, (stream, outputs))) + Some((None, (stream, Some(outputs)))) } Err(action) => { - Some((Some(action), (stream, outputs))) + Some((Some(action), (stream, Some(outputs)))) } } }, |