//! Run commands and keep track of subscriptions. use crate::core::MaybeSend; use crate::subscription; use crate::{BoxFuture, BoxStream, Executor}; use futures::{channel::mpsc, Sink}; use std::marker::PhantomData; /// A batteries-included runtime of commands and subscriptions. /// /// If you have an [`Executor`], a [`Runtime`] can be leveraged to run any /// `Command` or [`Subscription`] and get notified of the results! /// /// [`Subscription`]: crate::Subscription #[derive(Debug)] pub struct Runtime { executor: Executor, sender: Sender, subscriptions: subscription::Tracker, _message: PhantomData, } impl Runtime where Executor: self::Executor, Sender: Sink + Unpin + MaybeSend + Clone + 'static, Message: MaybeSend + 'static, { /// Creates a new empty [`Runtime`]. /// /// You need to provide: /// - an [`Executor`] to spawn futures /// - a `Sender` implementing `Sink` to receive the results pub fn new(executor: Executor, sender: Sender) -> Self { Self { executor, sender, subscriptions: subscription::Tracker::new(), _message: PhantomData, } } /// Runs the given closure inside the [`Executor`] of the [`Runtime`]. /// /// See [`Executor::enter`] to learn more. pub fn enter(&self, f: impl FnOnce() -> R) -> R { self.executor.enter(f) } /// Spawns a [`Future`] in the [`Runtime`]. /// /// The resulting `Message` will be forwarded to the `Sender` of the /// [`Runtime`]. /// /// [`Future`]: BoxFuture pub fn spawn(&mut self, future: BoxFuture) { use futures::{FutureExt, SinkExt}; let mut sender = self.sender.clone(); let future = future.then(|message| async move { let _ = sender.send(message).await; }); self.executor.spawn(future); } /// Runs a [`Stream`] in the [`Runtime`] until completion. /// /// The resulting `Message`s will be forwarded to the `Sender` of the /// [`Runtime`]. /// /// [`Stream`]: BoxStream pub fn run(&mut self, stream: BoxStream) { use futures::{FutureExt, StreamExt}; let sender = self.sender.clone(); let future = stream.map(Ok).forward(sender).map(|result| match result { Ok(()) => (), Err(error) => { log::warn!( "Stream could not run until completion: {error}" ); } }); self.executor.spawn(future); } /// Tracks a [`Subscription`] in the [`Runtime`]. /// /// It will spawn new streams or close old ones as necessary! See /// [`Tracker::update`] to learn more about this! /// /// [`Tracker::update`]: subscription::Tracker::update /// [`Subscription`]: crate::Subscription pub fn track( &mut self, recipes: impl IntoIterator< Item = Box>, >, ) { let Runtime { executor, subscriptions, sender, .. } = self; let futures = executor.enter(|| { subscriptions.update(recipes.into_iter(), sender.clone()) }); for future in futures { executor.spawn(future); } } /// Broadcasts an event to all the subscriptions currently alive in the /// [`Runtime`]. /// /// See [`Tracker::broadcast`] to learn more. /// /// [`Tracker::broadcast`]: subscription::Tracker::broadcast pub fn broadcast(&mut self, event: subscription::Event) { self.subscriptions.broadcast(event); } }