summaryrefslogtreecommitdiffstats
path: root/winit/src/subscription.rs
blob: bad68d55858682dc1ecdbc24bcf95cd424159f60 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
use iced_native::{Event, Hasher, Subscription};
use std::collections::HashMap;

/// A registry of the subscriptions that are currently running.
///
/// Each running subscription is tracked by the hash of its recipe, so the
/// same recipe is only spawned once for as long as it keeps being requested.
pub struct Pool {
    /// Running subscriptions, keyed by the 64-bit hash of their recipe.
    alive: HashMap<u64, Handle>,
}

/// The bookkeeping kept by a [`Pool`] for one running subscription.
pub struct Handle {
    /// Sending half of the cancellation channel. It is never sent on; it is
    /// only held so that dropping the `Handle` drops the sender, which
    /// resolves the receiving half that the spawned future is racing against.
    _cancel: futures::channel::oneshot::Sender<()>,
    /// Sender used to forward runtime events to the subscription, or `None`
    /// when the event channel was already closed when the handle was created.
    listener: Option<futures::channel::mpsc::Sender<Event>>,
}

impl Pool {
    /// Creates an empty pool with no running subscriptions.
    pub fn new() -> Self {
        Self {
            alive: HashMap::new(),
        }
    }

    /// Synchronizes the running subscriptions with the given `subscription`.
    ///
    /// Every recipe of `subscription` is hashed into an id. Recipes whose id
    /// is not tracked yet are turned into a stream and spawned on
    /// `thread_pool`; each message the stream produces is sent to the event
    /// loop through a clone of `proxy`. Tracked ids that are no longer part
    /// of `subscription` are removed, which drops their [`Handle`] and with
    /// it the cancellation sender the spawned future is racing against.
    ///
    /// Note that the spawned task panics (with "Send subscription result to
    /// event loop") if `proxy.send_event` returns an error.
    pub fn update<Message: Send>(
        &mut self,
        subscription: Subscription<Message>,
        thread_pool: &mut futures::executor::ThreadPool,
        proxy: &winit::event_loop::EventLoopProxy<Message>,
    ) {
        use futures::{future::FutureExt, stream::StreamExt};
        use std::collections::hash_map::Entry;

        let recipes = subscription.recipes();

        // Ids of every recipe requested by this update; anything tracked
        // that is missing from this set gets dropped at the end.
        let mut alive = std::collections::HashSet::new();

        for recipe in recipes {
            let id = {
                use std::hash::Hasher as _;

                let mut hasher = Hasher::default();
                recipe.hash(&mut hasher);

                hasher.finish()
            };

            let _ = alive.insert(id);

            // Only spawn recipes that are not already running. The entry API
            // performs the lookup and the later insertion with one hash.
            if let Entry::Vacant(slot) = self.alive.entry(id) {
                let (cancel, cancelled) = futures::channel::oneshot::channel();

                // TODO: Use bus if/when it supports async
                let (event_sender, event_receiver) =
                    futures::channel::mpsc::channel(100);

                let stream = recipe.stream(event_receiver.boxed());
                let proxy = proxy.clone();

                // Run the stream until it either finishes or the
                // cancellation receiver resolves, whichever happens first.
                let future = futures::future::select(
                    cancelled,
                    stream.for_each(move |message| {
                        proxy
                            .send_event(message)
                            .expect("Send subscription result to event loop");

                        futures::future::ready(())
                    }),
                )
                .map(|_| ());

                thread_pool.spawn_ok(future);

                // A recipe that does not keep the event receiver around
                // leaves the channel closed already; do not keep a sender
                // for it.
                let listener = if event_sender.is_closed() {
                    None
                } else {
                    Some(event_sender)
                };

                let _ = slot.insert(Handle {
                    _cancel: cancel,
                    listener,
                });
            }
        }

        self.alive.retain(|id, _| alive.contains(id));
    }

    /// Sends a clone of `event` to every running subscription that still has
    /// an event listener.
    ///
    /// Delivery uses `try_send`, so it never blocks: a failure is logged and
    /// the event is dropped for that subscription. When the failure reports
    /// that the receiving side is gone, the listener is discarded so the same
    /// error is not logged again for every subsequent event.
    pub fn broadcast_event(&mut self, event: Event) {
        for handle in self.alive.values_mut() {
            let disconnected = match handle.listener.as_mut() {
                Some(listener) => match listener.try_send(event.clone()) {
                    Ok(()) => false,
                    Err(error) => {
                        log::error!(
                            "Error sending event to subscription: {:?}",
                            error
                        );

                        error.is_disconnected()
                    }
                },
                None => false,
            };

            if disconnected {
                handle.listener = None;
            }
        }
    }
}