summaryrefslogtreecommitdiffstats
path: root/src/message_subscriptions.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/message_subscriptions.rs')
-rw-r--r--src/message_subscriptions.rs86
1 files changed, 86 insertions, 0 deletions
diff --git a/src/message_subscriptions.rs b/src/message_subscriptions.rs
new file mode 100644
index 0000000..5b1d276
--- /dev/null
+++ b/src/message_subscriptions.rs
@@ -0,0 +1,86 @@
+use std::collections::HashMap;
+
+use jid::BareJID;
+use tokio::sync::mpsc::{self, Receiver};
+use uuid::Uuid;
+
+use crate::message::MacawMessage;
+
+pub struct MessageSubscriptions {
+ all: HashMap<Uuid, mpsc::Sender<(BareJID, MacawMessage)>>,
+ subset: HashMap<BareJID, HashMap<Uuid, mpsc::Sender<MacawMessage>>>,
+}
+
+impl MessageSubscriptions {
+ pub fn new() -> Self {
+ Self {
+ all: HashMap::new(),
+ subset: HashMap::new(),
+ }
+ }
+
+ pub async fn broadcast(&mut self, to: BareJID, message: MacawMessage) {
+ // subscriptions to all
+ let mut removals = Vec::new();
+ for (id, sender) in &self.all {
+ match sender.send((to.clone(), message.clone())).await {
+ Ok(_) => {}
+ Err(_) => {
+ removals.push(*id);
+ }
+ }
+ }
+ for removal in removals {
+ self.all.remove(&removal);
+ }
+
+ // subscriptions to specific chat
+ if let Some(subscribers) = self.subset.get_mut(&to) {
+ let mut removals = Vec::new();
+ for (id, sender) in &*subscribers {
+ match sender.send(message.clone()).await {
+ Ok(_) => {}
+ Err(_) => {
+ removals.push(*id);
+ }
+ }
+ }
+ for removal in removals {
+ subscribers.remove(&removal);
+ }
+ if subscribers.is_empty() {
+ self.subset.remove(&to);
+ }
+ }
+ }
+
+ pub fn subscribe_all(&mut self) -> (Uuid, Receiver<(BareJID, MacawMessage)>) {
+ let (send, recv) = mpsc::channel(10);
+ let id = Uuid::new_v4();
+ self.all.insert(id, send);
+ (id, recv)
+ }
+
+ pub fn subscribe_chat(&mut self, chat: BareJID) -> (Uuid, Receiver<MacawMessage>) {
+ let (send, recv) = mpsc::channel(10);
+ let id = Uuid::new_v4();
+ if let Some(chat_subscribers) = self.subset.get_mut(&chat) {
+ chat_subscribers.insert(id, send);
+ } else {
+ let hash_map = HashMap::from([(id, send)]);
+ self.subset.insert(chat, hash_map);
+ }
+ (id, recv)
+ }
+
+ pub fn unsubscribe_all(&mut self, sub_id: Uuid) {
+ self.all.remove(&sub_id);
+ }
+
+ pub fn unsubscribe_chat(&mut self, sub_id: Uuid, chat: BareJID) {
+ if let Some(chat_subs) = self.subset.get_mut(&chat) {
+ chat_subs.remove(&sub_id);
+ }
+ }
+}
+