From 0f9934b1a70cdadb9131b68b6dfb76083f014636 Mon Sep 17 00:00:00 2001 From: Héctor Ramón Jiménez Date: Fri, 21 Feb 2025 01:48:09 +0100 Subject: Leverage new `AsyncFn` traits in `stream` module --- futures/src/stream.rs | 18 ++++++------------ futures/src/subscription.rs | 2 +- 2 files changed, 7 insertions(+), 13 deletions(-) diff --git a/futures/src/stream.rs b/futures/src/stream.rs index 72e1f04b..ee9c0c14 100644 --- a/futures/src/stream.rs +++ b/futures/src/stream.rs @@ -8,13 +8,10 @@ use futures::stream::{self, Stream, StreamExt}; /// This is a more ergonomic [`stream::unfold`], which allows you to go /// from the "world of futures" to the "world of streams" by simply looping /// and publishing to an async channel from inside a [`Future`]. -pub fn channel( +pub fn channel( size: usize, - f: impl FnOnce(mpsc::Sender) -> F, -) -> impl Stream -where - F: Future, -{ + f: impl AsyncFnOnce(mpsc::Sender), +) -> impl Stream { let (sender, receiver) = mpsc::channel(size); let runner = stream::once(f(sender)).filter_map(|_| async { None }); @@ -24,13 +21,10 @@ where /// Creates a new [`Stream`] that produces the items sent from a [`Future`] /// that can fail to the [`mpsc::Sender`] provided to the closure. -pub fn try_channel( +pub fn try_channel( size: usize, - f: impl FnOnce(mpsc::Sender) -> F, -) -> impl Stream> -where - F: Future>, -{ + f: impl AsyncFnOnce(mpsc::Sender) -> Result<(), E>, +) -> impl Stream> { let (sender, receiver) = mpsc::channel(size); let runner = stream::once(f(sender)).filter_map(|result| async { diff --git a/futures/src/subscription.rs b/futures/src/subscription.rs index 3577d19f..f799d5f8 100644 --- a/futures/src/subscription.rs +++ b/futures/src/subscription.rs @@ -161,7 +161,7 @@ impl Subscription { /// } /// /// fn some_worker() -> impl Stream { - /// stream::channel(100, |mut output| async move { + /// stream::channel(100, async |mut output| { /// // Create channel /// let (sender, mut receiver) = mpsc::channel(100); /// -- cgit