summaryrefslogtreecommitdiffstats
path: root/native
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--native/src/subscription.rs164
-rw-r--r--native/src/subscription/events.rs35
2 files changed, 154 insertions, 45 deletions
diff --git a/native/src/subscription.rs b/native/src/subscription.rs
index 2950879c..63834654 100644
--- a/native/src/subscription.rs
+++ b/native/src/subscription.rs
@@ -1,8 +1,12 @@
//! Listen to external events in your application.
use crate::event::{self, Event};
use crate::Hasher;
+
+use iced_futures::futures::{self, Future, Stream};
use iced_futures::BoxStream;
+use std::hash::Hash;
+
/// A request to listen to external events.
///
/// Besides performing async actions on demand with [`Command`], most
@@ -29,20 +33,14 @@ pub type Tracker =
pub use iced_futures::subscription::Recipe;
-mod events;
-
-use events::Events;
-
/// Returns a [`Subscription`] to all the runtime events.
///
/// This subscription will notify your application of any [`Event`] that was
/// not captured by any widget.
pub fn events() -> Subscription<Event> {
- Subscription::from_recipe(Events {
- f: |event, status| match status {
- event::Status::Ignored => Some(event),
- event::Status::Captured => None,
- },
+ events_with(|event, status| match status {
+ event::Status::Ignored => Some(event),
+ event::Status::Captured => None,
})
}
@@ -60,5 +58,151 @@ pub fn events_with<Message>(
where
Message: 'static + Send,
{
- Subscription::from_recipe(Events { f })
+ Subscription::from_recipe(Runner {
+ id: f,
+ spawn: move |events| {
+ use futures::future;
+ use futures::stream::StreamExt;
+
+ events.filter_map(move |(event, status)| {
+ future::ready(f(event, status))
+ })
+ },
+ })
+}
+
+/// Returns a [`Subscription`] that will create and asynchronously run the
+/// given [`Stream`].
+///
+/// The `id` will be used to uniquely identify the [`Subscription`].
+pub fn run<I, S, Message>(id: I, stream: S) -> Subscription<Message>
+where
+ I: Hash + 'static,
+ S: Stream<Item = Message> + Send + 'static,
+ Message: 'static,
+{
+ Subscription::from_recipe(Runner {
+ id,
+ spawn: move |_| stream,
+ })
+}
+
+/// Returns a [`Subscription`] that will create and asynchronously run a
+/// [`Stream`] that will call the provided closure to produce every `Message`.
+///
+/// The `id` will be used to uniquely identify the [`Subscription`].
+///
+/// # Creating an asynchronous worker with bidirectional communication
+/// You can leverage this helper to create a [`Subscription`] that spawns
+/// an asynchronous worker in the background and establish a channel of
+/// communication with an `iced` application.
+///
+/// You can achieve this by creating an `mpsc` channel inside the closure
+/// and returning the `Sender` as a `Message` for the `Application`:
+///
+/// ```
+/// use iced_native::subscription::{self, Subscription};
+/// use iced_native::futures::channel::mpsc;
+///
+/// pub enum Event {
+/// Ready(mpsc::Sender<Input>),
+/// WorkFinished,
+/// // ...
+/// }
+///
+/// enum Input {
+/// DoSomeWork,
+/// // ...
+/// }
+///
+/// enum State {
+/// Starting,
+/// Ready(mpsc::Receiver<Input>),
+/// }
+///
+/// fn some_worker() -> Subscription<Event> {
+/// struct SomeWorker;
+///
+/// subscription::unfold(std::any::TypeId::of::<SomeWorker>(), State::Starting, |state| async move {
+/// match state {
+/// State::Starting => {
+/// // Create channel
+/// let (sender, receiver) = mpsc::channel(100);
+///
+/// (Some(Event::Ready(sender)), State::Ready(receiver))
+/// }
+/// State::Ready(mut receiver) => {
+/// use iced_native::futures::StreamExt;
+///
+/// // Read next input sent from `Application`
+/// let input = receiver.select_next_some().await;
+///
+/// match input {
+/// Input::DoSomeWork => {
+/// // Do some async work...
+///
+/// // Finally, we can optionally return a message to tell the
+/// // `Application` the work is done
+/// (Some(Event::WorkFinished), State::Ready(receiver))
+/// }
+/// }
+/// }
+/// }
+/// })
+/// }
+/// ```
+///
+/// Check out the [`websocket`] example, which showcases this pattern to maintain a WebSocket
+/// connection open.
+///
+/// [`websocket`]: https://github.com/iced-rs/iced/tree/0.4/examples/websocket
+pub fn unfold<I, T, Fut, Message>(
+ id: I,
+ initial: T,
+ mut f: impl FnMut(T) -> Fut + Send + Sync + 'static,
+) -> Subscription<Message>
+where
+ I: Hash + 'static,
+ T: Send + 'static,
+ Fut: Future<Output = (Option<Message>, T)> + Send + 'static,
+ Message: 'static + Send,
+{
+ use futures::future::{self, FutureExt};
+ use futures::stream::StreamExt;
+
+ run(
+ id,
+ futures::stream::unfold(initial, move |state| f(state).map(Some))
+ .filter_map(future::ready),
+ )
+}
+
+struct Runner<I, F, S, Message>
+where
+ F: FnOnce(EventStream) -> S,
+ S: Stream<Item = Message>,
+{
+ id: I,
+ spawn: F,
+}
+
+impl<I, S, F, Message> Recipe<Hasher, (Event, event::Status)>
+ for Runner<I, F, S, Message>
+where
+ I: Hash + 'static,
+ F: FnOnce(EventStream) -> S,
+ S: Stream<Item = Message> + Send + 'static,
+{
+ type Output = Message;
+
+ fn hash(&self, state: &mut Hasher) {
+ std::any::TypeId::of::<I>().hash(state);
+ self.id.hash(state);
+ }
+
+ fn stream(self: Box<Self>, input: EventStream) -> BoxStream<Self::Output> {
+ use futures::stream::StreamExt;
+
+ (self.spawn)(input).boxed()
+ }
}
diff --git a/native/src/subscription/events.rs b/native/src/subscription/events.rs
deleted file mode 100644
index ca143bb3..00000000
--- a/native/src/subscription/events.rs
+++ /dev/null
@@ -1,35 +0,0 @@
-use crate::event::{self, Event};
-use crate::subscription::{EventStream, Recipe};
-use crate::Hasher;
-use iced_futures::futures::future;
-use iced_futures::futures::StreamExt;
-use iced_futures::BoxStream;
-
-pub struct Events<Message> {
- pub(super) f: fn(Event, event::Status) -> Option<Message>,
-}
-
-impl<Message> Recipe<Hasher, (Event, event::Status)> for Events<Message>
-where
- Message: 'static + Send,
-{
- type Output = Message;
-
- fn hash(&self, state: &mut Hasher) {
- use std::hash::Hash;
-
- struct Marker;
- std::any::TypeId::of::<Marker>().hash(state);
- self.f.hash(state);
- }
-
- fn stream(
- self: Box<Self>,
- event_stream: EventStream,
- ) -> BoxStream<Self::Output> {
- let stream = event_stream.filter_map(move |(event, status)| {
- future::ready((self.f)(event, status))
- });
- iced_futures::boxed_stream(stream)
- }
-}