//! A `tokio` backend. use futures::Future; /// A `tokio` executor. pub type Executor = tokio::runtime::Runtime; impl crate::Executor for Executor { fn new() -> Result { tokio::runtime::Runtime::new() } #[allow(clippy::let_underscore_future)] fn spawn(&self, future: impl Future + Send + 'static) { let _ = tokio::runtime::Runtime::spawn(self, future); } fn enter(&self, f: impl FnOnce() -> R) -> R { let _guard = tokio::runtime::Runtime::enter(self); f() } } pub mod time { //! Listen and react to time. use crate::core::time::{Duration, Instant}; use crate::stream; use crate::subscription::Subscription; use crate::MaybeSend; use futures::SinkExt; use std::future::Future; /// Returns a [`Subscription`] that produces messages at a set interval. /// /// The first message is produced after a `duration`, and then continues to /// produce more messages every `duration` after that. pub fn every(duration: Duration) -> Subscription { Subscription::run_with(duration, |duration| { use futures::stream::StreamExt; let start = tokio::time::Instant::now() + *duration; let mut interval = tokio::time::interval_at(start, *duration); interval.set_missed_tick_behavior( tokio::time::MissedTickBehavior::Skip, ); let stream = { futures::stream::unfold(interval, |mut interval| async move { Some((interval.tick().await, interval)) }) }; stream.map(tokio::time::Instant::into_std).boxed() }) } /// Returns a [`Subscription`] that runs the given async function at a /// set interval; producing the result of the function as output. pub fn repeat(f: fn() -> F, interval: Duration) -> Subscription where F: Future + MaybeSend + 'static, T: MaybeSend + 'static, { Subscription::run_with((f, interval), |(f, interval)| { let f = *f; let interval = *interval; stream::channel(1, move |mut output| async move { loop { let _ = output.send(f().await).await; tokio::time::sleep(interval).await; } }) }) } }