diff options
Diffstat (limited to 'futures/src/runtime.rs')
| -rw-r--r-- | futures/src/runtime.rs | 30 | 
1 files changed, 27 insertions, 3 deletions
| diff --git a/futures/src/runtime.rs b/futures/src/runtime.rs index 2241a494..cac7b7e1 100644 --- a/futures/src/runtime.rs +++ b/futures/src/runtime.rs @@ -1,7 +1,7 @@  //! Run commands and keep track of subscriptions.  use crate::core::event::{self, Event};  use crate::subscription; -use crate::{BoxFuture, Executor, MaybeSend}; +use crate::{BoxFuture, BoxStream, Executor, MaybeSend};  use futures::{channel::mpsc, Sink};  use std::marker::PhantomData; @@ -9,9 +9,9 @@ use std::marker::PhantomData;  /// A batteries-included runtime of commands and subscriptions.  ///  /// If you have an [`Executor`], a [`Runtime`] can be leveraged to run any -/// [`Command`] or [`Subscription`] and get notified of the results! +/// `Command` or [`Subscription`] and get notified of the results!  /// -/// [`Command`]: crate::Command +/// [`Subscription`]: crate::Subscription  #[derive(Debug)]  pub struct Runtime<Executor, Sender, Message> {      executor: Executor, @@ -69,12 +69,36 @@ where          self.executor.spawn(future);      } +    /// Runs a [`Stream`] in the [`Runtime`] until completion. +    /// +    /// The resulting `Message`s will be forwarded to the `Sender` of the +    /// [`Runtime`]. +    /// +    /// [`Stream`]: BoxStream +    pub fn run(&mut self, stream: BoxStream<Message>) { +        use futures::{FutureExt, StreamExt}; + +        let sender = self.sender.clone(); +        let future = +            stream.map(Ok).forward(sender).map(|result| match result { +                Ok(()) => (), +                Err(error) => { +                    log::warn!( +                        "Stream could not run until completion: {error}" +                    ); +                } +            }); + +        self.executor.spawn(future); +    } +      /// Tracks a [`Subscription`] in the [`Runtime`].      ///      /// It will spawn new streams or close old ones as necessary! See      /// [`Tracker::update`] to learn more about this!      ///      /// [`Tracker::update`]: subscription::Tracker::update +    /// [`Subscription`]: crate::Subscription      pub fn track(          &mut self,          recipes: impl IntoIterator< | 
