summaryrefslogtreecommitdiffstats
path: root/core/src/runtime.rs
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/runtime.rs')
-rw-r--r--core/src/runtime.rs74
1 files changed, 74 insertions, 0 deletions
diff --git a/core/src/runtime.rs b/core/src/runtime.rs
new file mode 100644
index 00000000..31234d11
--- /dev/null
+++ b/core/src/runtime.rs
@@ -0,0 +1,74 @@
+mod executor;
+
+pub use executor::Executor;
+
+use crate::{subscription, Command, Subscription};
+
+use futures::Sink;
+use std::marker::PhantomData;
+
+#[derive(Debug)]
+pub struct Runtime<Hasher, Event, Executor, Receiver, Message> {
+ executor: Executor,
+ subscriptions: subscription::Tracker<Hasher, Event>,
+ receiver: Receiver,
+ _message: PhantomData<Message>,
+}
+
+impl<Hasher, Event, Executor, Receiver, Message>
+ Runtime<Hasher, Event, Executor, Receiver, Message>
+where
+ Hasher: std::hash::Hasher + Default,
+ Event: Send + Clone + 'static,
+ Executor: self::Executor,
+ Receiver: Sink<Message, Error = core::convert::Infallible>
+ + Unpin
+ + Send
+ + Clone
+ + 'static,
+ Message: Send + 'static,
+{
+ pub fn new(receiver: Receiver) -> Self {
+ Self {
+ executor: Executor::new(),
+ subscriptions: subscription::Tracker::new(),
+ receiver,
+ _message: PhantomData,
+ }
+ }
+
+ pub fn spawn(&mut self, command: Command<Message>) {
+ use futures::{FutureExt, SinkExt};
+
+ let futures = command.futures();
+
+ for future in futures {
+ let mut receiver = self.receiver.clone();
+
+ self.executor.spawn(future.then(|message| {
+ async move {
+ let _ = receiver.send(message).await;
+
+ ()
+ }
+ }));
+ }
+ }
+
+ pub fn track(
+ &mut self,
+ subscription: Subscription<Hasher, Event, Message>,
+ ) {
+ let futures = self
+ .subscriptions
+ .update(subscription, self.receiver.clone());
+
+ for future in futures {
+ self.executor.spawn(future);
+ }
+ }
+
+ pub fn broadcast(&mut self, event: Event) {
+ self.subscriptions.broadcast(event);
+ }
+}