summaryrefslogtreecommitdiffstats
path: root/futures
diff options
context:
space:
mode:
Diffstat (limited to 'futures')
-rw-r--r--futures/src/subscription.rs37
1 files changed, 27 insertions, 10 deletions
diff --git a/futures/src/subscription.rs b/futures/src/subscription.rs
index 7163248d..4d5a1192 100644
--- a/futures/src/subscription.rs
+++ b/futures/src/subscription.rs
@@ -10,6 +10,7 @@ use crate::{BoxStream, MaybeSend};
use futures::channel::mpsc;
use futures::never::Never;
+use std::any::TypeId;
use std::hash::Hash;
/// A stream of runtime events.
@@ -88,7 +89,10 @@ impl<Message> Subscription<Message> {
}
/// Transforms the [`Subscription`] output with the given function.
- pub fn map<A>(mut self, f: fn(Message) -> A) -> Subscription<A>
+ pub fn map<A>(
+ mut self,
+ f: impl Fn(Message) -> A + MaybeSend + Clone + 'static,
+ ) -> Subscription<A>
where
Message: 'static,
A: 'static,
@@ -97,8 +101,9 @@ impl<Message> Subscription<Message> {
recipes: self
.recipes
.drain(..)
- .map(|recipe| {
- Box::new(Map::new(recipe, f)) as Box<dyn Recipe<Output = A>>
+ .map(move |recipe| {
+ Box::new(Map::new(recipe, f.clone()))
+ as Box<dyn Recipe<Output = A>>
})
.collect(),
}
@@ -143,27 +148,39 @@ pub trait Recipe {
fn stream(self: Box<Self>, input: EventStream) -> BoxStream<Self::Output>;
}
-struct Map<A, B> {
+struct Map<A, B, F>
+where
+ F: Fn(A) -> B + 'static,
+{
+ id: TypeId,
recipe: Box<dyn Recipe<Output = A>>,
- mapper: fn(A) -> B,
+ mapper: F,
}
-impl<A, B> Map<A, B> {
- fn new(recipe: Box<dyn Recipe<Output = A>>, mapper: fn(A) -> B) -> Self {
- Map { recipe, mapper }
+impl<A, B, F> Map<A, B, F>
+where
+ F: Fn(A) -> B + 'static,
+{
+ fn new(recipe: Box<dyn Recipe<Output = A>>, mapper: F) -> Self {
+ Map {
+ id: TypeId::of::<F>(),
+ recipe,
+ mapper,
+ }
}
}
-impl<A, B> Recipe for Map<A, B>
+impl<A, B, F> Recipe for Map<A, B, F>
where
A: 'static,
B: 'static,
+ F: Fn(A) -> B + 'static + MaybeSend,
{
type Output = B;
fn hash(&self, state: &mut Hasher) {
+ self.id.hash(state);
self.recipe.hash(state);
- self.mapper.hash(state);
}
fn stream(self: Box<Self>, input: EventStream) -> BoxStream<Self::Output> {