From 8efe161e3d08b56cba8db1320b8433efa45fa79e Mon Sep 17 00:00:00 2001 From: Héctor Ramón Jiménez Date: Wed, 10 Jul 2024 14:24:52 +0200 Subject: Move docs of `future` and `stream` in `Task` --- runtime/src/task.rs | 36 ++++++++++++++++++------------------ 1 file changed, 18 insertions(+), 18 deletions(-) (limited to 'runtime/src') diff --git a/runtime/src/task.rs b/runtime/src/task.rs index e037c403..d1864473 100644 --- a/runtime/src/task.rs +++ b/runtime/src/task.rs @@ -55,24 +55,6 @@ impl Task { Self::stream(stream.map(f)) } - /// Creates a new [`Task`] that runs the given [`Future`] and produces - /// its output. - pub fn future(future: impl Future + MaybeSend + 'static) -> Self - where - T: 'static, - { - Self::stream(stream::once(future)) - } - - /// Creates a new [`Task`] that runs the given [`Stream`] and produces - /// each of its items. - pub fn stream(stream: impl Stream + MaybeSend + 'static) -> Self - where - T: 'static, - { - Self(Some(boxed_stream(stream.map(Action::Output)))) - } - /// Combines the given tasks and produces a single [`Task`] that will run all of them /// in parallel. pub fn batch(tasks: impl IntoIterator) -> Self @@ -176,6 +158,24 @@ impl Task { ))), } } + + /// Creates a new [`Task`] that runs the given [`Future`] and produces + /// its output. + pub fn future(future: impl Future + MaybeSend + 'static) -> Self + where + T: 'static, + { + Self::stream(stream::once(future)) + } + + /// Creates a new [`Task`] that runs the given [`Stream`] and produces + /// each of its items. + pub fn stream(stream: impl Stream + MaybeSend + 'static) -> Self + where + T: 'static, + { + Self(Some(boxed_stream(stream.map(Action::Output)))) + } } impl Task> { -- cgit From 47f9554a82e65679c13ef17f3f3bf7fff5156184 Mon Sep 17 00:00:00 2001 From: Héctor Ramón Jiménez Date: Wed, 10 Jul 2024 14:40:58 +0200 Subject: Introduce `Task::abortable` :tada: --- runtime/src/task.rs | 37 +++++++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) (limited to 'runtime/src') diff --git a/runtime/src/task.rs b/runtime/src/task.rs index d1864473..b75aca89 100644 --- a/runtime/src/task.rs +++ b/runtime/src/task.rs @@ -159,6 +159,21 @@ impl Task { } } + /// Creates a new [`Task`] that can be aborted with the returned [`Handle`]. + pub fn abortable(self) -> (Self, Handle) + where + T: 'static, + { + match self.0 { + Some(stream) => { + let (stream, handle) = stream::abortable(stream); + + (Self(Some(boxed_stream(stream))), Handle(Some(handle))) + } + None => (Self(None), Handle(None)), + } + } + /// Creates a new [`Task`] that runs the given [`Future`] and produces /// its output. pub fn future(future: impl Future + MaybeSend + 'static) -> Self @@ -178,6 +193,28 @@ impl Task { } } +/// A handle to a [`Task`] that can be used for aborting it. +#[derive(Debug, Clone)] +pub struct Handle(Option); + +impl Handle { + /// Aborts the [`Task`] of this [`Handle`]. + pub fn abort(&self) { + if let Some(handle) = &self.0 { + handle.abort(); + } + } + + /// Returns `true` if the [`Task`] of this [`Handle`] has been aborted. + pub fn is_aborted(&self) -> bool { + if let Some(handle) = &self.0 { + handle.is_aborted() + } else { + true + } + } +} + impl Task> { /// Executes a new [`Task`] after this one, only when it produces `Some` value. /// -- cgit