diff options
Diffstat (limited to 'core/src')
| -rw-r--r-- | core/src/lib.rs | 6 | ||||
| -rw-r--r-- | core/src/runtime.rs | 74 | ||||
| -rw-r--r-- | core/src/runtime/executor.rs | 11 | ||||
| -rw-r--r-- | core/src/subscription/tracker.rs | 8 | 
4 files changed, 95 insertions, 4 deletions
| diff --git a/core/src/lib.rs b/core/src/lib.rs index 6f13c310..760acefe 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -44,3 +44,9 @@ pub mod subscription;  #[cfg(feature = "subscription")]  pub use subscription::Subscription; + +#[cfg(feature = "runtime")] +mod runtime; + +#[cfg(feature = "runtime")] +pub use runtime::Runtime; diff --git a/core/src/runtime.rs b/core/src/runtime.rs new file mode 100644 index 00000000..31234d11 --- /dev/null +++ b/core/src/runtime.rs @@ -0,0 +1,74 @@ +mod executor; + +pub use executor::Executor; + +use crate::{subscription, Command, Subscription}; + +use futures::Sink; +use std::marker::PhantomData; + +#[derive(Debug)] +pub struct Runtime<Hasher, Event, Executor, Receiver, Message> { +    executor: Executor, +    subscriptions: subscription::Tracker<Hasher, Event>, +    receiver: Receiver, +    _message: PhantomData<Message>, +} + +impl<Hasher, Event, Executor, Receiver, Message> +    Runtime<Hasher, Event, Executor, Receiver, Message> +where +    Hasher: std::hash::Hasher + Default, +    Event: Send + Clone + 'static, +    Executor: self::Executor, +    Receiver: Sink<Message, Error = core::convert::Infallible> +        + Unpin +        + Send +        + Clone +        + 'static, +    Message: Send + 'static, +{ +    pub fn new(receiver: Receiver) -> Self { +        Self { +            executor: Executor::new(), +            subscriptions: subscription::Tracker::new(), +            receiver, +            _message: PhantomData, +        } +    } + +    pub fn spawn(&mut self, command: Command<Message>) { +        use futures::{FutureExt, SinkExt}; + +        let futures = command.futures(); + +        for future in futures { +            let mut receiver = self.receiver.clone(); + +            self.executor.spawn(future.then(|message| { +                async move { +                    let _ = receiver.send(message).await; + +                    () +                } +            })); +        } +    } + +    pub fn track( +        &mut self, +        subscription: Subscription<Hasher, Event, Message>, +    ) { +        let futures = self +            .subscriptions +            .update(subscription, self.receiver.clone()); + +        for future in futures { +            self.executor.spawn(future); +        } +    } + +    pub fn broadcast(&mut self, event: Event) { +        self.subscriptions.broadcast(event); +    } +} diff --git a/core/src/runtime/executor.rs b/core/src/runtime/executor.rs new file mode 100644 index 00000000..d171c6d5 --- /dev/null +++ b/core/src/runtime/executor.rs @@ -0,0 +1,11 @@ +use futures::Future; + +pub trait Executor { +    fn new() -> Self; + +    fn spawn(&self, future: impl Future<Output = ()> + Send + 'static); + +    fn enter<R>(&self, f: impl FnOnce() -> R) -> R { +        f() +    } +} diff --git a/core/src/subscription/tracker.rs b/core/src/subscription/tracker.rs index 826f60c0..a942b619 100644 --- a/core/src/subscription/tracker.rs +++ b/core/src/subscription/tracker.rs @@ -28,14 +28,14 @@ where          }      } -    pub fn update<Message, S>( +    pub fn update<Message, Receiver>(          &mut self,          subscription: Subscription<Hasher, Event, Message>, -        sink: S, +        receiver: Receiver,      ) -> Vec<BoxFuture<'static, ()>>      where          Message: 'static + Send, -        S: 'static +        Receiver: 'static              + Sink<Message, Error = core::convert::Infallible>              + Unpin              + Send @@ -72,7 +72,7 @@ where              let future = futures::future::select(                  cancelled, -                stream.map(Ok).forward(sink.clone()), +                stream.map(Ok).forward(receiver.clone()),              )              .map(|_| ()); | 
