diff options
author | 2025-04-30 01:49:02 +0100 | |
---|---|---|
committer | 2025-04-30 01:49:02 +0100 | |
commit | 4ea506818e85de5453e661552a0fd7dffda38d6e (patch) | |
tree | 0117eb412e7523d71206d7e71297f40dc758a2fa /src/lib.rs | |
parent | 22141aa6d7b0f97441a60111046d1ef7260b68dc (diff) | |
download | macaw-web-4ea506818e85de5453e661552a0fd7dffda38d6e.tar.gz macaw-web-4ea506818e85de5453e661552a0fd7dffda38d6e.tar.bz2 macaw-web-4ea506818e85de5453e661552a0fd7dffda38d6e.zip |
feat: CHAT LIST UPDATES
Diffstat (limited to '')
-rw-r--r-- | src/lib.rs | 309 |
1 files changed, 224 insertions, 85 deletions
@@ -6,7 +6,7 @@ use std::{ ops::{Deref, DerefMut}, rc::Rc, str::FromStr, - sync::{atomic::AtomicUsize, Arc}, + sync::{atomic::AtomicUsize, Arc, RwLock}, thread::sleep, time::{self, Duration}, }; @@ -30,7 +30,7 @@ use leptos_meta::Stylesheet; use reactive_stores::Store; use stylance::import_style; use thiserror::Error; -use tokio::sync::{mpsc::Receiver, Mutex}; +use tokio::sync::{mpsc::{self, Receiver}, Mutex}; use tracing::debug; use uuid::Uuid; @@ -224,6 +224,78 @@ fn LoginPage(set_app: WriteSignal<AppState>, set_client: RwSignal<Option<(Client } } +pub struct MessageSubscriptions { + all: Vec<mpsc::Sender<(JID, MacawMessage)>>, + subset: HashMap<JID, Vec<mpsc::Sender<MacawMessage>>>, +} + +impl MessageSubscriptions { + pub fn new() -> Self { + Self { + all: Vec::new(), + subset: HashMap::new(), + } + } + + pub async fn broadcast( + &mut self, + to: JID, + message: MacawMessage, + ) { + // subscriptions to all + let mut index = 0; + while let Some(sender) = self.all.get(index) { + match sender.send((to.clone(), message.clone())).await { + Ok(_) => { + index += 1; + } + Err(_) => { + self.all.swap_remove(index); + } + } + } + + // subscriptions to specific chat + if let Some(subscribers) = self.subset.get_mut(&to) { + let mut index = 0; + while let Some(sender) = subscribers.get(index) { + match sender.send(message.clone()).await { + Ok(_) => { + index += 1; + } + Err(_) => { + subscribers.swap_remove(index); + } + } + } + if subscribers.is_empty() { + self.subset.remove(&to); + } + } + } + + pub fn subscribe_all( + &mut self, + ) -> Receiver<(JID, MacawMessage)> { + let (send, recv) = mpsc::channel(10); + self.all.push(send); + recv + } + + pub fn subscribe_chat( + &mut self, + chat: JID, + ) -> Receiver<MacawMessage> { + let (send, recv) = mpsc::channel(10); + if let Some(chat_subscribers) = self.subset.get_mut(&chat) { + chat_subscribers.push(send); + } else { + self.subset.insert(chat, vec![send]); + } + recv + } +} + #[component] fn Macaw( // TODO: logout @@ -233,22 +305,15 @@ fn Macaw( ) -> impl IntoView { provide_context(client); - let (new_messages, set_new_messages) = signal(None::<(JID, MacawMessage)>); - provide_context(new_messages); + let message_subscriptions = RwSignal::new(MessageSubscriptions::new()); + provide_context(message_subscriptions); let messages_store: StateStore<Uuid, Store<Message>> = StateStore::new(); - provide_context(RwSignal::new_local(messages_store)); + provide_context(messages_store); let chats_store: StateStore<JID, Store<Chat>> = StateStore::new(); - provide_context(RwSignal::new_local(chats_store)); + provide_context(chats_store); let users_store: StateStore<JID, Store<User>> = StateStore::new(); - provide_context(RwSignal::new_local(users_store)); - - // // here we create a signal in the root that can be consumed - // // anywhere in the app. - // let (count, set_count) = signal(0); - // // we'll pass the setter to specific components, - // // but provide the count itself to the whole app via context - // provide_context(count); + provide_context(users_store); OnceResource::new(async move { while let Some(update) = updates.recv().await { @@ -259,8 +324,11 @@ fn Macaw( UpdateMessage::RosterDelete(jid) => {}, UpdateMessage::Presence { from, presence } => {}, UpdateMessage::Message { to, from, message } => { + // debug!("before got message"); let new_message = MacawMessage::got_message_and_user(message, from); - set_new_messages.set(Some((to, new_message))); + // debug!("after got message"); + spawn_local(async move { message_subscriptions.write_untracked().broadcast(to, new_message).await }); + // debug!("after set message"); }, UpdateMessage::MessageDelivery { id, chat, delivery } => {}, UpdateMessage::SubscriptionRequest(jid) => {}, @@ -274,84 +342,148 @@ fn Macaw( } // V has to be an arc signal -struct StateStore<K, V> { - store: Rc<RefCell<HashMap<K, (V, usize)>>>, +#[derive(Debug)] +struct ArcStateStore<K, V> { + store: Arc<RwLock<HashMap<K, (V, usize)>>>, +} + +impl<K, V> PartialEq for ArcStateStore<K, V> { + fn eq(&self, other: &Self) -> bool { + Arc::ptr_eq(&self.store, &other.store) + } } -impl<K, V> Clone for StateStore<K, V> { +impl<K, V> Clone for ArcStateStore<K, V> { fn clone(&self) -> Self { Self { - store: self.store.clone(), + store: Arc::clone(&self.store) } } } -impl<K, V> StateStore<K, V> { +impl<K, V> Eq for ArcStateStore<K, V> {} + +impl<K, V> ArcStateStore<K, V> { pub fn new() -> Self { Self { - store: Rc::new(RefCell::new(HashMap::new())), + store: Arc::new(RwLock::new(HashMap::new())), } } } -impl<K: Eq + std::hash::Hash + Clone, V: Clone> StateStore<K, V> { - pub fn store(&self, key: K, value: V) -> StateListener<K, V> { - let mut store = self.store.borrow_mut(); - if let Some((v, count)) = store.get_mut(&key) { - *v = value.clone(); - *count += 1; - StateListener { - value, - cleaner: StateCleaner { key, _ty: PhantomData }, - } - } else { - store.insert(key.clone(), (value.clone(), 1)); - StateListener { - value, - cleaner: StateCleaner { - key, - _ty: PhantomData, - } - } +#[derive(Debug)] +struct StateStore<K, V, S = SyncStorage> { + inner: ArenaItem<ArcStateStore<K, V>, S> +} + +impl<K, V, S> Dispose for StateStore<K, V, S> { + fn dispose(self) { + self.inner.dispose() + } +} + +impl<K, V> StateStore<K, V> where K: Send + Sync + 'static, V: Send + Sync + 'static { + pub fn new() -> Self { + Self::new_with_storage() + } +} + +impl<K, V, S> StateStore<K, V, S> where K: 'static, V: 'static, S: Storage<ArcStateStore<K, V>> { + pub fn new_with_storage() -> Self { + Self { + inner: ArenaItem::new_with_storage(ArcStateStore::new()), } } +} - pub fn init() { +impl<K, V> StateStore<K, V, LocalStorage> where K: 'static, V: 'static { + pub fn new_local() -> Self { + Self::new_with_storage() + } +} +impl<K: std::marker::Send + std::marker::Sync + 'static, V: std::marker::Send + std::marker::Sync + 'static> From<ArcStateStore<K, V>> for StateStore<K, V> { + fn from(value: ArcStateStore<K, V>) -> Self { + Self { + inner: ArenaItem::new_with_storage(value), + } } } -impl<K: Eq + std::hash::Hash , V> StateStore<K, V> { +impl<K: 'static, V: 'static> FromLocal<ArcStateStore<K, V>> for StateStore<K, V, LocalStorage> { + fn from_local(value: ArcStateStore<K, V>) -> Self { + Self { + inner: ArenaItem::new_with_storage(value), + } + } +} + +impl<K, V, S> Copy for StateStore<K, V, S> {} + +impl<K, V, S> Clone for StateStore<K, V, S> { + fn clone(&self) -> Self { + *self + } +} + +impl<K: Eq + std::hash::Hash + Clone, V: Clone> StateStore<K, V> where K: Send + Sync + 'static, V: Send + Sync + 'static { + pub fn store(&self, key: K, value: V) -> StateListener<K, V> { + { + let store = self.inner.try_get_value().unwrap(); + let mut store = store.store.write().unwrap(); + debug!("got store"); + debug!("got store 2"); + if let Some((v, count)) = store.get_mut(&key) { + *v = value.clone(); + *count += 1; + } else { + store.insert(key.clone(), (value.clone(), 1)); + } + }; + StateListener { + value, + cleaner: StateCleaner { key, state_store: self.clone() }, + } + } +} + +impl<K, V> StateStore<K, V> where K: Eq + std::hash::Hash + Send + Sync + 'static, V: Send + Sync + 'static { pub fn update(&self, key: &K, value: V) { - if let Some((v, _)) = self.store.borrow_mut().get_mut(key) { + let store = self.inner.try_get_value().unwrap(); + let mut store = store.store.write().unwrap(); + if let Some((v, _)) = store.get_mut(key) { *v = value; } } pub fn modify(&self, key: &K, modify: impl Fn(&mut V)) { - if let Some((v, _)) = self.store.borrow_mut().get_mut(key) { + let store = self.inner.try_get_value().unwrap(); + let mut store = store.store.write().unwrap(); + if let Some((v, _)) = store.get_mut(key) { modify(v); } } fn remove(&self, key: &K) { - let mut store = self.store.borrow_mut(); + let store = self.inner.try_get_value().unwrap(); + let mut store = store.store.write().unwrap(); if let Some((_v, count)) = store.get_mut(key) { *count -= 1; if *count == 0 { store.remove(key); + debug!("dropped item from store"); } } } } #[derive(Clone)] -struct StateListener<K, V> where K: Eq + std::hash::Hash + 'static, V: 'static { +struct StateListener<K, V> where K: Eq + std::hash::Hash + 'static + std::marker::Send + std::marker::Sync, V: 'static + std::marker::Send + std::marker::Sync { value: V, cleaner: StateCleaner<K, V> } -impl<K: std::cmp::Eq + std::hash::Hash, V> Deref for StateListener<K, V> { +impl<K: std::cmp::Eq + std::hash::Hash + std::marker::Send + std::marker::Sync, V: std::marker::Send + std::marker::Sync> Deref for StateListener<K, V> { type Target = V; fn deref(&self) -> &Self::Target { @@ -359,35 +491,41 @@ impl<K: std::cmp::Eq + std::hash::Hash, V> Deref for StateListener<K, V> { } } -impl<K: std::cmp::Eq + std::hash::Hash, V> DerefMut for StateListener<K, V> { +impl<K: std::cmp::Eq + std::hash::Hash + Send + Sync, V: Send + Sync> DerefMut for StateListener<K, V> { fn deref_mut(&mut self) -> &mut Self::Target { &mut self.value } } -struct StateCleaner<K, V> where K: Eq + std::hash::Hash + 'static, V: 'static { +struct ArcStateCleaner<K, V> { + key: K, + state_store: ArcStateStore<K, V>, +} + +struct StateCleaner<K, V> where K: Eq + std::hash::Hash + Send + Sync + 'static, V: Send + Sync + 'static { key: K, - _ty: PhantomData<V>, - // state_store: StateStore<K, V>, + state_store: StateStore<K, V>, } -impl<K, V> Clone for StateCleaner<K, V> where K: Eq + std::hash::Hash + Clone { +impl<K, V> Clone for StateCleaner<K, V> where K: Eq + std::hash::Hash + Clone + Send + Sync, V: Send + Sync { fn clone(&self) -> Self { - let state_store = use_context::<RwSignal<StateStore<K, V>, LocalStorage>>().unwrap(); - if let Some((_v, count)) = state_store.read_untracked().store.borrow_mut().get_mut(&self.key) { - *count += 1; + { + let store = self.state_store.inner.try_get_value().unwrap(); + let mut store = store.store.write().unwrap(); + if let Some((_v, count)) = store.get_mut(&self.key) { + *count += 1; + } } Self { key: self.key.clone(), - _ty: PhantomData, + state_store: self.state_store.clone(), } } } -impl<K: Eq + std::hash::Hash + 'static, V: 'static> Drop for StateCleaner<K, V> { +impl<K: Eq + std::hash::Hash + Send + Sync + 'static, V: Send + Sync + 'static> Drop for StateCleaner<K, V> { fn drop(&mut self) { - let state_store = use_context::<RwSignal<StateStore<K, V>, LocalStorage>>().unwrap(); - state_store.read_untracked().remove(&self.key) + self.state_store.remove(&self.key); } } @@ -399,10 +537,10 @@ struct MacawChat { impl MacawChat { fn got_chat_and_user(chat: Chat, user: User) -> Self { - let chat_state_store: RwSignal<StateStore<JID, Store<Chat>>, LocalStorage> = use_context().expect("no chat state store"); - let user_state_store: RwSignal<StateStore<JID, Store<User>>, LocalStorage> = use_context().expect("no user state store"); - let user = user_state_store.read_untracked().store(user.jid.clone(), Store::new(user)); - let chat = chat_state_store.read_untracked().store(chat.correspondent.clone(), Store::new(chat)); + let chat_state_store: StateStore<JID, Store<Chat>> = use_context().expect("no chat state store"); + let user_state_store: StateStore<JID, Store<User>> = use_context().expect("no user state store"); + let user = user_state_store.store(user.jid.clone(), Store::new(user)); + let chat = chat_state_store.store(chat.correspondent.clone(), Store::new(chat)); Self { chat, user, @@ -432,10 +570,10 @@ struct MacawMessage { impl MacawMessage { fn got_message_and_user(message: Message, user: User) -> Self { - let message_state_store: RwSignal<StateStore<Uuid, Store<Message>>, LocalStorage> = use_context().expect("no message state store"); - let user_state_store: RwSignal<StateStore<JID, Store<User>>, LocalStorage> = use_context().expect("no user state store"); - let message = message_state_store.read_untracked().store(message.id, Store::new(message)); - let user = user_state_store.read_untracked().store(user.jid.clone(), Store::new(user)); + let message_state_store: StateStore<Uuid, Store<Message>> = use_context().expect("no message state store"); + let user_state_store: StateStore<JID, Store<User>> = use_context().expect("no user state store"); + let message = message_state_store.store(message.id, Store::new(message)); + let user = user_state_store.store(user.jid.clone(), Store::new(user)); Self { message, user, @@ -477,7 +615,6 @@ impl DerefMut for MacawUser { #[component] fn ChatsList() -> impl IntoView { - let client = use_context::<Client>().expect("client not in context"); let (chats, set_chats) = signal(IndexMap::new()); let load_chats = LocalResource::new(move || async move { @@ -497,25 +634,27 @@ fn ChatsList() -> impl IntoView { }); // TODO: filter new messages signal - let new_messages_signal: ReadSignal<Option<(JID, MacawMessage)>> = use_context().unwrap(); - spawn_local(async move { - let mut new_message = new_messages_signal.to_stream(); + let new_messages_signal: RwSignal<MessageSubscriptions> = use_context().unwrap(); + let _load_new_messages = LocalResource::new(move || async move { load_chats.await; - while let Some(new_message) = new_message.next().await { + let mut new_messages = new_messages_signal.write().subscribe_all(); + while let Some((to, new_message)) = new_messages.recv().await { debug!("got new message in let"); - if let Some((to, new_message)) = new_message { - if let Some((chat, _latest_message)) = set_chats.write().shift_remove(&to) { - debug!("chat existed"); - set_chats.write().insert_before(0, to, (chat, new_message)); - debug!("done setting"); - } else { - debug!("the chat didn't exist"); - let chat = client.get_chat(to.clone()).await.unwrap(); - let user = client.get_user(to.clone()).await.unwrap(); - let chat = MacawChat::got_chat_and_user(chat, user); - set_chats.write().insert_before(0, to, (chat, new_message)); - debug!("done setting"); - } + let mut chats = set_chats.write(); + if let Some((chat, _latest_message)) = chats.shift_remove(&to) { + debug!("chat existed"); + chats.insert_before(0, to, (chat.clone(), new_message)); + debug!("done setting"); + } else { + debug!("the chat didn't exist"); + let client = use_context::<Client>().expect("client not in context"); + let chat = client.get_chat(to.clone()).await.unwrap(); + let user = client.get_user(to.clone()).await.unwrap(); + debug!("before got chat"); + let chat = MacawChat::got_chat_and_user(chat, user); + debug!("after got chat"); + chats.insert_before(0, to, (chat, new_message)); + debug!("done setting"); } } debug!("set the new message"); |