diff options
Diffstat (limited to 'futures/src')
| -rw-r--r-- | futures/src/subscription/tracker.rs | 33 | 
1 files changed, 23 insertions, 10 deletions
| diff --git a/futures/src/subscription/tracker.rs b/futures/src/subscription/tracker.rs index 421fb917..2cf98284 100644 --- a/futures/src/subscription/tracker.rs +++ b/futures/src/subscription/tracker.rs @@ -1,6 +1,9 @@  use crate::{BoxFuture, MaybeSend, Subscription}; -use futures::{channel::mpsc, sink::Sink}; +use futures::{ +    channel::mpsc, +    sink::{Sink, SinkExt}, +};  use std::{collections::HashMap, marker::PhantomData};  /// A registry of subscription streams. @@ -64,7 +67,7 @@ where              + MaybeSend              + Clone,      { -        use futures::{future::FutureExt, stream::StreamExt}; +        use futures::stream::StreamExt;          let mut futures: Vec<BoxFuture<()>> = Vec::new(); @@ -85,19 +88,29 @@ where                  continue;              } -            let (cancel, cancelled) = futures::channel::oneshot::channel(); +            let (cancel, mut canceled) = futures::channel::oneshot::channel();              // TODO: Use bus if/when it supports async              let (event_sender, event_receiver) =                  futures::channel::mpsc::channel(100); -            let stream = recipe.stream(event_receiver.boxed()); - -            let future = futures::future::select( -                cancelled, -                stream.map(Ok).forward(receiver.clone()), -            ) -            .map(|_| ()); +            let mut receiver = receiver.clone(); +            let mut stream = recipe.stream(event_receiver.boxed()); + +            let future = async move { +                loop { +                    let select = +                        futures::future::select(&mut canceled, stream.next()); + +                    match select.await { +                        futures::future::Either::Left(_) +                        | futures::future::Either::Right((None, _)) => break, +                        futures::future::Either::Right((Some(message), _)) => { +                            let _ = receiver.send(message).await; +                        } +                    } +                } +            };              let _ = self.subscriptions.insert(                  id, | 
