From 48145ba51e045f8b0b4788f3a75d20b9d9b7e6ad Mon Sep 17 00:00:00 2001 From: Héctor Ramón Jiménez Date: Sat, 7 Dec 2019 08:51:44 +0100 Subject: Use `oneshot` and `future::select` to cancel streams --- examples/clock.rs | 49 ++++++++++--------------------------------------- 1 file changed, 10 insertions(+), 39 deletions(-) (limited to 'examples/clock.rs') diff --git a/examples/clock.rs b/examples/clock.rs index b1ee8ab1..5a404bfa 100644 --- a/examples/clock.rs +++ b/examples/clock.rs @@ -82,7 +82,7 @@ impl Application for Clock { } mod time { - use std::sync::{Arc, Mutex}; + use std::sync::Arc; pub fn every( duration: std::time::Duration, @@ -108,59 +108,30 @@ mod time { >, } - struct TickState { - alive: Arc>, - } - - impl iced::subscription::Handle for TickState { - fn cancel(&mut self) { - match self.alive.lock() { - Ok(mut guard) => *guard = false, - _ => {} - } - } - } - - impl iced::subscription::Definition for Tick + impl iced::subscription::Handle for Tick where Message: 'static, { - type Message = Message; + type Output = Message; fn id(&self) -> u64 { 0 } - fn stream( - &self, - ) -> ( - futures::stream::BoxStream<'static, Message>, - Box, - ) { + fn stream(&self) -> futures::stream::BoxStream<'static, Message> { use futures::StreamExt; let duration = self.duration.clone(); let function = self.message.clone(); - let alive = Arc::new(Mutex::new(true)); - - let state = TickState { - alive: alive.clone(), - }; - - let stream = futures::stream::poll_fn(move |_| { - std::thread::sleep(duration); - - if !*alive.lock().unwrap() { - return std::task::Poll::Ready(None); - } - let now = chrono::Local::now(); + let stream = + futures::stream::iter(std::iter::repeat(())).map(move |_| { + std::thread::sleep(duration); - std::task::Poll::Ready(Some(now)) - }) - .map(move |time| function(time)); + function(chrono::Local::now()) + }); - (stream.boxed(), Box::new(state)) + stream.boxed() } } } -- cgit