summaryrefslogtreecommitdiffstats
path: root/futures
diff options
context:
space:
mode:
authorLibravatar Héctor Ramón Jiménez <hector0193@gmail.com>2022-05-27 19:43:50 +0200
committerLibravatar Héctor Ramón Jiménez <hector0193@gmail.com>2022-05-27 19:54:58 +0200
commitecd00cf02b23ccafd4427f498507621ebb017be7 (patch)
tree1988f2ec3b0b80569407f70050ba610579ae924d /futures
parent5de337f214530faab1d5fe47784afd7006c3f7f0 (diff)
downloadiced-ecd00cf02b23ccafd4427f498507621ebb017be7.tar.gz
iced-ecd00cf02b23ccafd4427f498507621ebb017be7.tar.bz2
iced-ecd00cf02b23ccafd4427f498507621ebb017be7.zip
Fix `Subscription` cancelation when never awaiting
`StreamExt::forward` will keep polling a ready `Stream` in a loop. If the `Stream` is always ready, the `poll` method of `Forward` effectively blocks (see https://github.com/rust-lang/futures-rs/issues/2552). The fix consists in manually implementing a simpler version of `Forward`.
Diffstat (limited to 'futures')
-rw-r--r--futures/src/subscription/tracker.rs33
1 files changed, 23 insertions, 10 deletions
diff --git a/futures/src/subscription/tracker.rs b/futures/src/subscription/tracker.rs
index 421fb917..2cf98284 100644
--- a/futures/src/subscription/tracker.rs
+++ b/futures/src/subscription/tracker.rs
@@ -1,6 +1,9 @@
use crate::{BoxFuture, MaybeSend, Subscription};
-use futures::{channel::mpsc, sink::Sink};
+use futures::{
+ channel::mpsc,
+ sink::{Sink, SinkExt},
+};
use std::{collections::HashMap, marker::PhantomData};
/// A registry of subscription streams.
@@ -64,7 +67,7 @@ where
+ MaybeSend
+ Clone,
{
- use futures::{future::FutureExt, stream::StreamExt};
+ use futures::stream::StreamExt;
let mut futures: Vec<BoxFuture<()>> = Vec::new();
@@ -85,19 +88,29 @@ where
continue;
}
- let (cancel, cancelled) = futures::channel::oneshot::channel();
+ let (cancel, mut canceled) = futures::channel::oneshot::channel();
// TODO: Use bus if/when it supports async
let (event_sender, event_receiver) =
futures::channel::mpsc::channel(100);
- let stream = recipe.stream(event_receiver.boxed());
-
- let future = futures::future::select(
- cancelled,
- stream.map(Ok).forward(receiver.clone()),
- )
- .map(|_| ());
+ let mut receiver = receiver.clone();
+ let mut stream = recipe.stream(event_receiver.boxed());
+
+ let future = async move {
+ loop {
+ let select =
+ futures::future::select(&mut canceled, stream.next());
+
+ match select.await {
+ futures::future::Either::Left(_)
+ | futures::future::Either::Right((None, _)) => break,
+ futures::future::Either::Right((Some(message), _)) => {
+ let _ = receiver.send(message).await;
+ }
+ }
+ }
+ };
let _ = self.subscriptions.insert(
id,