diff options
Diffstat (limited to 'runtime/src/task.rs')
-rw-r--r-- | runtime/src/task.rs | 163 |
1 files changed, 82 insertions, 81 deletions
diff --git a/runtime/src/task.rs b/runtime/src/task.rs index b8a83d6d..e037c403 100644 --- a/runtime/src/task.rs +++ b/runtime/src/task.rs @@ -1,3 +1,4 @@ +//! Create runtime tasks. use crate::core::widget; use crate::futures::futures::channel::mpsc; use crate::futures::futures::channel::oneshot; @@ -29,24 +30,6 @@ impl<T> Task<T> { Self::future(future::ready(value)) } - /// Creates a new [`Task`] that runs the given [`Future`] and produces - /// its output. - pub fn future(future: impl Future<Output = T> + MaybeSend + 'static) -> Self - where - T: 'static, - { - Self::stream(stream::once(future)) - } - - /// Creates a new [`Task`] that runs the given [`Stream`] and produces - /// each of its items. - pub fn stream(stream: impl Stream<Item = T> + MaybeSend + 'static) -> Self - where - T: 'static, - { - Self(Some(boxed_stream(stream.map(Action::Output)))) - } - /// Creates a [`Task`] that runs the given [`Future`] to completion and maps its /// output with the given closure. pub fn perform<A>( @@ -72,75 +55,33 @@ impl<T> Task<T> { Self::stream(stream.map(f)) } - /// Combines the given tasks and produces a single [`Task`] that will run all of them - /// in parallel. - pub fn batch(tasks: impl IntoIterator<Item = Self>) -> Self - where - T: 'static, - { - Self(Some(boxed_stream(stream::select_all( - tasks.into_iter().filter_map(|task| task.0), - )))) - } - - /// Creates a new [`Task`] that runs the given [`widget::Operation`] and produces + /// Creates a new [`Task`] that runs the given [`Future`] and produces /// its output. - pub fn widget(operation: impl widget::Operation<T> + 'static) -> Task<T> + pub fn future(future: impl Future<Output = T> + MaybeSend + 'static) -> Self where - T: Send + 'static, + T: 'static, { - Self::channel(move |sender| { - let operation = - widget::operation::map(Box::new(operation), move |value| { - let _ = sender.clone().try_send(value); - }); - - Action::Widget(Box::new(operation)) - }) + Self::stream(stream::once(future)) } - /// Creates a new [`Task`] that executes the [`Action`] returned by the closure and - /// produces the value fed to the [`oneshot::Sender`]. - pub fn oneshot(f: impl FnOnce(oneshot::Sender<T>) -> Action<T>) -> Task<T> + /// Creates a new [`Task`] that runs the given [`Stream`] and produces + /// each of its items. + pub fn stream(stream: impl Stream<Item = T> + MaybeSend + 'static) -> Self where - T: MaybeSend + 'static, + T: 'static, { - let (sender, receiver) = oneshot::channel(); - - let action = f(sender); - - Self(Some(boxed_stream( - stream::once(async move { action }).chain( - receiver.into_stream().filter_map(|result| async move { - Some(Action::Output(result.ok()?)) - }), - ), - ))) + Self(Some(boxed_stream(stream.map(Action::Output)))) } - /// Creates a new [`Task`] that executes the [`Action`] returned by the closure and - /// produces the values fed to the [`mpsc::Sender`]. - pub fn channel(f: impl FnOnce(mpsc::Sender<T>) -> Action<T>) -> Task<T> + /// Combines the given tasks and produces a single [`Task`] that will run all of them + /// in parallel. + pub fn batch(tasks: impl IntoIterator<Item = Self>) -> Self where - T: MaybeSend + 'static, + T: 'static, { - let (sender, receiver) = mpsc::channel(1); - - let action = f(sender); - - Self(Some(boxed_stream( - stream::once(async move { action }) - .chain(receiver.map(|result| Action::Output(result))), - ))) - } - - /// Creates a new [`Task`] that executes the given [`Action`] and produces no output. - pub fn effect(action: impl Into<Action<Never>>) -> Self { - let action = action.into(); - - Self(Some(boxed_stream(stream::once(async move { - action.output().expect_err("no output") - })))) + Self(Some(boxed_stream(stream::select_all( + tasks.into_iter().filter_map(|task| task.0), + )))) } /// Maps the output of a [`Task`] with the given closure. @@ -235,11 +176,6 @@ impl<T> Task<T> { ))), } } - - /// Returns the underlying [`Stream`] of the [`Task`]. - pub fn into_stream(self) -> Option<BoxStream<Action<T>>> { - self.0 - } } impl<T> Task<Option<T>> { @@ -283,3 +219,68 @@ where Self::none() } } + +/// Creates a new [`Task`] that runs the given [`widget::Operation`] and produces +/// its output. +pub fn widget<T>(operation: impl widget::Operation<T> + 'static) -> Task<T> +where + T: Send + 'static, +{ + channel(move |sender| { + let operation = + widget::operation::map(Box::new(operation), move |value| { + let _ = sender.clone().try_send(value); + }); + + Action::Widget(Box::new(operation)) + }) +} + +/// Creates a new [`Task`] that executes the [`Action`] returned by the closure and +/// produces the value fed to the [`oneshot::Sender`]. +pub fn oneshot<T>(f: impl FnOnce(oneshot::Sender<T>) -> Action<T>) -> Task<T> +where + T: MaybeSend + 'static, +{ + let (sender, receiver) = oneshot::channel(); + + let action = f(sender); + + Task(Some(boxed_stream( + stream::once(async move { action }).chain( + receiver.into_stream().filter_map(|result| async move { + Some(Action::Output(result.ok()?)) + }), + ), + ))) +} + +/// Creates a new [`Task`] that executes the [`Action`] returned by the closure and +/// produces the values fed to the [`mpsc::Sender`]. +pub fn channel<T>(f: impl FnOnce(mpsc::Sender<T>) -> Action<T>) -> Task<T> +where + T: MaybeSend + 'static, +{ + let (sender, receiver) = mpsc::channel(1); + + let action = f(sender); + + Task(Some(boxed_stream( + stream::once(async move { action }) + .chain(receiver.map(|result| Action::Output(result))), + ))) +} + +/// Creates a new [`Task`] that executes the given [`Action`] and produces no output. +pub fn effect<T>(action: impl Into<Action<Never>>) -> Task<T> { + let action = action.into(); + + Task(Some(boxed_stream(stream::once(async move { + action.output().expect_err("no output") + })))) +} + +/// Returns the underlying [`Stream`] of the [`Task`]. +pub fn into_stream<T>(task: Task<T>) -> Option<BoxStream<Action<T>>> { + task.0 +} |