diff options
Diffstat (limited to 'runtime/src/task.rs')
-rw-r--r-- | runtime/src/task.rs | 200 |
1 files changed, 119 insertions, 81 deletions
diff --git a/runtime/src/task.rs b/runtime/src/task.rs index b8a83d6d..b75aca89 100644 --- a/runtime/src/task.rs +++ b/runtime/src/task.rs @@ -1,3 +1,4 @@ +//! Create runtime tasks. use crate::core::widget; use crate::futures::futures::channel::mpsc; use crate::futures::futures::channel::oneshot; @@ -29,24 +30,6 @@ impl<T> Task<T> { Self::future(future::ready(value)) } - /// Creates a new [`Task`] that runs the given [`Future`] and produces - /// its output. - pub fn future(future: impl Future<Output = T> + MaybeSend + 'static) -> Self - where - T: 'static, - { - Self::stream(stream::once(future)) - } - - /// Creates a new [`Task`] that runs the given [`Stream`] and produces - /// each of its items. - pub fn stream(stream: impl Stream<Item = T> + MaybeSend + 'static) -> Self - where - T: 'static, - { - Self(Some(boxed_stream(stream.map(Action::Output)))) - } - /// Creates a [`Task`] that runs the given [`Future`] to completion and maps its /// output with the given closure. pub fn perform<A>( @@ -83,66 +66,6 @@ impl<T> Task<T> { )))) } - /// Creates a new [`Task`] that runs the given [`widget::Operation`] and produces - /// its output. - pub fn widget(operation: impl widget::Operation<T> + 'static) -> Task<T> - where - T: Send + 'static, - { - Self::channel(move |sender| { - let operation = - widget::operation::map(Box::new(operation), move |value| { - let _ = sender.clone().try_send(value); - }); - - Action::Widget(Box::new(operation)) - }) - } - - /// Creates a new [`Task`] that executes the [`Action`] returned by the closure and - /// produces the value fed to the [`oneshot::Sender`]. - pub fn oneshot(f: impl FnOnce(oneshot::Sender<T>) -> Action<T>) -> Task<T> - where - T: MaybeSend + 'static, - { - let (sender, receiver) = oneshot::channel(); - - let action = f(sender); - - Self(Some(boxed_stream( - stream::once(async move { action }).chain( - receiver.into_stream().filter_map(|result| async move { - Some(Action::Output(result.ok()?)) - }), - ), - ))) - } - - /// Creates a new [`Task`] that executes the [`Action`] returned by the closure and - /// produces the values fed to the [`mpsc::Sender`]. - pub fn channel(f: impl FnOnce(mpsc::Sender<T>) -> Action<T>) -> Task<T> - where - T: MaybeSend + 'static, - { - let (sender, receiver) = mpsc::channel(1); - - let action = f(sender); - - Self(Some(boxed_stream( - stream::once(async move { action }) - .chain(receiver.map(|result| Action::Output(result))), - ))) - } - - /// Creates a new [`Task`] that executes the given [`Action`] and produces no output. - pub fn effect(action: impl Into<Action<Never>>) -> Self { - let action = action.into(); - - Self(Some(boxed_stream(stream::once(async move { - action.output().expect_err("no output") - })))) - } - /// Maps the output of a [`Task`] with the given closure. pub fn map<O>( self, @@ -236,9 +159,59 @@ impl<T> Task<T> { } } - /// Returns the underlying [`Stream`] of the [`Task`]. - pub fn into_stream(self) -> Option<BoxStream<Action<T>>> { - self.0 + /// Creates a new [`Task`] that can be aborted with the returned [`Handle`]. + pub fn abortable(self) -> (Self, Handle) + where + T: 'static, + { + match self.0 { + Some(stream) => { + let (stream, handle) = stream::abortable(stream); + + (Self(Some(boxed_stream(stream))), Handle(Some(handle))) + } + None => (Self(None), Handle(None)), + } + } + + /// Creates a new [`Task`] that runs the given [`Future`] and produces + /// its output. + pub fn future(future: impl Future<Output = T> + MaybeSend + 'static) -> Self + where + T: 'static, + { + Self::stream(stream::once(future)) + } + + /// Creates a new [`Task`] that runs the given [`Stream`] and produces + /// each of its items. + pub fn stream(stream: impl Stream<Item = T> + MaybeSend + 'static) -> Self + where + T: 'static, + { + Self(Some(boxed_stream(stream.map(Action::Output)))) + } +} + +/// A handle to a [`Task`] that can be used for aborting it. +#[derive(Debug, Clone)] +pub struct Handle(Option<stream::AbortHandle>); + +impl Handle { + /// Aborts the [`Task`] of this [`Handle`]. + pub fn abort(&self) { + if let Some(handle) = &self.0 { + handle.abort(); + } + } + + /// Returns `true` if the [`Task`] of this [`Handle`] has been aborted. + pub fn is_aborted(&self) -> bool { + if let Some(handle) = &self.0 { + handle.is_aborted() + } else { + true + } } } @@ -283,3 +256,68 @@ where Self::none() } } + +/// Creates a new [`Task`] that runs the given [`widget::Operation`] and produces +/// its output. +pub fn widget<T>(operation: impl widget::Operation<T> + 'static) -> Task<T> +where + T: Send + 'static, +{ + channel(move |sender| { + let operation = + widget::operation::map(Box::new(operation), move |value| { + let _ = sender.clone().try_send(value); + }); + + Action::Widget(Box::new(operation)) + }) +} + +/// Creates a new [`Task`] that executes the [`Action`] returned by the closure and +/// produces the value fed to the [`oneshot::Sender`]. +pub fn oneshot<T>(f: impl FnOnce(oneshot::Sender<T>) -> Action<T>) -> Task<T> +where + T: MaybeSend + 'static, +{ + let (sender, receiver) = oneshot::channel(); + + let action = f(sender); + + Task(Some(boxed_stream( + stream::once(async move { action }).chain( + receiver.into_stream().filter_map(|result| async move { + Some(Action::Output(result.ok()?)) + }), + ), + ))) +} + +/// Creates a new [`Task`] that executes the [`Action`] returned by the closure and +/// produces the values fed to the [`mpsc::Sender`]. +pub fn channel<T>(f: impl FnOnce(mpsc::Sender<T>) -> Action<T>) -> Task<T> +where + T: MaybeSend + 'static, +{ + let (sender, receiver) = mpsc::channel(1); + + let action = f(sender); + + Task(Some(boxed_stream( + stream::once(async move { action }) + .chain(receiver.map(|result| Action::Output(result))), + ))) +} + +/// Creates a new [`Task`] that executes the given [`Action`] and produces no output. +pub fn effect<T>(action: impl Into<Action<Never>>) -> Task<T> { + let action = action.into(); + + Task(Some(boxed_stream(stream::once(async move { + action.output().expect_err("no output") + })))) +} + +/// Returns the underlying [`Stream`] of the [`Task`]. +pub fn into_stream<T>(task: Task<T>) -> Option<BoxStream<Action<T>>> { + task.0 +} |