summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--core/src/subscription.rs37
-rw-r--r--examples/clock.rs49
-rw-r--r--winit/src/application.rs50
3 files changed, 48 insertions, 88 deletions
diff --git a/core/src/subscription.rs b/core/src/subscription.rs
index 1e6695d6..796982c7 100644
--- a/core/src/subscription.rs
+++ b/core/src/subscription.rs
@@ -2,60 +2,51 @@
/// An event subscription.
pub struct Subscription<T> {
- definitions: Vec<Box<dyn Definition<Message = T>>>,
+ handles: Vec<Box<dyn Handle<Output = T>>>,
}
impl<T> Subscription<T> {
pub fn none() -> Self {
Self {
- definitions: Vec::new(),
+ handles: Vec::new(),
}
}
pub fn batch(subscriptions: impl Iterator<Item = Subscription<T>>) -> Self {
Self {
- definitions: subscriptions
- .flat_map(|subscription| subscription.definitions)
+ handles: subscriptions
+ .flat_map(|subscription| subscription.handles)
.collect(),
}
}
- pub fn definitions(self) -> Vec<Box<dyn Definition<Message = T>>> {
- self.definitions
+ pub fn handles(self) -> Vec<Box<dyn Handle<Output = T>>> {
+ self.handles
}
}
impl<T, A> From<A> for Subscription<T>
where
- A: Definition<Message = T> + 'static,
+ A: Handle<Output = T> + 'static,
{
- fn from(definition: A) -> Self {
+ fn from(handle: A) -> Self {
Self {
- definitions: vec![Box::new(definition)],
+ handles: vec![Box::new(handle)],
}
}
}
-/// The definition of an event subscription.
-pub trait Definition {
- type Message;
+/// The handle of an event subscription.
+pub trait Handle {
+ type Output;
fn id(&self) -> u64;
- fn stream(
- &self,
- ) -> (
- futures::stream::BoxStream<'static, Self::Message>,
- Box<dyn Handle>,
- );
-}
-
-pub trait Handle {
- fn cancel(&mut self);
+ fn stream(&self) -> futures::stream::BoxStream<'static, Self::Output>;
}
impl<T> std::fmt::Debug for Subscription<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- f.debug_struct("Command").finish()
+ f.debug_struct("Subscription").finish()
}
}
diff --git a/examples/clock.rs b/examples/clock.rs
index b1ee8ab1..5a404bfa 100644
--- a/examples/clock.rs
+++ b/examples/clock.rs
@@ -82,7 +82,7 @@ impl Application for Clock {
}
mod time {
- use std::sync::{Arc, Mutex};
+ use std::sync::Arc;
pub fn every<Message>(
duration: std::time::Duration,
@@ -108,59 +108,30 @@ mod time {
>,
}
- struct TickState {
- alive: Arc<Mutex<bool>>,
- }
-
- impl iced::subscription::Handle for TickState {
- fn cancel(&mut self) {
- match self.alive.lock() {
- Ok(mut guard) => *guard = false,
- _ => {}
- }
- }
- }
-
- impl<Message> iced::subscription::Definition for Tick<Message>
+ impl<Message> iced::subscription::Handle for Tick<Message>
where
Message: 'static,
{
- type Message = Message;
+ type Output = Message;
fn id(&self) -> u64 {
0
}
- fn stream(
- &self,
- ) -> (
- futures::stream::BoxStream<'static, Message>,
- Box<dyn iced::subscription::Handle>,
- ) {
+ fn stream(&self) -> futures::stream::BoxStream<'static, Message> {
use futures::StreamExt;
let duration = self.duration.clone();
let function = self.message.clone();
- let alive = Arc::new(Mutex::new(true));
-
- let state = TickState {
- alive: alive.clone(),
- };
-
- let stream = futures::stream::poll_fn(move |_| {
- std::thread::sleep(duration);
-
- if !*alive.lock().unwrap() {
- return std::task::Poll::Ready(None);
- }
- let now = chrono::Local::now();
+ let stream =
+ futures::stream::iter(std::iter::repeat(())).map(move |_| {
+ std::thread::sleep(duration);
- std::task::Poll::Ready(Some(now))
- })
- .map(move |time| function(time));
+ function(chrono::Local::now())
+ });
- (stream.boxed(), Box::new(state))
+ stream.boxed()
}
}
}
diff --git a/winit/src/application.rs b/winit/src/application.rs
index 959e142d..62970810 100644
--- a/winit/src/application.rs
+++ b/winit/src/application.rs
@@ -2,8 +2,8 @@ use crate::{
conversion,
input::{keyboard, mouse},
renderer::{Target, Windowed},
- subscription, Cache, Command, Container, Debug, Element, Event, Length,
- MouseCursor, Settings, Subscription, UserInterface,
+ Cache, Command, Container, Debug, Element, Event, Length, MouseCursor,
+ Settings, Subscription, UserInterface,
};
use std::collections::HashMap;
@@ -420,7 +420,7 @@ fn spawn<Message: Send>(
}
pub struct Subscriptions {
- alive: HashMap<u64, Box<dyn subscription::Handle>>,
+ alive: HashMap<u64, futures::channel::oneshot::Sender<()>>,
}
impl Subscriptions {
@@ -436,46 +436,44 @@ impl Subscriptions {
thread_pool: &mut futures::executor::ThreadPool,
proxy: &winit::event_loop::EventLoopProxy<Message>,
) {
- use futures::stream::StreamExt;
+ use futures::{future::FutureExt, stream::StreamExt};
- let definitions = subscriptions.definitions();
+ let handles = subscriptions.handles();
let mut alive = std::collections::HashSet::new();
- for definition in definitions {
- let id = definition.id();
+ for handle in handles {
+ let id = handle.id();
let _ = alive.insert(id);
if !self.alive.contains_key(&id) {
- let (stream, handle) = definition.stream();
+ let (cancel, cancelled) = futures::channel::oneshot::channel();
+
+ let stream = handle.stream();
let proxy =
std::sync::Arc::new(std::sync::Mutex::new(proxy.clone()));
- let future = stream.for_each(move |message| {
- proxy
- .lock()
- .expect("Acquire event loop proxy lock")
- .send_event(message)
- .expect("Send subscription result to event loop");
+ let future = futures::future::select(
+ cancelled,
+ stream.for_each(move |message| {
+ proxy
+ .lock()
+ .expect("Acquire event loop proxy lock")
+ .send_event(message)
+ .expect("Send subscription result to event loop");
- futures::future::ready(())
- });
+ futures::future::ready(())
+ }),
+ )
+ .map(|_| ());
thread_pool.spawn_ok(future);
- let _ = self.alive.insert(id, handle);
+ let _ = self.alive.insert(id, cancel);
}
}
- self.alive.retain(|id, handle| {
- let is_still_alive = alive.contains(&id);
-
- if !is_still_alive {
- handle.cancel();
- }
-
- is_still_alive
- });
+ self.alive.retain(|id, _| alive.contains(&id));
}
}