diff options
author | 2025-05-07 04:30:34 +0100 | |
---|---|---|
committer | 2025-05-07 04:30:34 +0100 | |
commit | 01bf0c11eaea7c3d8b12712143569b3931a5ae33 (patch) | |
tree | 6bd5a0255840414cef5c93b0a2f71dffb2f2b41f | |
parent | 3302bdedebb734d9b7306bb361c2aa9168a527d7 (diff) | |
download | macaw-web-01bf0c11eaea7c3d8b12712143569b3931a5ae33.tar.gz macaw-web-01bf0c11eaea7c3d8b12712143569b3931a5ae33.tar.bz2 macaw-web-01bf0c11eaea7c3d8b12712143569b3931a5ae33.zip |
fix: use `ArcStore` in `StateStore`s
Diffstat (limited to '')
-rw-r--r-- | src/lib.rs | 181 |
1 files changed, 106 insertions, 75 deletions
@@ -24,7 +24,7 @@ use leptos::{ }; use leptos_meta::Stylesheet; use leptos_use::{use_textarea_autosize, UseTextareaAutosizeReturn}; -use reactive_stores::{Store, StoreField}; +use reactive_stores::{ArcStore, Store, StoreField}; use stylance::import_style; use thiserror::Error; use tokio::sync::{mpsc::{self, Receiver}, Mutex}; @@ -224,14 +224,14 @@ fn LoginPage(set_app: WriteSignal<AppState>, set_client: RwSignal<Option<(Client } pub struct MessageSubscriptions { - all: Vec<mpsc::Sender<(JID, MacawMessage)>>, - subset: HashMap<JID, Vec<mpsc::Sender<MacawMessage>>>, + all: HashMap<Uuid, mpsc::Sender<(JID, MacawMessage)>>, + subset: HashMap<JID, HashMap<Uuid, mpsc::Sender<MacawMessage>>>, } impl MessageSubscriptions { pub fn new() -> Self { Self { - all: Vec::new(), + all: HashMap::new(), subset: HashMap::new(), } } @@ -242,31 +242,35 @@ impl MessageSubscriptions { message: MacawMessage, ) { // subscriptions to all - let mut index = 0; - while let Some(sender) = self.all.get(index) { + let mut removals = Vec::new(); + for (id, sender) in &self.all { match sender.send((to.clone(), message.clone())).await { Ok(_) => { - index += 1; } Err(_) => { - self.all.swap_remove(index); + removals.push(*id); } } } + for removal in removals { + self.all.remove(&removal); + } // subscriptions to specific chat if let Some(subscribers) = self.subset.get_mut(&to) { - let mut index = 0; - while let Some(sender) = subscribers.get(index) { + let mut removals = Vec::new(); + for (id, sender) in &*subscribers { match sender.send(message.clone()).await { Ok(_) => { - index += 1; } Err(_) => { - subscribers.swap_remove(index); + removals.push(*id); } } } + for removal in removals { + subscribers.remove(&removal); + } if subscribers.is_empty() { self.subset.remove(&to); } @@ -275,23 +279,36 @@ impl MessageSubscriptions { pub fn subscribe_all( &mut self, - ) -> Receiver<(JID, MacawMessage)> { + ) -> (Uuid, Receiver<(JID, MacawMessage)>) { let (send, recv) = mpsc::channel(10); - self.all.push(send); - recv + let id = Uuid::new_v4(); + self.all.insert(id, send); + (id, recv) } pub fn subscribe_chat( &mut self, chat: JID, - ) -> Receiver<MacawMessage> { + ) -> (Uuid, Receiver<MacawMessage>) { let (send, recv) = mpsc::channel(10); + let id = Uuid::new_v4(); if let Some(chat_subscribers) = self.subset.get_mut(&chat) { - chat_subscribers.push(send); + chat_subscribers.insert(id, send); } else { - self.subset.insert(chat, vec![send]); + let hash_map = HashMap::from([(id, send)]); + self.subset.insert(chat, hash_map); + } + (id, recv) + } + + pub fn unsubscribe_all(&mut self, sub_id: Uuid) { + self.all.remove(&sub_id); + } + + pub fn unsubscribe_chat(&mut self, sub_id: Uuid, chat: JID) { + if let Some(chat_subs) = self.subset.get_mut(&chat) { + chat_subs.remove(&sub_id); } - recv } } @@ -325,16 +342,16 @@ pub struct OpenChatsPanel { pub fn open_chat(open_chats: Store<OpenChatsPanel>, chat: MacawChat) { if let Some(jid) = &*open_chats.chat_view().read() { if let Some((index, _jid, entry)) = open_chats.chats().write().shift_remove_full(jid) { - let new_jid = chat.chat.correspondent().read().clone(); + let new_jid = <ArcStore<filamento::chat::Chat> as Clone>::clone(&chat.chat).correspondent().read().clone(); open_chats.chats().write().insert_before(index, new_jid.clone(), chat); *open_chats.chat_view().write() = Some(new_jid); } else { - let new_jid = chat.chat.correspondent().read().clone(); + let new_jid = <ArcStore<filamento::chat::Chat> as Clone>::clone(&chat.chat).correspondent().read().clone(); open_chats.chats().write().insert(new_jid.clone(), chat); *open_chats.chat_view().write() = Some(new_jid); } } else { - let new_jid = chat.chat.correspondent().read().clone(); + let new_jid = <ArcStore<filamento::chat::Chat> as Clone>::clone(&chat.chat).correspondent().read().clone(); open_chats.chats().write().insert(new_jid.clone(), chat); *open_chats.chat_view().write() = Some(new_jid); } @@ -345,16 +362,16 @@ impl OpenChatsPanel { if let Some(jid) = &mut self.chat_view { debug!("a chat was already open"); if let Some((index, _jid, entry)) = self.chats.shift_remove_full(jid) { - let new_jid = chat.chat.correspondent().read().clone(); + let new_jid = <ArcStore<filamento::chat::Chat> as Clone>::clone(&chat.chat).correspondent().read().clone(); self.chats.insert_before(index, new_jid.clone(), chat); *&mut self.chat_view = Some(new_jid); } else { - let new_jid = chat.chat.correspondent().read().clone(); + let new_jid = <ArcStore<filamento::chat::Chat> as Clone>::clone(&chat.chat).correspondent().read().clone(); self.chats.insert(new_jid.clone(), chat); *&mut self.chat_view = Some(new_jid); } } else { - let new_jid = chat.chat.correspondent().read().clone(); + let new_jid = <ArcStore<filamento::chat::Chat> as Clone>::clone(&chat.chat).correspondent().read().clone(); self.chats.insert(new_jid.clone(), chat); *&mut self.chat_view = Some(new_jid); } @@ -386,11 +403,11 @@ fn Macaw( let message_subscriptions = RwSignal::new(MessageSubscriptions::new()); provide_context(message_subscriptions); - let messages_store: StateStore<Uuid, Store<Message>> = StateStore::new(); + let messages_store: StateStore<Uuid, ArcStore<Message>> = StateStore::new(); provide_context(messages_store); - let chats_store: StateStore<JID, Store<Chat>> = StateStore::new(); + let chats_store: StateStore<JID, ArcStore<Chat>> = StateStore::new(); provide_context(chats_store); - let users_store: StateStore<JID, Store<User>> = StateStore::new(); + let users_store: StateStore<JID, ArcStore<User>> = StateStore::new(); provide_context(users_store); let open_chats = Store::new(OpenChatsPanel::default()); @@ -426,14 +443,14 @@ fn Macaw( }, UpdateMessage::Presence { from, presence } => {}, UpdateMessage::Message { to, from, message } => { - // debug!("before got message"); + debug!("before got message"); let new_message = MacawMessage::got_message_and_user(message, from); - // debug!("after got message"); + debug!("after got message"); spawn_local(async move { message_subscriptions.write_untracked().broadcast(to, new_message).await }); - // debug!("after set message"); + debug!("after set message"); }, UpdateMessage::MessageDelivery { id, chat, delivery } => { - messages_store.modify(&id, |message| message.delivery().set(Some(delivery))); + messages_store.modify(&id, |message| <ArcStore<filamento::chat::Message> as Clone>::clone(&message).delivery().set(Some(delivery))); }, UpdateMessage::SubscriptionRequest(jid) => {}, UpdateMessage::NickChanged { jid, nick } => { @@ -493,7 +510,7 @@ pub fn OpenChatsPanelView() -> impl IntoView { #[component] pub fn OpenChatView(chat: MacawChat) -> impl IntoView { - let chat_chat = *chat.chat; + let chat_chat: Store<Chat> = <ArcStore<filamento::chat::Chat> as Clone>::clone(&chat.chat).into(); let chat_jid = move || chat_chat.correspondent().get(); view! { @@ -507,7 +524,7 @@ pub fn OpenChatView(chat: MacawChat) -> impl IntoView { #[component] pub fn ChatViewHeader(chat: MacawChat) -> impl IntoView { - let chat_user = *chat.user; + let chat_user = <ArcStore<filamento::user::User> as Clone>::clone(&chat.user).into(); let avatar = LocalResource::new(move || get_avatar(chat_user)); let name = move || get_name(chat_user); let jid = move || chat_user.jid().read().to_string(); @@ -528,8 +545,8 @@ pub fn ChatViewHeader(chat: MacawChat) -> impl IntoView { #[component] pub fn MessageHistoryBuffer(chat: MacawChat) -> impl IntoView { let (messages, set_messages) = signal(IndexMap::new()); - let chat_chat = *chat.chat; - let chat_user = *chat.user; + let chat_chat: Store<Chat> = <ArcStore<filamento::chat::Chat> as Clone>::clone(&chat.chat).into(); + let chat_user: Store<User> = <ArcStore<filamento::user::User> as Clone>::clone(&chat.user).into(); let load_messages = LocalResource::new(move || async move { let client = use_context::<Client>().expect("client not in context"); @@ -538,7 +555,7 @@ pub fn MessageHistoryBuffer(chat: MacawChat) -> impl IntoView { Ok(m) => { let messages = m.into_iter().map(|(message, message_user)| { (message.id, MacawMessage::got_message_and_user(message, message_user)) - }).rev().collect::<IndexMap<Uuid, _>>(); + }).collect::<IndexMap<Uuid, _>>(); set_messages.set(messages); }, Err(_) => { @@ -549,20 +566,22 @@ pub fn MessageHistoryBuffer(chat: MacawChat) -> impl IntoView { // TODO: filter new messages signal let new_messages_signal: RwSignal<MessageSubscriptions> = use_context().unwrap(); + let (sub_id, set_sub_id) = signal(None); let _load_new_messages = LocalResource::new(move || async move { load_messages.await; - let mut new_messages = new_messages_signal.write().subscribe_chat(chat_chat.correspondent().get_untracked()); + let (sub_id, mut new_messages) = new_messages_signal.write().subscribe_chat(chat_chat.correspondent().get_untracked()); + set_sub_id.set(Some(sub_id)); while let Some(new_message) = new_messages.recv().await { debug!("got new message in let message buffer"); let mut messages= set_messages.write(); if let Some((_, last)) = messages.last() { - if *last.message.timestamp().read_untracked() < *new_message.timestamp().read_untracked() { + if *<ArcStore<filamento::chat::Message> as Clone>::clone(&last.message).timestamp().read_untracked() < *<ArcStore<filamento::chat::Message> as Clone>::clone(&new_message).timestamp().read_untracked() { messages - .insert(new_message.message.id().get_untracked(), new_message); + .insert(<ArcStore<filamento::chat::Message> as Clone>::clone(&new_message.message).id().get_untracked(), new_message); debug!("set the new message in message buffer"); } else { let index = match messages.binary_search_by(|_, value| { - value.message.timestamp().read_untracked().cmp(&new_message.message.timestamp().read_untracked()) + <ArcStore<filamento::chat::Message> as Clone>::clone(&value.message).timestamp().read_untracked().cmp(&<ArcStore<filamento::chat::Message> as Clone>::clone(&new_message.message).timestamp().read_untracked()) }) { Ok(i) => i, Err(i) => i, @@ -570,24 +589,29 @@ pub fn MessageHistoryBuffer(chat: MacawChat) -> impl IntoView { messages.insert_before( // TODO: check if this logic is correct index, - new_message.message.id().get_untracked(), + <ArcStore<filamento::chat::Message> as Clone>::clone(&new_message.message).id().get_untracked(), new_message, ); debug!("set the new message in message buffer"); } } else { messages - .insert(new_message.message.id().get_untracked(), new_message); + .insert(<ArcStore<filamento::chat::Message> as Clone>::clone(&new_message.message).id().get_untracked(), new_message); debug!("set the new message in message buffer"); } } }); + on_cleanup(move || { + if let Some(sub_id) = sub_id.get() { + new_messages_signal.write().unsubscribe_chat(sub_id, chat_chat.correspondent().get()); + } + }); let each = move || { let mut last_timestamp = NaiveDateTime::MIN; let mut last_user: Option<JID> = None; let mut messages = messages.get().into_iter().map(|(id, message)| { - let message_timestamp = message.message.timestamp().read().naive_local(); + let message_timestamp = <ArcStore<filamento::chat::Message> as Clone>::clone(&message.message).timestamp().read().naive_local(); // if message_timestamp.date() > last_timestamp.date() { // messages_view = messages_view.push(date(message_timestamp.date())); // } @@ -598,7 +622,7 @@ pub fn MessageHistoryBuffer(chat: MacawChat) -> impl IntoView { } else { false }; - last_user = Some(message.message.from().get()); + last_user = Some(<ArcStore<filamento::chat::Message> as Clone>::clone(&message.message).from().get()); last_timestamp = message_timestamp; (id, (message, major, false)) }).collect::<Vec<_>>(); @@ -716,8 +740,8 @@ pub fn Delivery(delivery: Delivery) -> impl IntoView { #[component] pub fn Message(message: MacawMessage, major: bool, r#final: bool) -> impl IntoView { - let message_message = *message.message; - let message_user = *message.user; + let message_message: Store<Message> = <ArcStore<filamento::chat::Message> as Clone>::clone(&message.message).into(); + let message_user = <ArcStore<filamento::user::User> as Clone>::clone(&message.user).into(); let avatar = LocalResource::new(move || get_avatar(message_user)); let name = move || get_name(message_user); @@ -954,15 +978,15 @@ impl<K, V> StateStore<K, V> where K: Eq + std::hash::Hash + Send + Sync + 'stati } fn remove(&self, key: &K) { - let store = self.inner.try_get_value().unwrap(); - let mut store = store.store.write().unwrap(); - if let Some((_v, count)) = store.get_mut(key) { - *count -= 1; - if *count == 0 { - store.remove(key); - debug!("dropped item from store"); - } - } + // let store = self.inner.try_get_value().unwrap(); + // let mut store = store.store.write().unwrap(); + // if let Some((_v, count)) = store.get_mut(key) { + // *count -= 1; + // if *count == 0 { + // store.remove(key); + // debug!("dropped item from store"); + // } + // } } } @@ -1020,16 +1044,16 @@ impl<K: Eq + std::hash::Hash + Send + Sync + 'static, V: Send + Sync + 'static> #[derive(Clone)] struct MacawChat { - chat: StateListener<JID, Store<Chat>>, - user: StateListener<JID, Store<User>>, + chat: StateListener<JID, ArcStore<Chat>>, + user: StateListener<JID, ArcStore<User>>, } impl MacawChat { fn got_chat_and_user(chat: Chat, user: User) -> Self { - let chat_state_store: StateStore<JID, Store<Chat>> = use_context().expect("no chat state store"); - let user_state_store: StateStore<JID, Store<User>> = use_context().expect("no user state store"); - let user = user_state_store.store(user.jid.clone(), Store::new(user)); - let chat = chat_state_store.store(chat.correspondent.clone(), Store::new(chat)); + let chat_state_store: StateStore<JID, ArcStore<Chat>> = use_context().expect("no chat state store"); + let user_state_store: StateStore<JID, ArcStore<User>> = use_context().expect("no user state store"); + let user = user_state_store.store(user.jid.clone(), ArcStore::new(user)); + let chat = chat_state_store.store(chat.correspondent.clone(), ArcStore::new(chat)); Self { chat, user, @@ -1038,7 +1062,7 @@ impl MacawChat { } impl Deref for MacawChat { - type Target = StateListener<JID, Store<Chat>>; + type Target = StateListener<JID, ArcStore<Chat>>; fn deref(&self) -> &Self::Target { &self.chat @@ -1053,16 +1077,16 @@ impl DerefMut for MacawChat { #[derive(Clone)] struct MacawMessage { - message: StateListener<Uuid, Store<Message>>, - user: StateListener<JID, Store<User>>, + message: StateListener<Uuid, ArcStore<Message>>, + user: StateListener<JID, ArcStore<User>>, } impl MacawMessage { fn got_message_and_user(message: Message, user: User) -> Self { - let message_state_store: StateStore<Uuid, Store<Message>> = use_context().expect("no message state store"); - let user_state_store: StateStore<JID, Store<User>> = use_context().expect("no user state store"); - let message = message_state_store.store(message.id, Store::new(message)); - let user = user_state_store.store(user.jid.clone(), Store::new(user)); + let message_state_store: StateStore<Uuid, ArcStore<Message>> = use_context().expect("no message state store"); + let user_state_store: StateStore<JID, ArcStore<User>> = use_context().expect("no user state store"); + let message = message_state_store.store(message.id, ArcStore::new(message)); + let user = user_state_store.store(user.jid.clone(), ArcStore::new(user)); Self { message, user, @@ -1071,7 +1095,7 @@ impl MacawMessage { } impl Deref for MacawMessage { - type Target = StateListener<Uuid, Store<Message>>; + type Target = StateListener<Uuid, ArcStore<Message>>; fn deref(&self) -> &Self::Target { &self.message @@ -1104,14 +1128,14 @@ impl DerefMut for MacawUser { struct MacawContact { contact: Store<Contact>, - user: StateListener<JID, Store<User>>, + user: StateListener<JID, ArcStore<User>>, } impl MacawContact { fn got_contact_and_user(contact: Contact, user: User) -> Self { let contact = Store::new(contact); - let user_state_store: StateStore<JID, Store<User>> = use_context().expect("no user state store"); - let user = user_state_store.store(user.jid.clone(), Store::new(user)); + let user_state_store: StateStore<JID, ArcStore<User>> = use_context().expect("no user state store"); + let user = user_state_store.store(user.jid.clone(), ArcStore::new(user)); Self { contact, user, @@ -1155,9 +1179,11 @@ fn ChatsList() -> impl IntoView { // TODO: filter new messages signal let new_messages_signal: RwSignal<MessageSubscriptions> = use_context().unwrap(); + let (sub_id, set_sub_id) = signal(None); let _load_new_messages = LocalResource::new(move || async move { load_chats.await; - let mut new_messages = new_messages_signal.write().subscribe_all(); + let (sub_id, mut new_messages) = new_messages_signal.write().subscribe_all(); + set_sub_id.set(Some(sub_id)); while let Some((to, new_message)) = new_messages.recv().await { debug!("got new message in let"); let mut chats = set_chats.write(); @@ -1181,6 +1207,11 @@ fn ChatsList() -> impl IntoView { } debug!("set the new message"); }); + on_cleanup(move || { + if let Some(sub_id) = sub_id.get() { + new_messages_signal.write().unsubscribe_all(sub_id); + } + }); view! { <div class="chats-list panel"> @@ -1229,7 +1260,7 @@ pub fn get_name(user: Store<User>) -> String { #[component] fn ChatsListItem(chat: MacawChat, message: MacawMessage) -> impl IntoView { - let chat_user = *chat.user; + let chat_user: Store<User> = <ArcStore<filamento::user::User> as Clone>::clone(&chat.user).into(); let avatar = LocalResource::new(move || get_avatar(chat_user)); let name = move || get_name(chat_user); |