1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
|
use iced_native::{Event, Hasher, Subscription};
use std::collections::HashMap;
/// Registry of the subscription streams that are currently running.
///
/// Entries are added and removed by `Pool::update` and receive events through
/// `Pool::broadcast_event`.
pub struct Pool {
/// Running subscriptions, keyed by the `u64` hash of the recipe that
/// produced them.
alive: HashMap<u64, Handle>,
}
/// Bookkeeping kept for one running subscription.
pub struct Handle {
/// Sending half of the oneshot channel whose receiver is raced against the
/// subscription stream. Nothing is ever sent on it; it is only held so that
/// dropping the `Handle` drops the sender.
// NOTE(review): per the `futures` oneshot docs, dropping the sender resolves
// the receiver with `Canceled`, which should end the spawned task — confirm.
_cancel: futures::channel::oneshot::Sender<()>,
/// Sender used to forward events to the subscription's input stream, or
/// `None` when the channel was already closed at the time the handle was
/// created.
listener: Option<futures::channel::mpsc::Sender<Event>>,
}
impl Pool {
    /// Creates a pool with no running subscriptions.
    pub fn new() -> Self {
        Self {
            alive: HashMap::new(),
        }
    }

    /// Reconciles the running subscription streams with `subscription`.
    ///
    /// Every recipe of `subscription` is hashed into a `u64` id. Recipes whose
    /// id is not tracked yet are turned into a stream and spawned on
    /// `thread_pool`; each message the stream yields is sent to the event loop
    /// through a clone of `proxy`. Handles whose id is no longer produced by
    /// `subscription` are removed (and therefore dropped) at the end.
    ///
    /// # Panics
    ///
    /// The spawned task (not this call) panics with
    /// `"Send subscription result to event loop"` if `proxy.send_event`
    /// returns an error.
    pub fn update<Message: Send>(
        &mut self,
        subscription: Subscription<Message>,
        thread_pool: &mut futures::executor::ThreadPool,
        proxy: &winit::event_loop::EventLoopProxy<Message>,
    ) {
        use futures::{future::FutureExt, stream::StreamExt};

        let recipes = subscription.recipes();

        // Ids requested by this update; anything else is pruned at the end.
        let mut requested = std::collections::HashSet::new();

        for recipe in recipes {
            let id = {
                use std::hash::Hasher as _;

                let mut hasher = Hasher::default();
                recipe.hash(&mut hasher);
                hasher.finish()
            };

            let _ = requested.insert(id);

            // Already running: keep the existing stream untouched.
            if self.alive.contains_key(&id) {
                continue;
            }

            let (cancel, cancelled) = futures::channel::oneshot::channel();

            // TODO: Use bus if/when it supports async
            let (event_sender, event_receiver) =
                futures::channel::mpsc::channel(100);

            let stream = recipe.stream(event_receiver.boxed());
            let proxy = proxy.clone();

            // The task finishes as soon as either the cancellation receiver
            // resolves or the stream is exhausted.
            let future = futures::future::select(
                cancelled,
                stream.for_each(move |message| {
                    proxy
                        .send_event(message)
                        .expect("Send subscription result to event loop");

                    futures::future::ready(())
                }),
            )
            .map(|_| ());

            thread_pool.spawn_ok(future);

            // Building the stream may already have dropped the receiver; in
            // that case there is nobody to forward events to.
            let listener = if event_sender.is_closed() {
                None
            } else {
                Some(event_sender)
            };

            let _ = self.alive.insert(
                id,
                Handle {
                    _cancel: cancel,
                    listener,
                },
            );
        }

        self.alive.retain(|id, _| requested.contains(id));
    }

    /// Sends a clone of `event` to every running subscription that still has
    /// a listener.
    ///
    /// A failed send is logged. When the failure is a disconnected receiver,
    /// the listener is discarded so that later broadcasts neither retry nor
    /// log again for that subscription; when the channel is merely full, only
    /// this event is lost and the listener is kept.
    pub fn broadcast_event(&mut self, event: Event) {
        for handle in self.alive.values_mut() {
            let disconnected = match handle.listener.as_mut() {
                Some(listener) => match listener.try_send(event.clone()) {
                    Ok(()) => false,
                    Err(error) => {
                        log::error!(
                            "Error sending event to subscription: {:?}",
                            error
                        );

                        error.is_disconnected()
                    }
                },
                None => false,
            };

            if disconnected {
                handle.listener = None;
            }
        }
    }
}
|