diff options
Diffstat (limited to 'futures/src/runtime.rs')
-rw-r--r-- | futures/src/runtime.rs | 30 |
1 files changed, 27 insertions, 3 deletions
diff --git a/futures/src/runtime.rs b/futures/src/runtime.rs index 2241a494..cac7b7e1 100644 --- a/futures/src/runtime.rs +++ b/futures/src/runtime.rs @@ -1,7 +1,7 @@ //! Run commands and keep track of subscriptions. use crate::core::event::{self, Event}; use crate::subscription; -use crate::{BoxFuture, Executor, MaybeSend}; +use crate::{BoxFuture, BoxStream, Executor, MaybeSend}; use futures::{channel::mpsc, Sink}; use std::marker::PhantomData; @@ -9,9 +9,9 @@ use std::marker::PhantomData; /// A batteries-included runtime of commands and subscriptions. /// /// If you have an [`Executor`], a [`Runtime`] can be leveraged to run any -/// [`Command`] or [`Subscription`] and get notified of the results! +/// `Command` or [`Subscription`] and get notified of the results! /// -/// [`Command`]: crate::Command +/// [`Subscription`]: crate::Subscription #[derive(Debug)] pub struct Runtime<Executor, Sender, Message> { executor: Executor, @@ -69,12 +69,36 @@ where self.executor.spawn(future); } + /// Runs a [`Stream`] in the [`Runtime`] until completion. + /// + /// The resulting `Message`s will be forwarded to the `Sender` of the + /// [`Runtime`]. + /// + /// [`Stream`]: BoxStream + pub fn run(&mut self, stream: BoxStream<Message>) { + use futures::{FutureExt, StreamExt}; + + let sender = self.sender.clone(); + let future = + stream.map(Ok).forward(sender).map(|result| match result { + Ok(()) => (), + Err(error) => { + log::warn!( + "Stream could not run until completion: {error}" + ); + } + }); + + self.executor.spawn(future); + } + /// Tracks a [`Subscription`] in the [`Runtime`]. /// /// It will spawn new streams or close old ones as necessary! See /// [`Tracker::update`] to learn more about this! /// /// [`Tracker::update`]: subscription::Tracker::update + /// [`Subscription`]: crate::Subscription pub fn track( &mut self, recipes: impl IntoIterator< |