summaryrefslogtreecommitdiffstats
path: root/runtime/src/command.rs
diff options
context:
space:
mode:
Diffstat (limited to 'runtime/src/command.rs')
-rw-r--r--runtime/src/command.rs41
1 files changed, 37 insertions, 4 deletions
diff --git a/runtime/src/command.rs b/runtime/src/command.rs
index cd4c51ff..f70da915 100644
--- a/runtime/src/command.rs
+++ b/runtime/src/command.rs
@@ -4,8 +4,11 @@ mod action;
pub use action::Action;
use crate::core::widget;
+use crate::futures::futures;
use crate::futures::MaybeSend;
+use futures::channel::mpsc;
+use futures::Stream;
use std::fmt;
use std::future::Future;
@@ -40,14 +43,24 @@ impl<T> Command<T> {
/// Creates a [`Command`] that performs the action of the given future.
pub fn perform<A>(
- future: impl Future<Output = T> + 'static + MaybeSend,
- f: impl FnOnce(T) -> A + 'static + MaybeSend,
- ) -> Command<A> {
- use iced_futures::futures::FutureExt;
+ future: impl Future<Output = A> + 'static + MaybeSend,
+ f: impl FnOnce(A) -> T + 'static + MaybeSend,
+ ) -> Command<T> {
+ use futures::FutureExt;
Command::single(Action::Future(Box::pin(future.map(f))))
}
+ /// Creates a [`Command`] that runs the given stream to completion.
+ pub fn run<A>(
+ stream: impl Stream<Item = A> + 'static + MaybeSend,
+ f: impl Fn(A) -> T + 'static + MaybeSend,
+ ) -> Command<T> {
+ use futures::StreamExt;
+
+ Command::single(Action::Stream(Box::pin(stream.map(f))))
+ }
+
/// Creates a [`Command`] that performs the actions of all the given
/// commands.
///
@@ -106,3 +119,23 @@ impl<T> fmt::Debug for Command<T> {
command.fmt(f)
}
}
+
+/// Creates a [`Command`] that produces the `Message`s published from a [`Future`]
+/// to an [`mpsc::Sender`] with the given bounds.
+pub fn channel<Fut, Message>(
+ size: usize,
+ f: impl FnOnce(mpsc::Sender<Message>) -> Fut + MaybeSend + 'static,
+) -> Command<Message>
+where
+ Fut: Future<Output = ()> + MaybeSend + 'static,
+ Message: 'static + MaybeSend,
+{
+ use futures::future;
+ use futures::stream::{self, StreamExt};
+
+ let (sender, receiver) = mpsc::channel(size);
+
+ let runner = stream::once(f(sender)).filter_map(|_| future::ready(None));
+
+ Command::single(Action::Stream(Box::pin(stream::select(receiver, runner))))
+}