summaryrefslogtreecommitdiffstats
path: root/runtime/src/task.rs
diff options
context:
space:
mode:
Diffstat (limited to 'runtime/src/task.rs')
-rw-r--r--runtime/src/task.rs163
1 files changed, 82 insertions, 81 deletions
diff --git a/runtime/src/task.rs b/runtime/src/task.rs
index b8a83d6d..e037c403 100644
--- a/runtime/src/task.rs
+++ b/runtime/src/task.rs
@@ -1,3 +1,4 @@
+//! Create runtime tasks.
use crate::core::widget;
use crate::futures::futures::channel::mpsc;
use crate::futures::futures::channel::oneshot;
@@ -29,24 +30,6 @@ impl<T> Task<T> {
Self::future(future::ready(value))
}
- /// Creates a new [`Task`] that runs the given [`Future`] and produces
- /// its output.
- pub fn future(future: impl Future<Output = T> + MaybeSend + 'static) -> Self
- where
- T: 'static,
- {
- Self::stream(stream::once(future))
- }
-
- /// Creates a new [`Task`] that runs the given [`Stream`] and produces
- /// each of its items.
- pub fn stream(stream: impl Stream<Item = T> + MaybeSend + 'static) -> Self
- where
- T: 'static,
- {
- Self(Some(boxed_stream(stream.map(Action::Output))))
- }
-
/// Creates a [`Task`] that runs the given [`Future`] to completion and maps its
/// output with the given closure.
pub fn perform<A>(
@@ -72,75 +55,33 @@ impl<T> Task<T> {
Self::stream(stream.map(f))
}
- /// Combines the given tasks and produces a single [`Task`] that will run all of them
- /// in parallel.
- pub fn batch(tasks: impl IntoIterator<Item = Self>) -> Self
- where
- T: 'static,
- {
- Self(Some(boxed_stream(stream::select_all(
- tasks.into_iter().filter_map(|task| task.0),
- ))))
- }
-
- /// Creates a new [`Task`] that runs the given [`widget::Operation`] and produces
+ /// Creates a new [`Task`] that runs the given [`Future`] and produces
/// its output.
- pub fn widget(operation: impl widget::Operation<T> + 'static) -> Task<T>
+ pub fn future(future: impl Future<Output = T> + MaybeSend + 'static) -> Self
where
- T: Send + 'static,
+ T: 'static,
{
- Self::channel(move |sender| {
- let operation =
- widget::operation::map(Box::new(operation), move |value| {
- let _ = sender.clone().try_send(value);
- });
-
- Action::Widget(Box::new(operation))
- })
+ Self::stream(stream::once(future))
}
- /// Creates a new [`Task`] that executes the [`Action`] returned by the closure and
- /// produces the value fed to the [`oneshot::Sender`].
- pub fn oneshot(f: impl FnOnce(oneshot::Sender<T>) -> Action<T>) -> Task<T>
+ /// Creates a new [`Task`] that runs the given [`Stream`] and produces
+ /// each of its items.
+ pub fn stream(stream: impl Stream<Item = T> + MaybeSend + 'static) -> Self
where
- T: MaybeSend + 'static,
+ T: 'static,
{
- let (sender, receiver) = oneshot::channel();
-
- let action = f(sender);
-
- Self(Some(boxed_stream(
- stream::once(async move { action }).chain(
- receiver.into_stream().filter_map(|result| async move {
- Some(Action::Output(result.ok()?))
- }),
- ),
- )))
+ Self(Some(boxed_stream(stream.map(Action::Output))))
}
- /// Creates a new [`Task`] that executes the [`Action`] returned by the closure and
- /// produces the values fed to the [`mpsc::Sender`].
- pub fn channel(f: impl FnOnce(mpsc::Sender<T>) -> Action<T>) -> Task<T>
+ /// Combines the given tasks and produces a single [`Task`] that will run all of them
+ /// in parallel.
+ pub fn batch(tasks: impl IntoIterator<Item = Self>) -> Self
where
- T: MaybeSend + 'static,
+ T: 'static,
{
- let (sender, receiver) = mpsc::channel(1);
-
- let action = f(sender);
-
- Self(Some(boxed_stream(
- stream::once(async move { action })
- .chain(receiver.map(|result| Action::Output(result))),
- )))
- }
-
- /// Creates a new [`Task`] that executes the given [`Action`] and produces no output.
- pub fn effect(action: impl Into<Action<Never>>) -> Self {
- let action = action.into();
-
- Self(Some(boxed_stream(stream::once(async move {
- action.output().expect_err("no output")
- }))))
+ Self(Some(boxed_stream(stream::select_all(
+ tasks.into_iter().filter_map(|task| task.0),
+ ))))
}
/// Maps the output of a [`Task`] with the given closure.
@@ -235,11 +176,6 @@ impl<T> Task<T> {
))),
}
}
-
- /// Returns the underlying [`Stream`] of the [`Task`].
- pub fn into_stream(self) -> Option<BoxStream<Action<T>>> {
- self.0
- }
}
impl<T> Task<Option<T>> {
@@ -283,3 +219,68 @@ where
Self::none()
}
}
+
+/// Creates a new [`Task`] that runs the given [`widget::Operation`] and produces
+/// its output.
+pub fn widget<T>(operation: impl widget::Operation<T> + 'static) -> Task<T>
+where
+ T: Send + 'static,
+{
+ channel(move |sender| {
+ let operation =
+ widget::operation::map(Box::new(operation), move |value| {
+ let _ = sender.clone().try_send(value);
+ });
+
+ Action::Widget(Box::new(operation))
+ })
+}
+
+/// Creates a new [`Task`] that executes the [`Action`] returned by the closure and
+/// produces the value fed to the [`oneshot::Sender`].
+pub fn oneshot<T>(f: impl FnOnce(oneshot::Sender<T>) -> Action<T>) -> Task<T>
+where
+ T: MaybeSend + 'static,
+{
+ let (sender, receiver) = oneshot::channel();
+
+ let action = f(sender);
+
+ Task(Some(boxed_stream(
+ stream::once(async move { action }).chain(
+ receiver.into_stream().filter_map(|result| async move {
+ Some(Action::Output(result.ok()?))
+ }),
+ ),
+ )))
+}
+
+/// Creates a new [`Task`] that executes the [`Action`] returned by the closure and
+/// produces the values fed to the [`mpsc::Sender`].
+pub fn channel<T>(f: impl FnOnce(mpsc::Sender<T>) -> Action<T>) -> Task<T>
+where
+ T: MaybeSend + 'static,
+{
+ let (sender, receiver) = mpsc::channel(1);
+
+ let action = f(sender);
+
+ Task(Some(boxed_stream(
+ stream::once(async move { action })
+ .chain(receiver.map(|result| Action::Output(result))),
+ )))
+}
+
+/// Creates a new [`Task`] that executes the given [`Action`] and produces no output.
+pub fn effect<T>(action: impl Into<Action<Never>>) -> Task<T> {
+ let action = action.into();
+
+ Task(Some(boxed_stream(stream::once(async move {
+ action.output().expect_err("no output")
+ }))))
+}
+
+/// Returns the underlying [`Stream`] of the [`Task`].
+pub fn into_stream<T>(task: Task<T>) -> Option<BoxStream<Action<T>>> {
+ task.0
+}