diff options
Diffstat (limited to 'runtime/src/task.rs')
-rw-r--r-- | runtime/src/task.rs | 251 |
1 files changed, 166 insertions, 85 deletions
diff --git a/runtime/src/task.rs b/runtime/src/task.rs index b8a83d6d..ec8d7cc7 100644 --- a/runtime/src/task.rs +++ b/runtime/src/task.rs @@ -1,3 +1,4 @@ +//! Create runtime tasks. use crate::core::widget; use crate::futures::futures::channel::mpsc; use crate::futures::futures::channel::oneshot; @@ -29,24 +30,6 @@ impl<T> Task<T> { Self::future(future::ready(value)) } - /// Creates a new [`Task`] that runs the given [`Future`] and produces - /// its output. - pub fn future(future: impl Future<Output = T> + MaybeSend + 'static) -> Self - where - T: 'static, - { - Self::stream(stream::once(future)) - } - - /// Creates a new [`Task`] that runs the given [`Stream`] and produces - /// each of its items. - pub fn stream(stream: impl Stream<Item = T> + MaybeSend + 'static) -> Self - where - T: 'static, - { - Self(Some(boxed_stream(stream.map(Action::Output)))) - } - /// Creates a [`Task`] that runs the given [`Future`] to completion and maps its /// output with the given closure. pub fn perform<A>( @@ -83,66 +66,6 @@ impl<T> Task<T> { )))) } - /// Creates a new [`Task`] that runs the given [`widget::Operation`] and produces - /// its output. - pub fn widget(operation: impl widget::Operation<T> + 'static) -> Task<T> - where - T: Send + 'static, - { - Self::channel(move |sender| { - let operation = - widget::operation::map(Box::new(operation), move |value| { - let _ = sender.clone().try_send(value); - }); - - Action::Widget(Box::new(operation)) - }) - } - - /// Creates a new [`Task`] that executes the [`Action`] returned by the closure and - /// produces the value fed to the [`oneshot::Sender`]. - pub fn oneshot(f: impl FnOnce(oneshot::Sender<T>) -> Action<T>) -> Task<T> - where - T: MaybeSend + 'static, - { - let (sender, receiver) = oneshot::channel(); - - let action = f(sender); - - Self(Some(boxed_stream( - stream::once(async move { action }).chain( - receiver.into_stream().filter_map(|result| async move { - Some(Action::Output(result.ok()?)) - }), - ), - ))) - } - - /// Creates a new [`Task`] that executes the [`Action`] returned by the closure and - /// produces the values fed to the [`mpsc::Sender`]. - pub fn channel(f: impl FnOnce(mpsc::Sender<T>) -> Action<T>) -> Task<T> - where - T: MaybeSend + 'static, - { - let (sender, receiver) = mpsc::channel(1); - - let action = f(sender); - - Self(Some(boxed_stream( - stream::once(async move { action }) - .chain(receiver.map(|result| Action::Output(result))), - ))) - } - - /// Creates a new [`Task`] that executes the given [`Action`] and produces no output. - pub fn effect(action: impl Into<Action<Never>>) -> Self { - let action = action.into(); - - Self(Some(boxed_stream(stream::once(async move { - action.output().expect_err("no output") - })))) - } - /// Maps the output of a [`Task`] with the given closure. pub fn map<O>( self, @@ -236,9 +159,105 @@ impl<T> Task<T> { } } - /// Returns the underlying [`Stream`] of the [`Task`]. - pub fn into_stream(self) -> Option<BoxStream<Action<T>>> { - self.0 + /// Creates a new [`Task`] that discards the result of the current one. + /// + /// Useful if you only care about the side effects of a [`Task`]. + pub fn discard<O>(self) -> Task<O> + where + T: MaybeSend + 'static, + O: MaybeSend + 'static, + { + self.then(|_| Task::none()) + } + + /// Creates a new [`Task`] that can be aborted with the returned [`Handle`]. + pub fn abortable(self) -> (Self, Handle) + where + T: 'static, + { + match self.0 { + Some(stream) => { + let (stream, handle) = stream::abortable(stream); + + ( + Self(Some(boxed_stream(stream))), + Handle { + raw: Some(handle), + abort_on_drop: false, + }, + ) + } + None => ( + Self(None), + Handle { + raw: None, + abort_on_drop: false, + }, + ), + } + } + + /// Creates a new [`Task`] that runs the given [`Future`] and produces + /// its output. + pub fn future(future: impl Future<Output = T> + MaybeSend + 'static) -> Self + where + T: 'static, + { + Self::stream(stream::once(future)) + } + + /// Creates a new [`Task`] that runs the given [`Stream`] and produces + /// each of its items. + pub fn stream(stream: impl Stream<Item = T> + MaybeSend + 'static) -> Self + where + T: 'static, + { + Self(Some(boxed_stream(stream.map(Action::Output)))) + } +} + +/// A handle to a [`Task`] that can be used for aborting it. +#[derive(Debug, Clone)] +pub struct Handle { + raw: Option<stream::AbortHandle>, + abort_on_drop: bool, +} + +impl Handle { + /// Aborts the [`Task`] of this [`Handle`]. + pub fn abort(&self) { + if let Some(handle) = &self.raw { + handle.abort(); + } + } + + /// Returns a new [`Handle`] that will call [`Handle::abort`] whenever + /// it is dropped. + /// + /// This can be really useful if you do not want to worry about calling + /// [`Handle::abort`] yourself. + pub fn abort_on_drop(mut self) -> Self { + Self { + raw: self.raw.take(), + abort_on_drop: true, + } + } + + /// Returns `true` if the [`Task`] of this [`Handle`] has been aborted. + pub fn is_aborted(&self) -> bool { + if let Some(handle) = &self.raw { + handle.is_aborted() + } else { + true + } + } +} + +impl Drop for Handle { + fn drop(&mut self) { + if self.abort_on_drop { + self.abort(); + } } } @@ -275,11 +294,73 @@ impl<T, E> Task<Result<T, E>> { } } -impl<T> From<()> for Task<T> -where - T: MaybeSend + 'static, -{ +impl<T> From<()> for Task<T> { fn from(_value: ()) -> Self { Self::none() } } + +/// Creates a new [`Task`] that runs the given [`widget::Operation`] and produces +/// its output. +pub fn widget<T>(operation: impl widget::Operation<T> + 'static) -> Task<T> +where + T: Send + 'static, +{ + channel(move |sender| { + let operation = + widget::operation::map(Box::new(operation), move |value| { + let _ = sender.clone().try_send(value); + }); + + Action::Widget(Box::new(operation)) + }) +} + +/// Creates a new [`Task`] that executes the [`Action`] returned by the closure and +/// produces the value fed to the [`oneshot::Sender`]. +pub fn oneshot<T>(f: impl FnOnce(oneshot::Sender<T>) -> Action<T>) -> Task<T> +where + T: MaybeSend + 'static, +{ + let (sender, receiver) = oneshot::channel(); + + let action = f(sender); + + Task(Some(boxed_stream( + stream::once(async move { action }).chain( + receiver.into_stream().filter_map(|result| async move { + Some(Action::Output(result.ok()?)) + }), + ), + ))) +} + +/// Creates a new [`Task`] that executes the [`Action`] returned by the closure and +/// produces the values fed to the [`mpsc::Sender`]. +pub fn channel<T>(f: impl FnOnce(mpsc::Sender<T>) -> Action<T>) -> Task<T> +where + T: MaybeSend + 'static, +{ + let (sender, receiver) = mpsc::channel(1); + + let action = f(sender); + + Task(Some(boxed_stream( + stream::once(async move { action }) + .chain(receiver.map(|result| Action::Output(result))), + ))) +} + +/// Creates a new [`Task`] that executes the given [`Action`] and produces no output. +pub fn effect<T>(action: impl Into<Action<Never>>) -> Task<T> { + let action = action.into(); + + Task(Some(boxed_stream(stream::once(async move { + action.output().expect_err("no output") + })))) +} + +/// Returns the underlying [`Stream`] of the [`Task`]. +pub fn into_stream<T>(task: Task<T>) -> Option<BoxStream<Action<T>>> { + task.0 +} |