From 8bc49cd88653309f5abe8a38d5a4af36fcfea933 Mon Sep 17 00:00:00 2001 From: Héctor Ramón Jiménez Date: Fri, 5 Jul 2024 02:15:13 +0200 Subject: Hide `Subscription` internals .. and introduce `stream::channel` helper --- futures/src/stream.rs | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) create mode 100644 futures/src/stream.rs (limited to 'futures/src/stream.rs') diff --git a/futures/src/stream.rs b/futures/src/stream.rs new file mode 100644 index 00000000..2ec505f1 --- /dev/null +++ b/futures/src/stream.rs @@ -0,0 +1,26 @@ +//! Create asynchronous streams of data. +use futures::channel::mpsc; +use futures::never::Never; +use futures::stream::{self, Stream, StreamExt}; + +use std::future::Future; + +/// Creates a new [`Stream`] that produces the items sent from a [`Future`] +/// to the [`mpsc::Sender`] provided to the closure. +/// +/// This is a more ergonomic [`stream::unfold`], which allows you to go +/// from the "world of futures" to the "world of streams" by simply looping +/// and publishing to an async channel from inside a [`Future`]. +pub fn channel( + size: usize, + f: impl FnOnce(mpsc::Sender) -> F, +) -> impl Stream +where + F: Future, +{ + let (sender, receiver) = mpsc::channel(size); + + let runner = stream::once(f(sender)).map(|_| unreachable!()); + + stream::select(receiver, runner) +} -- cgit