summaryrefslogtreecommitdiffstats
path: root/runtime/src/task.rs
diff options
context:
space:
mode:
Diffstat (limited to 'runtime/src/task.rs')
-rw-r--r--runtime/src/task.rs251
1 files changed, 166 insertions, 85 deletions
diff --git a/runtime/src/task.rs b/runtime/src/task.rs
index b8a83d6d..ec8d7cc7 100644
--- a/runtime/src/task.rs
+++ b/runtime/src/task.rs
@@ -1,3 +1,4 @@
+//! Create runtime tasks.
use crate::core::widget;
use crate::futures::futures::channel::mpsc;
use crate::futures::futures::channel::oneshot;
@@ -29,24 +30,6 @@ impl<T> Task<T> {
Self::future(future::ready(value))
}
- /// Creates a new [`Task`] that runs the given [`Future`] and produces
- /// its output.
- pub fn future(future: impl Future<Output = T> + MaybeSend + 'static) -> Self
- where
- T: 'static,
- {
- Self::stream(stream::once(future))
- }
-
- /// Creates a new [`Task`] that runs the given [`Stream`] and produces
- /// each of its items.
- pub fn stream(stream: impl Stream<Item = T> + MaybeSend + 'static) -> Self
- where
- T: 'static,
- {
- Self(Some(boxed_stream(stream.map(Action::Output))))
- }
-
/// Creates a [`Task`] that runs the given [`Future`] to completion and maps its
/// output with the given closure.
pub fn perform<A>(
@@ -83,66 +66,6 @@ impl<T> Task<T> {
))))
}
- /// Creates a new [`Task`] that runs the given [`widget::Operation`] and produces
- /// its output.
- pub fn widget(operation: impl widget::Operation<T> + 'static) -> Task<T>
- where
- T: Send + 'static,
- {
- Self::channel(move |sender| {
- let operation =
- widget::operation::map(Box::new(operation), move |value| {
- let _ = sender.clone().try_send(value);
- });
-
- Action::Widget(Box::new(operation))
- })
- }
-
- /// Creates a new [`Task`] that executes the [`Action`] returned by the closure and
- /// produces the value fed to the [`oneshot::Sender`].
- pub fn oneshot(f: impl FnOnce(oneshot::Sender<T>) -> Action<T>) -> Task<T>
- where
- T: MaybeSend + 'static,
- {
- let (sender, receiver) = oneshot::channel();
-
- let action = f(sender);
-
- Self(Some(boxed_stream(
- stream::once(async move { action }).chain(
- receiver.into_stream().filter_map(|result| async move {
- Some(Action::Output(result.ok()?))
- }),
- ),
- )))
- }
-
- /// Creates a new [`Task`] that executes the [`Action`] returned by the closure and
- /// produces the values fed to the [`mpsc::Sender`].
- pub fn channel(f: impl FnOnce(mpsc::Sender<T>) -> Action<T>) -> Task<T>
- where
- T: MaybeSend + 'static,
- {
- let (sender, receiver) = mpsc::channel(1);
-
- let action = f(sender);
-
- Self(Some(boxed_stream(
- stream::once(async move { action })
- .chain(receiver.map(|result| Action::Output(result))),
- )))
- }
-
- /// Creates a new [`Task`] that executes the given [`Action`] and produces no output.
- pub fn effect(action: impl Into<Action<Never>>) -> Self {
- let action = action.into();
-
- Self(Some(boxed_stream(stream::once(async move {
- action.output().expect_err("no output")
- }))))
- }
-
/// Maps the output of a [`Task`] with the given closure.
pub fn map<O>(
self,
@@ -236,9 +159,105 @@ impl<T> Task<T> {
}
}
- /// Returns the underlying [`Stream`] of the [`Task`].
- pub fn into_stream(self) -> Option<BoxStream<Action<T>>> {
- self.0
+ /// Creates a new [`Task`] that discards the result of the current one.
+ ///
+ /// Useful if you only care about the side effects of a [`Task`].
+ pub fn discard<O>(self) -> Task<O>
+ where
+ T: MaybeSend + 'static,
+ O: MaybeSend + 'static,
+ {
+ self.then(|_| Task::none())
+ }
+
+ /// Creates a new [`Task`] that can be aborted with the returned [`Handle`].
+ pub fn abortable(self) -> (Self, Handle)
+ where
+ T: 'static,
+ {
+ match self.0 {
+ Some(stream) => {
+ let (stream, handle) = stream::abortable(stream);
+
+ (
+ Self(Some(boxed_stream(stream))),
+ Handle {
+ raw: Some(handle),
+ abort_on_drop: false,
+ },
+ )
+ }
+ None => (
+ Self(None),
+ Handle {
+ raw: None,
+ abort_on_drop: false,
+ },
+ ),
+ }
+ }
+
+ /// Creates a new [`Task`] that runs the given [`Future`] and produces
+ /// its output.
+ pub fn future(future: impl Future<Output = T> + MaybeSend + 'static) -> Self
+ where
+ T: 'static,
+ {
+ Self::stream(stream::once(future))
+ }
+
+ /// Creates a new [`Task`] that runs the given [`Stream`] and produces
+ /// each of its items.
+ pub fn stream(stream: impl Stream<Item = T> + MaybeSend + 'static) -> Self
+ where
+ T: 'static,
+ {
+ Self(Some(boxed_stream(stream.map(Action::Output))))
+ }
+}
+
+/// A handle to a [`Task`] that can be used for aborting it.
+#[derive(Debug, Clone)]
+pub struct Handle {
+ raw: Option<stream::AbortHandle>,
+ abort_on_drop: bool,
+}
+
+impl Handle {
+ /// Aborts the [`Task`] of this [`Handle`].
+ pub fn abort(&self) {
+ if let Some(handle) = &self.raw {
+ handle.abort();
+ }
+ }
+
+ /// Returns a new [`Handle`] that will call [`Handle::abort`] whenever
+ /// it is dropped.
+ ///
+ /// This can be really useful if you do not want to worry about calling
+ /// [`Handle::abort`] yourself.
+ pub fn abort_on_drop(mut self) -> Self {
+ Self {
+ raw: self.raw.take(),
+ abort_on_drop: true,
+ }
+ }
+
+ /// Returns `true` if the [`Task`] of this [`Handle`] has been aborted.
+ pub fn is_aborted(&self) -> bool {
+ if let Some(handle) = &self.raw {
+ handle.is_aborted()
+ } else {
+ true
+ }
+ }
+}
+
+impl Drop for Handle {
+ fn drop(&mut self) {
+ if self.abort_on_drop {
+ self.abort();
+ }
}
}
@@ -275,11 +294,73 @@ impl<T, E> Task<Result<T, E>> {
}
}
-impl<T> From<()> for Task<T>
-where
- T: MaybeSend + 'static,
-{
+impl<T> From<()> for Task<T> {
fn from(_value: ()) -> Self {
Self::none()
}
}
+
+/// Creates a new [`Task`] that runs the given [`widget::Operation`] and produces
+/// its output.
+pub fn widget<T>(operation: impl widget::Operation<T> + 'static) -> Task<T>
+where
+ T: Send + 'static,
+{
+ channel(move |sender| {
+ let operation =
+ widget::operation::map(Box::new(operation), move |value| {
+ let _ = sender.clone().try_send(value);
+ });
+
+ Action::Widget(Box::new(operation))
+ })
+}
+
+/// Creates a new [`Task`] that executes the [`Action`] returned by the closure and
+/// produces the value fed to the [`oneshot::Sender`].
+pub fn oneshot<T>(f: impl FnOnce(oneshot::Sender<T>) -> Action<T>) -> Task<T>
+where
+ T: MaybeSend + 'static,
+{
+ let (sender, receiver) = oneshot::channel();
+
+ let action = f(sender);
+
+ Task(Some(boxed_stream(
+ stream::once(async move { action }).chain(
+ receiver.into_stream().filter_map(|result| async move {
+ Some(Action::Output(result.ok()?))
+ }),
+ ),
+ )))
+}
+
+/// Creates a new [`Task`] that executes the [`Action`] returned by the closure and
+/// produces the values fed to the [`mpsc::Sender`].
+pub fn channel<T>(f: impl FnOnce(mpsc::Sender<T>) -> Action<T>) -> Task<T>
+where
+ T: MaybeSend + 'static,
+{
+ let (sender, receiver) = mpsc::channel(1);
+
+ let action = f(sender);
+
+ Task(Some(boxed_stream(
+ stream::once(async move { action })
+ .chain(receiver.map(|result| Action::Output(result))),
+ )))
+}
+
+/// Creates a new [`Task`] that executes the given [`Action`] and produces no output.
+pub fn effect<T>(action: impl Into<Action<Never>>) -> Task<T> {
+ let action = action.into();
+
+ Task(Some(boxed_stream(stream::once(async move {
+ action.output().expect_err("no output")
+ }))))
+}
+
+/// Returns the underlying [`Stream`] of the [`Task`].
+pub fn into_stream<T>(task: Task<T>) -> Option<BoxStream<Action<T>>> {
+ task.0
+}