summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--Cargo.toml1
-rw-r--r--examples/clock/src/main.rs6
-rw-r--r--examples/download_progress/src/download.rs142
-rw-r--r--examples/websocket/Cargo.toml22
-rw-r--r--examples/websocket/README.md17
-rw-r--r--examples/websocket/src/echo.rs146
-rw-r--r--examples/websocket/src/echo/server.rs57
-rw-r--r--examples/websocket/src/main.rs162
-rw-r--r--native/src/subscription.rs164
-rw-r--r--native/src/subscription/events.rs35
10 files changed, 615 insertions, 137 deletions
diff --git a/Cargo.toml b/Cargo.toml
index bb50888a..e553c78f 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -88,6 +88,7 @@ members = [
"examples/tooltip",
"examples/tour",
"examples/url_handler",
+ "examples/websocket",
]
[dependencies]
diff --git a/examples/clock/src/main.rs b/examples/clock/src/main.rs
index de9e879a..325ccc1a 100644
--- a/examples/clock/src/main.rs
+++ b/examples/clock/src/main.rs
@@ -66,16 +66,12 @@ impl Application for Clock {
}
fn view(&mut self) -> Element<Message> {
- let canvas = Canvas::new(self)
- .width(Length::Units(400))
- .height(Length::Units(400));
+ let canvas = Canvas::new(self).width(Length::Fill).height(Length::Fill);
Container::new(canvas)
.width(Length::Fill)
.height(Length::Fill)
.padding(20)
- .center_x()
- .center_y()
.into()
}
}
diff --git a/examples/download_progress/src/download.rs b/examples/download_progress/src/download.rs
index 08805f13..7db1206b 100644
--- a/examples/download_progress/src/download.rs
+++ b/examples/download_progress/src/download.rs
@@ -1,111 +1,79 @@
-use iced_futures::futures;
-use std::hash::{Hash, Hasher};
+use iced_native::subscription;
+
+use std::hash::Hash;
// Just a little utility function
-pub fn file<I: 'static + Hash + Copy + Send, T: ToString>(
+pub fn file<I: 'static + Hash + Copy + Send + Sync, T: ToString>(
id: I,
url: T,
) -> iced::Subscription<(I, Progress)> {
- iced::Subscription::from_recipe(Download {
- id,
- url: url.to_string(),
+ subscription::unfold(id, State::Ready(url.to_string()), move |state| {
+ download(id, state)
})
}
+#[derive(Debug, Hash, Clone)]
pub struct Download<I> {
id: I,
url: String,
}
-// Make sure iced can use our download stream
-impl<H, I, T> iced_native::subscription::Recipe<H, I> for Download<T>
-where
- T: 'static + Hash + Copy + Send,
- H: Hasher,
-{
- type Output = (T, Progress);
-
- fn hash(&self, state: &mut H) {
- struct Marker;
- std::any::TypeId::of::<Marker>().hash(state);
-
- self.id.hash(state);
- }
+async fn download<I: Copy>(
+ id: I,
+ state: State,
+) -> (Option<(I, Progress)>, State) {
+ match state {
+ State::Ready(url) => {
+ let response = reqwest::get(&url).await;
- fn stream(
- self: Box<Self>,
- _input: futures::stream::BoxStream<'static, I>,
- ) -> futures::stream::BoxStream<'static, Self::Output> {
- let id = self.id;
+ match response {
+ Ok(response) => {
+ if let Some(total) = response.content_length() {
+ (
+ Some((id, Progress::Started)),
+ State::Downloading {
+ response,
+ total,
+ downloaded: 0,
+ },
+ )
+ } else {
+ (Some((id, Progress::Errored)), State::Finished)
+ }
+ }
+ Err(_) => (Some((id, Progress::Errored)), State::Finished),
+ }
+ }
+ State::Downloading {
+ mut response,
+ total,
+ downloaded,
+ } => match response.chunk().await {
+ Ok(Some(chunk)) => {
+ let downloaded = downloaded + chunk.len() as u64;
- Box::pin(futures::stream::unfold(
- State::Ready(self.url),
- move |state| async move {
- match state {
- State::Ready(url) => {
- let response = reqwest::get(&url).await;
+ let percentage = (downloaded as f32 / total as f32) * 100.0;
- match response {
- Ok(response) => {
- if let Some(total) = response.content_length() {
- Some((
- (id, Progress::Started),
- State::Downloading {
- response,
- total,
- downloaded: 0,
- },
- ))
- } else {
- Some((
- (id, Progress::Errored),
- State::Finished,
- ))
- }
- }
- Err(_) => {
- Some(((id, Progress::Errored), State::Finished))
- }
- }
- }
+ (
+ Some((id, Progress::Advanced(percentage))),
State::Downloading {
- mut response,
+ response,
total,
downloaded,
- } => match response.chunk().await {
- Ok(Some(chunk)) => {
- let downloaded = downloaded + chunk.len() as u64;
-
- let percentage =
- (downloaded as f32 / total as f32) * 100.0;
-
- Some((
- (id, Progress::Advanced(percentage)),
- State::Downloading {
- response,
- total,
- downloaded,
- },
- ))
- }
- Ok(None) => {
- Some(((id, Progress::Finished), State::Finished))
- }
- Err(_) => {
- Some(((id, Progress::Errored), State::Finished))
- }
},
- State::Finished => {
- // We do not let the stream die, as it would start a
- // new download repeatedly if the user is not careful
- // in case of errors.
- let _: () = iced::futures::future::pending().await;
+ )
+ }
+ Ok(None) => (Some((id, Progress::Finished)), State::Finished),
+ Err(_) => (Some((id, Progress::Errored)), State::Finished),
+ },
+ State::Finished => {
+ // We do not let the stream die, as it would start a
+ // new download repeatedly if the user is not careful
+ // in case of errors.
+ let _: () = iced::futures::future::pending().await;
- None
- }
- }
- },
- ))
+ unreachable!()
+ }
}
}
diff --git a/examples/websocket/Cargo.toml b/examples/websocket/Cargo.toml
new file mode 100644
index 00000000..6b4d9d10
--- /dev/null
+++ b/examples/websocket/Cargo.toml
@@ -0,0 +1,22 @@
+[package]
+name = "websocket"
+version = "0.1.0"
+authors = ["Héctor Ramón Jiménez <hector0193@gmail.com>"]
+edition = "2018"
+publish = false
+
+[dependencies]
+iced = { path = "../..", features = ["tokio", "debug"] }
+iced_native = { path = "../../native" }
+iced_futures = { path = "../../futures" }
+
+[dependencies.async-tungstenite]
+version = "0.16"
+features = ["tokio-rustls-webpki-roots"]
+
+[dependencies.tokio]
+version = "1"
+features = ["time"]
+
+[dependencies.warp]
+version = "0.3"
diff --git a/examples/websocket/README.md b/examples/websocket/README.md
new file mode 100644
index 00000000..16e983da
--- /dev/null
+++ b/examples/websocket/README.md
@@ -0,0 +1,17 @@
+## Websocket
+
+A simple example that keeps a WebSocket connection open to an echo server.
+
+The example consists of 3 modules:
+- [`main`] contains the `Application` logic.
+- [`echo`] implements a WebSocket client for the [`echo::server`] with `async-tungstenite`.
+- [`echo::server`] implements a simple WebSocket echo server with `warp`.
+
+You can run it with `cargo run`:
+```
+cargo run --package websocket
+```
+
+[`main`]: src/main.rs
+[`echo`]: src/echo.rs
+[`echo::server`]: src/echo/server.rs
diff --git a/examples/websocket/src/echo.rs b/examples/websocket/src/echo.rs
new file mode 100644
index 00000000..13596ddd
--- /dev/null
+++ b/examples/websocket/src/echo.rs
@@ -0,0 +1,146 @@
+pub mod server;
+
+use iced_futures::futures;
+use iced_native::subscription::{self, Subscription};
+
+use futures::channel::mpsc;
+use futures::sink::SinkExt;
+use futures::stream::StreamExt;
+
+use async_tungstenite::tungstenite;
+
+pub fn connect() -> Subscription<Event> {
+ struct Connect;
+
+ subscription::unfold(
+ std::any::TypeId::of::<Connect>(),
+ State::Disconnected,
+ |state| async move {
+ match state {
+ State::Disconnected => {
+ const ECHO_SERVER: &str = "ws://localhost:3030";
+
+ match async_tungstenite::tokio::connect_async(ECHO_SERVER)
+ .await
+ {
+ Ok((websocket, _)) => {
+ let (sender, receiver) = mpsc::channel(100);
+
+ (
+ Some(Event::Connected(Connection(sender))),
+ State::Connected(websocket, receiver),
+ )
+ }
+ Err(_) => {
+ let _ = tokio::time::sleep(
+ tokio::time::Duration::from_secs(1),
+ )
+ .await;
+
+ (Some(Event::Disconnected), State::Disconnected)
+ }
+ }
+ }
+ State::Connected(mut websocket, mut input) => {
+ let mut fused_websocket = websocket.by_ref().fuse();
+
+ futures::select! {
+ received = fused_websocket.select_next_some() => {
+ match received {
+ Ok(tungstenite::Message::Text(message)) => {
+ (
+ Some(Event::MessageReceived(Message::User(message))),
+ State::Connected(websocket, input)
+ )
+ }
+ Ok(_) => {
+ (None, State::Connected(websocket, input))
+ }
+ Err(_) => {
+ (Some(Event::Disconnected), State::Disconnected)
+ }
+ }
+ }
+
+ message = input.select_next_some() => {
+ let result = websocket.send(tungstenite::Message::Text(String::from(message))).await;
+
+ if result.is_ok() {
+ (None, State::Connected(websocket, input))
+ } else {
+ (Some(Event::Disconnected), State::Disconnected)
+ }
+ }
+ }
+ }
+ }
+ },
+ )
+}
+
+#[derive(Debug)]
+enum State {
+ Disconnected,
+ Connected(
+ async_tungstenite::WebSocketStream<
+ async_tungstenite::tokio::ConnectStream,
+ >,
+ mpsc::Receiver<Message>,
+ ),
+}
+
+#[derive(Debug, Clone)]
+pub enum Event {
+ Connected(Connection),
+ Disconnected,
+ MessageReceived(Message),
+}
+
+#[derive(Debug, Clone)]
+pub struct Connection(mpsc::Sender<Message>);
+
+impl Connection {
+ pub fn send(&mut self, message: Message) {
+ let _ = self
+ .0
+ .try_send(message)
+ .expect("Send message to echo server");
+ }
+}
+
+#[derive(Debug, Clone)]
+pub enum Message {
+ Connected,
+ Disconnected,
+ User(String),
+}
+
+impl Message {
+ pub fn new(message: &str) -> Option<Self> {
+ if message.is_empty() {
+ None
+ } else {
+ Some(Self::User(message.to_string()))
+ }
+ }
+
+ pub fn connected() -> Self {
+ Message::Connected
+ }
+
+ pub fn disconnected() -> Self {
+ Message::Disconnected
+ }
+}
+
+impl From<Message> for String {
+ fn from(message: Message) -> Self {
+ match message {
+ Message::Connected => String::from("Connected successfully!"),
+ Message::Disconnected => {
+ String::from("Connection lost... Retrying...")
+ }
+ Message::User(message) => message,
+ }
+ }
+}
diff --git a/examples/websocket/src/echo/server.rs b/examples/websocket/src/echo/server.rs
new file mode 100644
index 00000000..7702d417
--- /dev/null
+++ b/examples/websocket/src/echo/server.rs
@@ -0,0 +1,57 @@
+use iced_futures::futures;
+
+use futures::channel::mpsc;
+use futures::{SinkExt, StreamExt};
+use warp::ws::WebSocket;
+use warp::Filter;
+
+// Basic WebSocket echo server adapted from:
+// https://github.com/seanmonstar/warp/blob/3ff2eaf41eb5ac9321620e5a6434d5b5ec6f313f/examples/websockets_chat.rs
+//
+// Copyright (c) 2018-2020 Sean McArthur
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+pub async fn run() {
+ let routes = warp::path::end().and(warp::ws()).map(|ws: warp::ws::Ws| {
+ ws.on_upgrade(move |socket| user_connected(socket))
+ });
+
+ warp::serve(routes).run(([127, 0, 0, 1], 3030)).await;
+}
+
+async fn user_connected(ws: WebSocket) {
+ let (mut user_ws_tx, mut user_ws_rx) = ws.split();
+ let (mut tx, mut rx) = mpsc::unbounded();
+
+ tokio::task::spawn(async move {
+ while let Some(message) = rx.next().await {
+ let _ = user_ws_tx.send(message).await.unwrap_or_else(|e| {
+ eprintln!("websocket send error: {}", e);
+ });
+ }
+ });
+
+ while let Some(result) = user_ws_rx.next().await {
+ let msg = match result {
+ Ok(msg) => msg,
+ Err(_) => break,
+ };
+
+ let _ = tx.send(msg).await;
+ }
+}
diff --git a/examples/websocket/src/main.rs b/examples/websocket/src/main.rs
new file mode 100644
index 00000000..c03a9f3a
--- /dev/null
+++ b/examples/websocket/src/main.rs
@@ -0,0 +1,162 @@
+mod echo;
+
+use iced::alignment::{self, Alignment};
+use iced::button::{self, Button};
+use iced::executor;
+use iced::scrollable::{self, Scrollable};
+use iced::text_input::{self, TextInput};
+use iced::{
+ Application, Color, Column, Command, Container, Element, Length, Row,
+ Settings, Subscription, Text,
+};
+
+pub fn main() -> iced::Result {
+ WebSocket::run(Settings::default())
+}
+
+#[derive(Default)]
+struct WebSocket {
+ messages: Vec<echo::Message>,
+ message_log: scrollable::State,
+ new_message: String,
+ new_message_state: text_input::State,
+ new_message_button: button::State,
+ state: State,
+}
+
+#[derive(Debug, Clone)]
+enum Message {
+ NewMessageChanged(String),
+ Send(echo::Message),
+ Echo(echo::Event),
+ Server,
+}
+
+impl Application for WebSocket {
+ type Message = Message;
+ type Flags = ();
+ type Executor = executor::Default;
+
+ fn new(_flags: Self::Flags) -> (Self, Command<Message>) {
+ (
+ Self::default(),
+ Command::perform(echo::server::run(), |_| Message::Server),
+ )
+ }
+
+ fn title(&self) -> String {
+ String::from("WebSocket - Iced")
+ }
+
+ fn update(&mut self, message: Message) -> Command<Message> {
+ match message {
+ Message::NewMessageChanged(new_message) => {
+ self.new_message = new_message;
+ }
+ Message::Send(message) => match &mut self.state {
+ State::Connected(connection) => {
+ self.new_message.clear();
+
+ connection.send(message);
+ }
+ State::Disconnected => {}
+ },
+ Message::Echo(event) => match event {
+ echo::Event::Connected(connection) => {
+ self.state = State::Connected(connection);
+
+ self.messages.push(echo::Message::connected());
+ }
+ echo::Event::Disconnected => {
+ self.state = State::Disconnected;
+
+ self.messages.push(echo::Message::disconnected());
+ }
+ echo::Event::MessageReceived(message) => {
+ self.messages.push(message);
+ self.message_log.snap_to(1.0);
+ }
+ },
+ Message::Server => {}
+ }
+
+ Command::none()
+ }
+
+ fn subscription(&self) -> Subscription<Message> {
+ echo::connect().map(Message::Echo)
+ }
+
+ fn view(&mut self) -> Element<Message> {
+ let message_log = if self.messages.is_empty() {
+ Container::new(
+ Text::new("Your messages will appear here...")
+ .color(Color::from_rgb8(0x88, 0x88, 0x88)),
+ )
+ .width(Length::Fill)
+ .height(Length::Fill)
+ .center_x()
+ .center_y()
+ .into()
+ } else {
+ self.messages
+ .iter()
+ .cloned()
+ .fold(
+ Scrollable::new(&mut self.message_log),
+ |scrollable, message| scrollable.push(Text::new(message)),
+ )
+ .width(Length::Fill)
+ .height(Length::Fill)
+ .spacing(10)
+ .into()
+ };
+
+ let new_message_input = {
+ let mut input = TextInput::new(
+ &mut self.new_message_state,
+ "Type a message...",
+ &self.new_message,
+ Message::NewMessageChanged,
+ )
+ .padding(10);
+
+ let mut button = Button::new(
+ &mut self.new_message_button,
+ Text::new("Send")
+ .height(Length::Fill)
+ .vertical_alignment(alignment::Vertical::Center),
+ )
+ .padding([0, 20]);
+
+ if matches!(self.state, State::Connected(_)) {
+ if let Some(message) = echo::Message::new(&self.new_message) {
+ input = input.on_submit(Message::Send(message.clone()));
+ button = button.on_press(Message::Send(message));
+ }
+ }
+
+ Row::with_children(vec![input.into(), button.into()])
+ .spacing(10)
+ .align_items(Alignment::Fill)
+ };
+
+ Column::with_children(vec![message_log, new_message_input.into()])
+ .width(Length::Fill)
+ .height(Length::Fill)
+ .padding(20)
+ .spacing(10)
+ .into()
+ }
+}
+
+enum State {
+ Disconnected,
+ Connected(echo::Connection),
+}
+
+impl Default for State {
+ fn default() -> Self {
+ Self::Disconnected
+ }
+}
diff --git a/native/src/subscription.rs b/native/src/subscription.rs
index 2950879c..63834654 100644
--- a/native/src/subscription.rs
+++ b/native/src/subscription.rs
@@ -1,8 +1,12 @@
//! Listen to external events in your application.
use crate::event::{self, Event};
use crate::Hasher;
+
+use iced_futures::futures::{self, Future, Stream};
use iced_futures::BoxStream;
+use std::hash::Hash;
+
/// A request to listen to external events.
///
/// Besides performing async actions on demand with [`Command`], most
@@ -29,20 +33,14 @@ pub type Tracker =
pub use iced_futures::subscription::Recipe;
-mod events;
-
-use events::Events;
-
/// Returns a [`Subscription`] to all the runtime events.
///
/// This subscription will notify your application of any [`Event`] that was
/// not captured by any widget.
pub fn events() -> Subscription<Event> {
- Subscription::from_recipe(Events {
- f: |event, status| match status {
- event::Status::Ignored => Some(event),
- event::Status::Captured => None,
- },
+ events_with(|event, status| match status {
+ event::Status::Ignored => Some(event),
+ event::Status::Captured => None,
})
}
@@ -60,5 +58,151 @@ pub fn events_with<Message>(
where
Message: 'static + Send,
{
- Subscription::from_recipe(Events { f })
+ Subscription::from_recipe(Runner {
+ id: f,
+ spawn: move |events| {
+ use futures::future;
+ use futures::stream::StreamExt;
+
+ events.filter_map(move |(event, status)| {
+ future::ready(f(event, status))
+ })
+ },
+ })
+}
+
+/// Returns a [`Subscription`] that will create and asynchronously run the
+/// given [`Stream`].
+///
+/// The `id` will be used to uniquely identify the [`Subscription`].
+pub fn run<I, S, Message>(id: I, stream: S) -> Subscription<Message>
+where
+ I: Hash + 'static,
+ S: Stream<Item = Message> + Send + 'static,
+ Message: 'static,
+{
+ Subscription::from_recipe(Runner {
+ id,
+ spawn: move |_| stream,
+ })
+}
+
+/// Returns a [`Subscription`] that will create and asynchronously run a
+/// [`Stream`] that will call the provided closure to produce every `Message`.
+///
+/// The `id` will be used to uniquely identify the [`Subscription`].
+///
+/// # Creating an asynchronous worker with bidirectional communication
+/// You can leverage this helper to create a [`Subscription`] that spawns
+/// an asynchronous worker in the background and establish a channel of
+/// communication with an `iced` application.
+///
+/// You can achieve this by creating an `mpsc` channel inside the closure
+/// and returning the `Sender` as a `Message` for the `Application`:
+///
+/// ```
+/// use iced_native::subscription::{self, Subscription};
+/// use iced_native::futures::channel::mpsc;
+///
+/// pub enum Event {
+/// Ready(mpsc::Sender<Input>),
+/// WorkFinished,
+/// // ...
+/// }
+///
+/// enum Input {
+/// DoSomeWork,
+/// // ...
+/// }
+///
+/// enum State {
+/// Starting,
+/// Ready(mpsc::Receiver<Input>),
+/// }
+///
+/// fn some_worker() -> Subscription<Event> {
+/// struct SomeWorker;
+///
+/// subscription::unfold(std::any::TypeId::of::<SomeWorker>(), State::Starting, |state| async move {
+/// match state {
+/// State::Starting => {
+/// // Create channel
+/// let (sender, receiver) = mpsc::channel(100);
+///
+/// (Some(Event::Ready(sender)), State::Ready(receiver))
+/// }
+/// State::Ready(mut receiver) => {
+/// use iced_native::futures::StreamExt;
+///
+/// // Read next input sent from `Application`
+/// let input = receiver.select_next_some().await;
+///
+/// match input {
+/// Input::DoSomeWork => {
+/// // Do some async work...
+///
+/// // Finally, we can optionally return a message to tell the
+/// // `Application` the work is done
+/// (Some(Event::WorkFinished), State::Ready(receiver))
+/// }
+/// }
+/// }
+/// }
+/// })
+/// }
+/// ```
+///
+/// Check out the [`websocket`] example, which showcases this pattern to maintain a WebSocket
+/// connection open.
+///
+/// [`websocket`]: https://github.com/iced-rs/iced/tree/0.4/examples/websocket
+pub fn unfold<I, T, Fut, Message>(
+ id: I,
+ initial: T,
+ mut f: impl FnMut(T) -> Fut + Send + Sync + 'static,
+) -> Subscription<Message>
+where
+ I: Hash + 'static,
+ T: Send + 'static,
+ Fut: Future<Output = (Option<Message>, T)> + Send + 'static,
+ Message: 'static + Send,
+{
+ use futures::future::{self, FutureExt};
+ use futures::stream::StreamExt;
+
+ run(
+ id,
+ futures::stream::unfold(initial, move |state| f(state).map(Some))
+ .filter_map(future::ready),
+ )
+}
+
+struct Runner<I, F, S, Message>
+where
+ F: FnOnce(EventStream) -> S,
+ S: Stream<Item = Message>,
+{
+ id: I,
+ spawn: F,
+}
+
+impl<I, S, F, Message> Recipe<Hasher, (Event, event::Status)>
+ for Runner<I, F, S, Message>
+where
+ I: Hash + 'static,
+ F: FnOnce(EventStream) -> S,
+ S: Stream<Item = Message> + Send + 'static,
+{
+ type Output = Message;
+
+ fn hash(&self, state: &mut Hasher) {
+ std::any::TypeId::of::<I>().hash(state);
+ self.id.hash(state);
+ }
+
+ fn stream(self: Box<Self>, input: EventStream) -> BoxStream<Self::Output> {
+ use futures::stream::StreamExt;
+
+ (self.spawn)(input).boxed()
+ }
}
diff --git a/native/src/subscription/events.rs b/native/src/subscription/events.rs
deleted file mode 100644
index ca143bb3..00000000
--- a/native/src/subscription/events.rs
+++ /dev/null
@@ -1,35 +0,0 @@
-use crate::event::{self, Event};
-use crate::subscription::{EventStream, Recipe};
-use crate::Hasher;
-use iced_futures::futures::future;
-use iced_futures::futures::StreamExt;
-use iced_futures::BoxStream;
-
-pub struct Events<Message> {
- pub(super) f: fn(Event, event::Status) -> Option<Message>,
-}
-
-impl<Message> Recipe<Hasher, (Event, event::Status)> for Events<Message>
-where
- Message: 'static + Send,
-{
- type Output = Message;
-
- fn hash(&self, state: &mut Hasher) {
- use std::hash::Hash;
-
- struct Marker;
- std::any::TypeId::of::<Marker>().hash(state);
- self.f.hash(state);
- }
-
- fn stream(
- self: Box<Self>,
- event_stream: EventStream,
- ) -> BoxStream<Self::Output> {
- let stream = event_stream.filter_map(move |(event, status)| {
- future::ready((self.f)(event, status))
- });
- iced_futures::boxed_stream(stream)
- }
-}