diff options
Diffstat (limited to 'futures/src/runtime.rs')
-rw-r--r-- | futures/src/runtime.rs | 119 |
1 files changed, 119 insertions, 0 deletions
diff --git a/futures/src/runtime.rs b/futures/src/runtime.rs new file mode 100644 index 00000000..9fd9899a --- /dev/null +++ b/futures/src/runtime.rs @@ -0,0 +1,119 @@ +//! Run commands and keep track of subscriptions. +use crate::{subscription, Command, Executor, Subscription}; + +use futures::Sink; +use std::marker::PhantomData; + +/// A batteries-included runtime of commands and subscriptions. +/// +/// If you have an [`Executor`], a [`Runtime`] can be leveraged to run any +/// [`Command`] or [`Subscription`] and get notified of the results! +/// +/// [`Runtime`]: struct.Runtime.html +/// [`Executor`]: executor/trait.Executor.html +/// [`Command`]: struct.Command.html +/// [`Subscription`]: subscription/struct.Subscription.html +#[derive(Debug)] +pub struct Runtime<Hasher, Event, Executor, Sender, Message> { + executor: Executor, + sender: Sender, + subscriptions: subscription::Tracker<Hasher, Event>, + _message: PhantomData<Message>, +} + +impl<Hasher, Event, Executor, Sender, Message> + Runtime<Hasher, Event, Executor, Sender, Message> +where + Hasher: std::hash::Hasher + Default, + Event: Send + Clone + 'static, + Executor: self::Executor, + Sender: Sink<Message, Error = core::convert::Infallible> + + Unpin + + Send + + Clone + + 'static, + Message: Send + 'static, +{ + /// Creates a new empty [`Runtime`]. + /// + /// You need to provide: + /// - an [`Executor`] to spawn futures + /// - a `Sender` implementing `Sink` to receive the results + /// + /// [`Runtime`]: struct.Runtime.html + pub fn new(executor: Executor, sender: Sender) -> Self { + Self { + executor, + sender, + subscriptions: subscription::Tracker::new(), + _message: PhantomData, + } + } + + /// Runs the given closure inside the [`Executor`] of the [`Runtime`]. + /// + /// See [`Executor::enter`] to learn more. + /// + /// [`Executor`]: executor/trait.Executor.html + /// [`Runtime`]: struct.Runtime.html + /// [`Executor::enter`]: executor/trait.Executor.html#method.enter + pub fn enter<R>(&self, f: impl FnOnce() -> R) -> R { + self.executor.enter(f) + } + + /// Spawns a [`Command`] in the [`Runtime`]. + /// + /// The resulting `Message` will be forwarded to the `Sender` of the + /// [`Runtime`]. + /// + /// [`Command`]: struct.Command.html + /// [`Runtime`]: struct.Runtime.html + pub fn spawn(&mut self, command: Command<Message>) { + use futures::{FutureExt, SinkExt}; + + let futures = command.futures(); + + for future in futures { + let mut sender = self.sender.clone(); + + self.executor.spawn(future.then(|message| { + async move { + let _ = sender.send(message).await; + + () + } + })); + } + } + + /// Tracks a [`Subscription`] in the [`Runtime`]. + /// + /// It will spawn new streams or close old ones as necessary! See + /// [`Tracker::update`] to learn more about this! + /// + /// [`Subscription`]: subscription/struct.Subscription.html + /// [`Runtime`]: struct.Runtime.html + /// [`Tracker::update`]: subscription/struct.Tracker.html#method.update + pub fn track( + &mut self, + subscription: Subscription<Hasher, Event, Message>, + ) { + let futures = + self.subscriptions.update(subscription, self.sender.clone()); + + for future in futures { + self.executor.spawn(future); + } + } + + /// Broadcasts an event to all the subscriptions currently alive in the + /// [`Runtime`]. + /// + /// See [`Tracker::broadcast`] to learn more. + /// + /// [`Runtime`]: struct.Runtime.html + /// [`Tracker::broadcast`]: subscription/struct.Tracker.html#method.broadcast + pub fn broadcast(&mut self, event: Event) { + self.subscriptions.broadcast(event); + } +} |