summaryrefslogtreecommitdiffstats
path: root/native
diff options
context:
space:
mode:
authorLibravatar Héctor Ramón Jiménez <hector0193@gmail.com>2022-01-16 15:50:19 +0700
committerLibravatar Héctor Ramón Jiménez <hector0193@gmail.com>2022-01-16 15:50:19 +0700
commitdc50a2830ab553dfc5dfc28d4fd1af6b3981c656 (patch)
tree4ff8bce15f88a0ca610551fdd81afa968f7fda58 /native
parent2f557731f313ccc94b5d0ebb4ee5603624670daf (diff)
downloadiced-dc50a2830ab553dfc5dfc28d4fd1af6b3981c656.tar.gz
iced-dc50a2830ab553dfc5dfc28d4fd1af6b3981c656.tar.bz2
iced-dc50a2830ab553dfc5dfc28d4fd1af6b3981c656.zip
Draft `websocket` example :tada:
Diffstat (limited to 'native')
-rw-r--r--native/src/subscription.rs98
1 files changed, 25 insertions, 73 deletions
diff --git a/native/src/subscription.rs b/native/src/subscription.rs
index 85ad96fa..70cd269e 100644
--- a/native/src/subscription.rs
+++ b/native/src/subscription.rs
@@ -2,7 +2,6 @@
use crate::event::{self, Event};
use crate::Hasher;
-use iced_futures::futures::channel::mpsc;
use iced_futures::futures::{self, Future, Stream};
use iced_futures::BoxStream;
@@ -59,21 +58,9 @@ pub fn events_with<Message>(
where
Message: 'static + Send,
{
- #[derive(Debug, Clone, Copy, Hash)]
- struct Events(u64);
-
- let hash = {
- use std::hash::Hasher as _;
-
- let mut hasher = Hasher::default();
-
- f.hash(&mut hasher);
-
- hasher.finish()
- };
-
Subscription::from_recipe(Runner {
- initial: Events(hash),
+ id: f,
+ initial: (),
spawn: move |_, events| {
use futures::future;
use futures::stream::StreamExt;
@@ -89,16 +76,19 @@ where
/// [`Stream`] returned by the provided closure.
///
/// The `initial` state will be used to uniquely identify the [`Subscription`].
-pub fn run<T, S, Message>(
+pub fn run<I, T, S, Message>(
+ id: I,
initial: T,
f: impl FnOnce(T) -> S + 'static,
) -> Subscription<Message>
where
- Message: 'static,
- T: Clone + Hash + 'static,
+ I: Hash + 'static,
+ T: 'static,
S: Stream<Item = Message> + Send + 'static,
+ Message: 'static,
{
Subscription::from_recipe(Runner {
+ id,
initial,
spawn: move |initial, _| f(initial),
})
@@ -108,79 +98,41 @@ where
/// [`Stream`] that will call the provided closure to produce every `Message`.
///
/// The `initial` state will be used to uniquely identify the [`Subscription`].
-pub fn unfold<T, Fut, Message>(
+pub fn unfold<I, T, Fut, Message>(
+ id: I,
initial: T,
mut f: impl FnMut(T) -> Fut + Send + Sync + 'static,
) -> Subscription<Message>
where
- Message: 'static,
- T: Clone + Hash + Send + 'static,
- Fut: Future<Output = (Message, T)> + Send + 'static,
-{
- use futures::future::FutureExt;
-
- run(initial, move |initial| {
- futures::stream::unfold(initial, move |state| f(state).map(Some))
- })
-}
-
-/// Returns a [`Subscription`] that will open a channel and asynchronously run a
-/// [`Stream`] that will call the provided closure to produce every `Message`.
-///
-/// When the [`Subscription`] starts, an `on_ready` message will be produced
-/// containing the [`mpsc::Sender`] end of the channel, which can be used by
-/// the parent application to send `Input` to the running [`Subscription`].
-///
-/// The provided closure should use the [`mpsc::Receiver`] argument to await for
-/// any `Input`.
-///
-/// This function is really useful to create asynchronous workers with
-/// bidirectional communication with a parent application.
-///
-/// The `initial` state will be used to uniquely identify the [`Subscription`].
-pub fn worker<T, Fut, Message, Input>(
- initial: T,
- on_ready: impl FnOnce(mpsc::Sender<Input>) -> Message + 'static,
- f: impl FnMut(T, &mut mpsc::Receiver<Input>) -> Fut + Send + Sync + 'static,
-) -> Subscription<Message>
-where
- T: Clone + Hash + Send + 'static,
- Fut: Future<Output = (Message, T)> + Send + 'static,
- Message: Send + 'static,
- Input: Send + 'static,
+ I: Hash + 'static,
+ T: Send + 'static,
+ Fut: Future<Output = (Option<Message>, T)> + Send + 'static,
+ Message: 'static + Send,
{
- use futures::future;
+ use futures::future::{self, FutureExt};
use futures::stream::StreamExt;
- run(initial, move |initial| {
- let (sender, receiver) = mpsc::channel(100);
-
- futures::stream::once(future::ready(on_ready(sender))).chain(
- futures::stream::unfold(
- (f, initial, receiver),
- move |(mut f, state, mut receiver)| async {
- let (message, state) = f(state, &mut receiver).await;
-
- Some((message, (f, state, receiver)))
- },
- ),
- )
+ run(id, initial, move |initial| {
+ futures::stream::unfold(initial, move |state| f(state).map(Some))
+ .filter_map(future::ready)
})
}
-struct Runner<T, F, S, Message>
+struct Runner<I, T, F, S, Message>
where
F: FnOnce(T, EventStream) -> S,
S: Stream<Item = Message>,
{
+ id: I,
initial: T,
spawn: F,
}
-impl<T, S, F, Message> Recipe<Hasher, (Event, event::Status)>
- for Runner<T, F, S, Message>
+impl<I, T, S, F, Message> Recipe<Hasher, (Event, event::Status)>
+ for Runner<I, T, F, S, Message>
where
- T: Clone + Hash + 'static,
+ I: Hash + 'static,
+ T: 'static,
F: FnOnce(T, EventStream) -> S,
S: Stream<Item = Message> + Send + 'static,
{
@@ -189,7 +141,7 @@ where
fn hash(&self, state: &mut Hasher) {
std::any::TypeId::of::<T>().hash(state);
- self.initial.hash(state);
+ self.id.hash(state);
}
fn stream(self: Box<Self>, input: EventStream) -> BoxStream<Self::Output> {