//! Create asynchronous streams of data. use futures::channel::mpsc; use futures::stream::{self, Stream, StreamExt}; use std::future::Future; /// Creates a new [`Stream`] that produces the items sent from a [`Future`] /// to the [`mpsc::Sender`] provided to the closure. /// /// This is a more ergonomic [`stream::unfold`], which allows you to go /// from the "world of futures" to the "world of streams" by simply looping /// and publishing to an async channel from inside a [`Future`]. pub fn channel( size: usize, f: impl FnOnce(mpsc::Sender) -> F, ) -> impl Stream where F: Future, { let (sender, receiver) = mpsc::channel(size); let runner = stream::once(f(sender)).filter_map(|_| async { None }); stream::select(receiver, runner) }