From a25b1af45690bdd8e1cbb20ee3a5b1c4342de455 Mon Sep 17 00:00:00 2001 From: Héctor Ramón Jiménez Date: Fri, 14 Jun 2024 01:47:39 +0200 Subject: Replace `Command` with a new `Task` API with chain support --- runtime/src/task.rs | 214 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 214 insertions(+) create mode 100644 runtime/src/task.rs (limited to 'runtime/src/task.rs') diff --git a/runtime/src/task.rs b/runtime/src/task.rs new file mode 100644 index 00000000..ac28a4e7 --- /dev/null +++ b/runtime/src/task.rs @@ -0,0 +1,214 @@ +use crate::core::widget; +use crate::core::MaybeSend; +use crate::futures::futures::channel::mpsc; +use crate::futures::futures::channel::oneshot; +use crate::futures::futures::future::{self, FutureExt}; +use crate::futures::futures::never::Never; +use crate::futures::futures::stream::{self, Stream, StreamExt}; +use crate::futures::{boxed_stream, BoxStream}; +use crate::Action; + +use std::future::Future; + +/// A set of concurrent actions to be performed by the iced runtime. +/// +/// A [`Task`] _may_ produce a bunch of values of type `T`. +#[allow(missing_debug_implementations)] +pub struct Task(Option>>); + +impl Task { + /// Creates a [`Task`] that does nothing. + pub fn none() -> Self { + Self(None) + } + + /// Creates a new [`Task`] that instantly produces the given value. + pub fn done(value: T) -> Self + where + T: MaybeSend + 'static, + { + Self::future(future::ready(value)) + } + + /// Creates a new [`Task`] that runs the given [`Future`] and produces + /// its output. + pub fn future(future: impl Future + MaybeSend + 'static) -> Self + where + T: 'static, + { + Self::stream(stream::once(future)) + } + + /// Creates a new [`Task`] that runs the given [`Stream`] and produces + /// each of its items. + pub fn stream(stream: impl Stream + MaybeSend + 'static) -> Self + where + T: 'static, + { + Self(Some(boxed_stream(stream.map(Action::Output)))) + } + + /// Creates a new [`Task`] that runs the given [`widget::Operation`] and produces + /// its output. + pub fn widget(operation: impl widget::Operation + 'static) -> Task + where + T: MaybeSend + 'static, + { + Self::channel(move |sender| { + let operation = + widget::operation::map(Box::new(operation), move |value| { + let _ = sender.clone().try_send(value); + }); + + Action::Widget(Box::new(operation)) + }) + } + + /// Creates a new [`Task`] that executes the [`Action`] returned by the closure and + /// produces the value fed to the [`oneshot::Sender`]. + pub fn oneshot(f: impl FnOnce(oneshot::Sender) -> Action) -> Task + where + T: MaybeSend + 'static, + { + let (sender, receiver) = oneshot::channel(); + + let action = f(sender); + + Self(Some(boxed_stream( + stream::once(async move { action }).chain( + receiver.into_stream().filter_map(|result| async move { + Some(Action::Output(result.ok()?)) + }), + ), + ))) + } + + /// Creates a new [`Task`] that executes the [`Action`] returned by the closure and + /// produces the values fed to the [`mpsc::Sender`]. + pub fn channel(f: impl FnOnce(mpsc::Sender) -> Action) -> Task + where + T: MaybeSend + 'static, + { + let (sender, receiver) = mpsc::channel(1); + + let action = f(sender); + + Self(Some(boxed_stream( + stream::once(async move { action }) + .chain(receiver.map(|result| Action::Output(result))), + ))) + } + + /// Creates a new [`Task`] that executes the given [`Action`] and produces no output. + pub fn effect(action: impl Into>) -> Self { + let action = action.into(); + + Self(Some(boxed_stream(stream::once(async move { + action.output().expect_err("no output") + })))) + } + + /// Maps the output of a [`Task`] with the given closure. + pub fn map( + self, + mut f: impl FnMut(T) -> O + MaybeSend + 'static, + ) -> Task + where + T: MaybeSend + 'static, + O: MaybeSend + 'static, + { + self.then(move |output| Task::done(f(output))) + } + + /// Performs a new [`Task`] for every output of the current [`Task`] using the + /// given closure. + /// + /// This is the monadic interface of [`Task`]—analogous to [`Future`] and + /// [`Stream`]. + pub fn then( + self, + mut f: impl FnMut(T) -> Task + MaybeSend + 'static, + ) -> Task + where + T: MaybeSend + 'static, + O: MaybeSend + 'static, + { + Task(match self.0 { + None => None, + Some(stream) => { + Some(boxed_stream(stream.flat_map(move |action| { + match action.output() { + Ok(output) => f(output) + .0 + .unwrap_or_else(|| boxed_stream(stream::empty())), + Err(action) => { + boxed_stream(stream::once(async move { action })) + } + } + }))) + } + }) + } + + /// Chains a new [`Task`] to be performed once the current one finishes completely. + pub fn chain(self, task: Self) -> Self + where + T: 'static, + { + match self.0 { + None => task, + Some(first) => match task.0 { + None => Task::none(), + Some(second) => Task(Some(boxed_stream(first.chain(second)))), + }, + } + } + + /// Creates a [`Task`] that runs the given [`Future`] to completion. + pub fn perform( + future: impl Future + MaybeSend + 'static, + f: impl Fn(A) -> T + MaybeSend + 'static, + ) -> Self + where + T: MaybeSend + 'static, + A: MaybeSend + 'static, + { + Self::future(future.map(f)) + } + + /// Creates a [`Task`] that runs the given [`Stream`] to completion. + pub fn run( + stream: impl Stream + MaybeSend + 'static, + f: impl Fn(A) -> T + 'static + MaybeSend, + ) -> Self + where + T: 'static, + { + Self::stream(stream.map(f)) + } + + /// Combines the given tasks and produces a single [`Task`] that will run all of them + /// in parallel. + pub fn batch(tasks: impl IntoIterator) -> Self + where + T: 'static, + { + Self(Some(boxed_stream(stream::select_all( + tasks.into_iter().filter_map(|task| task.0), + )))) + } + + /// Returns the underlying [`Stream`] of the [`Task`]. + pub fn into_stream(self) -> Option>> { + self.0 + } +} + +impl From<()> for Task +where + T: MaybeSend + 'static, +{ + fn from(_value: ()) -> Self { + Self::none() + } +} -- cgit From b328da2c71e998e539bdc65815061e88dd1e7081 Mon Sep 17 00:00:00 2001 From: Héctor Ramón Jiménez Date: Fri, 14 Jun 2024 01:52:30 +0200 Subject: Fix `Send` requirements for Wasm targets --- runtime/src/task.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'runtime/src/task.rs') diff --git a/runtime/src/task.rs b/runtime/src/task.rs index ac28a4e7..f3ddbca1 100644 --- a/runtime/src/task.rs +++ b/runtime/src/task.rs @@ -52,7 +52,7 @@ impl Task { /// its output. pub fn widget(operation: impl widget::Operation + 'static) -> Task where - T: MaybeSend + 'static, + T: Send + 'static, { Self::channel(move |sender| { let operation = -- cgit From 4e7cbbf98ab745351e2fb13a7c85d4ad560c21ee Mon Sep 17 00:00:00 2001 From: Héctor Ramón Jiménez Date: Fri, 14 Jun 2024 01:57:49 +0200 Subject: Move `Maybe*` traits back to `iced_futures` --- runtime/src/task.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) (limited to 'runtime/src/task.rs') diff --git a/runtime/src/task.rs b/runtime/src/task.rs index f3ddbca1..db3c0674 100644 --- a/runtime/src/task.rs +++ b/runtime/src/task.rs @@ -1,11 +1,10 @@ use crate::core::widget; -use crate::core::MaybeSend; use crate::futures::futures::channel::mpsc; use crate::futures::futures::channel::oneshot; use crate::futures::futures::future::{self, FutureExt}; use crate::futures::futures::never::Never; use crate::futures::futures::stream::{self, Stream, StreamExt}; -use crate::futures::{boxed_stream, BoxStream}; +use crate::futures::{boxed_stream, BoxStream, MaybeSend}; use crate::Action; use std::future::Future; -- cgit From 43033c7f83b13838803bc82a7bbef654a8071892 Mon Sep 17 00:00:00 2001 From: Héctor Ramón Jiménez Date: Sat, 15 Jun 2024 00:43:51 +0200 Subject: Implement `Task::collect` --- runtime/src/task.rs | 90 ++++++++++++++++++++++++++++++++++++----------------- 1 file changed, 61 insertions(+), 29 deletions(-) (limited to 'runtime/src/task.rs') diff --git a/runtime/src/task.rs b/runtime/src/task.rs index db3c0674..51cdf5a8 100644 --- a/runtime/src/task.rs +++ b/runtime/src/task.rs @@ -47,6 +47,42 @@ impl Task { Self(Some(boxed_stream(stream.map(Action::Output)))) } + /// Creates a [`Task`] that runs the given [`Future`] to completion and maps its + /// output with the given closure. + pub fn perform( + future: impl Future + MaybeSend + 'static, + f: impl Fn(A) -> T + MaybeSend + 'static, + ) -> Self + where + T: MaybeSend + 'static, + A: MaybeSend + 'static, + { + Self::future(future.map(f)) + } + + /// Creates a [`Task`] that runs the given [`Stream`] to completion and maps each + /// item with the given closure. + pub fn run( + stream: impl Stream + MaybeSend + 'static, + f: impl Fn(A) -> T + MaybeSend + 'static, + ) -> Self + where + T: 'static, + { + Self::stream(stream.map(f)) + } + + /// Combines the given tasks and produces a single [`Task`] that will run all of them + /// in parallel. + pub fn batch(tasks: impl IntoIterator) -> Self + where + T: 'static, + { + Self(Some(boxed_stream(stream::select_all( + tasks.into_iter().filter_map(|task| task.0), + )))) + } + /// Creates a new [`Task`] that runs the given [`widget::Operation`] and produces /// its output. pub fn widget(operation: impl widget::Operation + 'static) -> Task @@ -163,38 +199,34 @@ impl Task { } } - /// Creates a [`Task`] that runs the given [`Future`] to completion. - pub fn perform( - future: impl Future + MaybeSend + 'static, - f: impl Fn(A) -> T + MaybeSend + 'static, - ) -> Self + /// Creates a new [`Task`] that collects all the output of the current one into a [`Vec`]. + pub fn collect(self) -> Task> where T: MaybeSend + 'static, - A: MaybeSend + 'static, - { - Self::future(future.map(f)) - } - - /// Creates a [`Task`] that runs the given [`Stream`] to completion. - pub fn run( - stream: impl Stream + MaybeSend + 'static, - f: impl Fn(A) -> T + 'static + MaybeSend, - ) -> Self - where - T: 'static, { - Self::stream(stream.map(f)) - } - - /// Combines the given tasks and produces a single [`Task`] that will run all of them - /// in parallel. - pub fn batch(tasks: impl IntoIterator) -> Self - where - T: 'static, - { - Self(Some(boxed_stream(stream::select_all( - tasks.into_iter().filter_map(|task| task.0), - )))) + match self.0 { + None => Task::done(Vec::new()), + Some(stream) => Task(Some(boxed_stream( + stream::unfold( + (stream, Vec::new()), + |(mut stream, mut outputs)| async move { + let action = stream.next().await?; + + match action.output() { + Ok(output) => { + outputs.push(output); + + Some((None, (stream, outputs))) + } + Err(action) => { + Some((Some(action), (stream, outputs))) + } + } + }, + ) + .filter_map(future::ready), + ))), + } } /// Returns the underlying [`Stream`] of the [`Task`]. -- cgit From ad2e4c535af01453777e330aa828db6988f9c4de Mon Sep 17 00:00:00 2001 From: Héctor Ramón Jiménez Date: Sat, 15 Jun 2024 01:16:04 +0200 Subject: Fix `Task::collect` not producing the collected outputs --- runtime/src/task.rs | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) (limited to 'runtime/src/task.rs') diff --git a/runtime/src/task.rs b/runtime/src/task.rs index 51cdf5a8..740360ac 100644 --- a/runtime/src/task.rs +++ b/runtime/src/task.rs @@ -208,18 +208,25 @@ impl Task { None => Task::done(Vec::new()), Some(stream) => Task(Some(boxed_stream( stream::unfold( - (stream, Vec::new()), - |(mut stream, mut outputs)| async move { - let action = stream.next().await?; + (stream, Some(Vec::new())), + move |(mut stream, outputs)| async move { + let mut outputs = outputs?; + + let Some(action) = stream.next().await else { + return Some(( + Some(Action::Output(outputs)), + (stream, None), + )); + }; match action.output() { Ok(output) => { outputs.push(output); - Some((None, (stream, outputs))) + Some((None, (stream, Some(outputs)))) } Err(action) => { - Some((Some(action), (stream, outputs))) + Some((Some(action), (stream, Some(outputs)))) } } }, -- cgit