diff options
| author | 2023-04-11 07:46:54 +0200 | |
|---|---|---|
| committer | 2023-04-11 07:46:54 +0200 | |
| commit | ae7e6b3d481e420acf981fabbeff9745d4b5347d (patch) | |
| tree | 2c336d3a94815dac5a3a9a225edbe336ccebb5f3 /native | |
| parent | ff24f9040c822fa7706087091e92ebee1e5211c5 (diff) | |
| download | iced-ae7e6b3d481e420acf981fabbeff9745d4b5347d.tar.gz iced-ae7e6b3d481e420acf981fabbeff9745d4b5347d.tar.bz2 iced-ae7e6b3d481e420acf981fabbeff9745d4b5347d.zip | |
Implement `subscription::channel` and simplify `unfold`
Diffstat (limited to '')
| -rw-r--r-- | native/src/subscription.rs | 91 | 
1 files changed, 63 insertions, 28 deletions
| diff --git a/native/src/subscription.rs b/native/src/subscription.rs index 16e78e82..0ff5e320 100644 --- a/native/src/subscription.rs +++ b/native/src/subscription.rs @@ -3,6 +3,8 @@ use crate::event::{self, Event};  use crate::window;  use crate::Hasher; +use iced_futures::futures::channel::mpsc; +use iced_futures::futures::never::Never;  use iced_futures::futures::{self, Future, Stream};  use iced_futures::{BoxStream, MaybeSend}; @@ -133,6 +135,27 @@ where  /// [`Stream`] that will call the provided closure to produce every `Message`.  ///  /// The `id` will be used to uniquely identify the [`Subscription`]. +pub fn unfold<I, T, Fut, Message>( +    id: I, +    initial: T, +    mut f: impl FnMut(T) -> Fut + MaybeSend + Sync + 'static, +) -> Subscription<Message> +where +    I: Hash + 'static, +    T: MaybeSend + 'static, +    Fut: Future<Output = (Message, T)> + MaybeSend + 'static, +    Message: 'static + MaybeSend, +{ +    use futures::future::FutureExt; + +    run_with_id( +        id, +        futures::stream::unfold(initial, move |state| f(state).map(Some)), +    ) +} + +/// Creates a [`Subscription`] that publishes the events sent from a [`Future`] +/// to an [`mpsc::Sender`] with the given bounds.  ///  /// # Creating an asynchronous worker with bidirectional communication  /// You can leverage this helper to create a [`Subscription`] that spawns @@ -145,6 +168,7 @@ where  /// ```  /// use iced_native::subscription::{self, Subscription};  /// use iced_native::futures::channel::mpsc; +/// use iced_native::futures::sink::SinkExt;  ///  /// pub enum Event {  ///     Ready(mpsc::Sender<Input>), @@ -165,27 +189,35 @@ where  /// fn some_worker() -> Subscription<Event> {  ///     struct SomeWorker;  /// -///     subscription::unfold(std::any::TypeId::of::<SomeWorker>(), State::Starting, |state| async move { -///         match state { -///             State::Starting => { -///                 // Create channel -///                 let (sender, receiver) = mpsc::channel(100); +///     subscription::channel(std::any::TypeId::of::<SomeWorker>(), 100, |mut output| async move { +///         let mut state = State::Starting;  /// -///                 (Some(Event::Ready(sender)), State::Ready(receiver)) -///             } -///             State::Ready(mut receiver) => { -///                 use iced_native::futures::StreamExt; +///         loop { +///             match &mut state { +///                 State::Starting => { +///                     // Create channel +///                     let (sender, receiver) = mpsc::channel(100); +/// +///                     // Send the sender back to the application +///                     output.send(Event::Ready(sender)).await; +/// +///                     // We are ready to receive messages +///                     state = State::Ready(receiver); +///                 } +///                 State::Ready(receiver) => { +///                     use iced_native::futures::StreamExt;  /// -///                 // Read next input sent from `Application` -///                 let input = receiver.select_next_some().await; +///                     // Read next input sent from `Application` +///                     let input = receiver.select_next_some().await;  /// -///                 match input { -///                     Input::DoSomeWork => { -///                         // Do some async work... +///                     match input { +///                         Input::DoSomeWork => { +///                             // Do some async work...  /// -///                         // Finally, we can optionally return a message to tell the -///                         // `Application` the work is done -///                         (Some(Event::WorkFinished), State::Ready(receiver)) +///                             // Finally, we can optionally produce a message to tell the +///                             // `Application` the work is done +///                             output.send(Event::WorkFinished).await; +///                         }  ///                     }  ///                 }  ///             } @@ -198,25 +230,28 @@ where  /// connection open.  ///  /// [`websocket`]: https://github.com/iced-rs/iced/tree/0.8/examples/websocket -pub fn unfold<I, T, Fut, Message>( +pub fn channel<I, Fut, Message>(      id: I, -    initial: T, -    mut f: impl FnMut(T) -> Fut + MaybeSend + Sync + 'static, +    size: usize, +    f: impl Fn(mpsc::Sender<Message>) -> Fut + MaybeSend + Sync + 'static,  ) -> Subscription<Message>  where      I: Hash + 'static, -    T: MaybeSend + 'static, -    Fut: Future<Output = (Option<Message>, T)> + MaybeSend + 'static, +    Fut: Future<Output = Never> + MaybeSend + 'static,      Message: 'static + MaybeSend,  { -    use futures::future::{self, FutureExt}; -    use futures::stream::StreamExt; +    use futures::stream::{self, StreamExt}; -    run_with_id( +    Subscription::from_recipe(Runner {          id, -        futures::stream::unfold(initial, move |state| f(state).map(Some)) -            .filter_map(future::ready), -    ) +        spawn: move |_| { +            let (sender, receiver) = mpsc::channel(size); + +            let runner = stream::once(f(sender)).map(|_| unreachable!()); + +            stream::select(receiver, runner) +        }, +    })  }  struct Runner<I, F, S, Message> | 
