use crate::core::widget; use crate::futures::futures::channel::mpsc; use crate::futures::futures::channel::oneshot; use crate::futures::futures::future::{self, FutureExt}; use crate::futures::futures::never::Never; use crate::futures::futures::stream::{self, Stream, StreamExt}; use crate::futures::{boxed_stream, BoxStream, MaybeSend}; use crate::Action; use std::future::Future; /// A set of concurrent actions to be performed by the iced runtime. /// /// A [`Task`] _may_ produce a bunch of values of type `T`. #[allow(missing_debug_implementations)] pub struct Task(Option>>); impl Task { /// Creates a [`Task`] that does nothing. pub fn none() -> Self { Self(None) } /// Creates a new [`Task`] that instantly produces the given value. pub fn done(value: T) -> Self where T: MaybeSend + 'static, { Self::future(future::ready(value)) } /// Creates a new [`Task`] that runs the given [`Future`] and produces /// its output. pub fn future(future: impl Future + MaybeSend + 'static) -> Self where T: 'static, { Self::stream(stream::once(future)) } /// Creates a new [`Task`] that runs the given [`Stream`] and produces /// each of its items. pub fn stream(stream: impl Stream + MaybeSend + 'static) -> Self where T: 'static, { Self(Some(boxed_stream(stream.map(Action::Output)))) } /// Creates a new [`Task`] that runs the given [`widget::Operation`] and produces /// its output. pub fn widget(operation: impl widget::Operation + 'static) -> Task where T: Send + 'static, { Self::channel(move |sender| { let operation = widget::operation::map(Box::new(operation), move |value| { let _ = sender.clone().try_send(value); }); Action::Widget(Box::new(operation)) }) } /// Creates a new [`Task`] that executes the [`Action`] returned by the closure and /// produces the value fed to the [`oneshot::Sender`]. pub fn oneshot(f: impl FnOnce(oneshot::Sender) -> Action) -> Task where T: MaybeSend + 'static, { let (sender, receiver) = oneshot::channel(); let action = f(sender); Self(Some(boxed_stream( stream::once(async move { action }).chain( receiver.into_stream().filter_map(|result| async move { Some(Action::Output(result.ok()?)) }), ), ))) } /// Creates a new [`Task`] that executes the [`Action`] returned by the closure and /// produces the values fed to the [`mpsc::Sender`]. pub fn channel(f: impl FnOnce(mpsc::Sender) -> Action) -> Task where T: MaybeSend + 'static, { let (sender, receiver) = mpsc::channel(1); let action = f(sender); Self(Some(boxed_stream( stream::once(async move { action }) .chain(receiver.map(|result| Action::Output(result))), ))) } /// Creates a new [`Task`] that executes the given [`Action`] and produces no output. pub fn effect(action: impl Into>) -> Self { let action = action.into(); Self(Some(boxed_stream(stream::once(async move { action.output().expect_err("no output") })))) } /// Maps the output of a [`Task`] with the given closure. pub fn map( self, mut f: impl FnMut(T) -> O + MaybeSend + 'static, ) -> Task where T: MaybeSend + 'static, O: MaybeSend + 'static, { self.then(move |output| Task::done(f(output))) } /// Performs a new [`Task`] for every output of the current [`Task`] using the /// given closure. /// /// This is the monadic interface of [`Task`]—analogous to [`Future`] and /// [`Stream`]. pub fn then( self, mut f: impl FnMut(T) -> Task + MaybeSend + 'static, ) -> Task where T: MaybeSend + 'static, O: MaybeSend + 'static, { Task(match self.0 { None => None, Some(stream) => { Some(boxed_stream(stream.flat_map(move |action| { match action.output() { Ok(output) => f(output) .0 .unwrap_or_else(|| boxed_stream(stream::empty())), Err(action) => { boxed_stream(stream::once(async move { action })) } } }))) } }) } /// Chains a new [`Task`] to be performed once the current one finishes completely. pub fn chain(self, task: Self) -> Self where T: 'static, { match self.0 { None => task, Some(first) => match task.0 { None => Task::none(), Some(second) => Task(Some(boxed_stream(first.chain(second)))), }, } } /// Creates a [`Task`] that runs the given [`Future`] to completion. pub fn perform( future: impl Future + MaybeSend + 'static, f: impl Fn(A) -> T + MaybeSend + 'static, ) -> Self where T: MaybeSend + 'static, A: MaybeSend + 'static, { Self::future(future.map(f)) } /// Creates a [`Task`] that runs the given [`Stream`] to completion. pub fn run( stream: impl Stream + MaybeSend + 'static, f: impl Fn(A) -> T + 'static + MaybeSend, ) -> Self where T: 'static, { Self::stream(stream.map(f)) } /// Combines the given tasks and produces a single [`Task`] that will run all of them /// in parallel. pub fn batch(tasks: impl IntoIterator) -> Self where T: 'static, { Self(Some(boxed_stream(stream::select_all( tasks.into_iter().filter_map(|task| task.0), )))) } /// Returns the underlying [`Stream`] of the [`Task`]. pub fn into_stream(self) -> Option>> { self.0 } } impl From<()> for Task where T: MaybeSend + 'static, { fn from(_value: ()) -> Self { Self::none() } }