From 4ce2a207a6c2f28345dc86804a12f2faab3d07ab Mon Sep 17 00:00:00 2001 From: Héctor Ramón Jiménez Date: Thu, 11 Jul 2024 04:08:40 +0200 Subject: Introduce `stream::try_channel` helper --- futures/src/stream.rs | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) (limited to 'futures/src/stream.rs') diff --git a/futures/src/stream.rs b/futures/src/stream.rs index 06f9230d..af2f8c99 100644 --- a/futures/src/stream.rs +++ b/futures/src/stream.rs @@ -23,3 +23,24 @@ where stream::select(receiver, runner) } + +/// Creates a new [`Stream`] that produces the items sent from a [`Future`] +/// that can fail to the [`mpsc::Sender`] provided to the closure. +pub fn try_channel( + size: usize, + f: impl FnOnce(mpsc::Sender) -> F, +) -> impl Stream> +where + F: Future>, +{ + let (sender, receiver) = mpsc::channel(size); + + let runner = stream::once(f(sender)).filter_map(|result| async { + match result { + Ok(()) => None, + Err(error) => Some(Err(error)), + } + }); + + stream::select(receiver.map(Ok), runner) +} -- cgit