diff options
| author | 2025-04-30 01:49:02 +0100 | |
|---|---|---|
| committer | 2025-04-30 01:49:02 +0100 | |
| commit | 4ea506818e85de5453e661552a0fd7dffda38d6e (patch) | |
| tree | 0117eb412e7523d71206d7e71297f40dc758a2fa | |
| parent | 22141aa6d7b0f97441a60111046d1ef7260b68dc (diff) | |
| download | macaw-web-4ea506818e85de5453e661552a0fd7dffda38d6e.tar.gz macaw-web-4ea506818e85de5453e661552a0fd7dffda38d6e.tar.bz2 macaw-web-4ea506818e85de5453e661552a0fd7dffda38d6e.zip  | |
feat: CHAT LIST UPDATES
| -rw-r--r-- | src/lib.rs | 309 | 
1 files changed, 224 insertions, 85 deletions
@@ -6,7 +6,7 @@ use std::{      ops::{Deref, DerefMut},      rc::Rc,      str::FromStr, -    sync::{atomic::AtomicUsize, Arc}, +    sync::{atomic::AtomicUsize, Arc, RwLock},      thread::sleep,      time::{self, Duration},  }; @@ -30,7 +30,7 @@ use leptos_meta::Stylesheet;  use reactive_stores::Store;  use stylance::import_style;  use thiserror::Error; -use tokio::sync::{mpsc::Receiver, Mutex}; +use tokio::sync::{mpsc::{self, Receiver}, Mutex};  use tracing::debug;  use uuid::Uuid; @@ -224,6 +224,78 @@ fn LoginPage(set_app: WriteSignal<AppState>, set_client: RwSignal<Option<(Client      }  } +pub struct MessageSubscriptions { +    all: Vec<mpsc::Sender<(JID, MacawMessage)>>, +    subset: HashMap<JID, Vec<mpsc::Sender<MacawMessage>>>, +} + +impl MessageSubscriptions { +    pub fn new() -> Self { +        Self { +            all: Vec::new(), +            subset: HashMap::new(), +        } +    } + +    pub async fn broadcast( +        &mut self, +        to: JID, +        message: MacawMessage, +    ) { +        // subscriptions to all +        let mut index = 0; +        while let Some(sender) = self.all.get(index) { +            match sender.send((to.clone(), message.clone())).await { +                Ok(_) => { +                    index += 1; +                } +                Err(_) => { +                    self.all.swap_remove(index); +                } +            } +        } + +        // subscriptions to specific chat +        if let Some(subscribers) = self.subset.get_mut(&to) { +            let mut index = 0; +            while let Some(sender) = subscribers.get(index) { +                match sender.send(message.clone()).await { +                    Ok(_) => { +                        index += 1; +                    } +                    Err(_) => { +                        subscribers.swap_remove(index); +                    } +                } +            } +            if subscribers.is_empty() { +                self.subset.remove(&to); +            } +        } +    } + +    pub fn subscribe_all( +        &mut self, +    ) -> Receiver<(JID, MacawMessage)> { +        let (send, recv) = mpsc::channel(10); +        self.all.push(send); +        recv +    } + +    pub fn subscribe_chat( +        &mut self, +        chat: JID, +    ) -> Receiver<MacawMessage> { +        let (send, recv) = mpsc::channel(10); +        if let Some(chat_subscribers) = self.subset.get_mut(&chat) { +            chat_subscribers.push(send); +        } else { +            self.subset.insert(chat, vec![send]); +        } +        recv +    } +} +  #[component]  fn Macaw(      // TODO: logout @@ -233,22 +305,15 @@ fn Macaw(  ) -> impl IntoView {      provide_context(client); -    let (new_messages, set_new_messages) = signal(None::<(JID, MacawMessage)>); -    provide_context(new_messages); +    let message_subscriptions = RwSignal::new(MessageSubscriptions::new()); +    provide_context(message_subscriptions);      let messages_store: StateStore<Uuid, Store<Message>> = StateStore::new(); -    provide_context(RwSignal::new_local(messages_store)); +    provide_context(messages_store);      let chats_store: StateStore<JID, Store<Chat>> = StateStore::new(); -    provide_context(RwSignal::new_local(chats_store)); +    provide_context(chats_store);      let users_store: StateStore<JID, Store<User>> = StateStore::new(); -    provide_context(RwSignal::new_local(users_store)); - -    // // here we create a signal in the root that can be consumed -    // // anywhere in the app. -    // let (count, set_count) = signal(0); -    // // we'll pass the setter to specific components, -    // // but provide the count itself to the whole app via context -    // provide_context(count); +    provide_context(users_store);      OnceResource::new(async move {          while let Some(update) = updates.recv().await { @@ -259,8 +324,11 @@ fn Macaw(                  UpdateMessage::RosterDelete(jid) => {},                  UpdateMessage::Presence { from, presence } => {},                  UpdateMessage::Message { to, from, message } => { +                    // debug!("before got message");                      let new_message = MacawMessage::got_message_and_user(message, from); -                    set_new_messages.set(Some((to, new_message))); +                    // debug!("after got message"); +                    spawn_local(async move { message_subscriptions.write_untracked().broadcast(to, new_message).await }); +                    // debug!("after set message");                  },                  UpdateMessage::MessageDelivery { id, chat, delivery } => {},                  UpdateMessage::SubscriptionRequest(jid) => {}, @@ -274,84 +342,148 @@ fn Macaw(  }  // V has to be an arc signal -struct StateStore<K, V> { -    store: Rc<RefCell<HashMap<K, (V, usize)>>>, +#[derive(Debug)] +struct ArcStateStore<K, V> { +    store: Arc<RwLock<HashMap<K, (V, usize)>>>, +} + +impl<K, V> PartialEq for ArcStateStore<K, V> { +    fn eq(&self, other: &Self) -> bool { +        Arc::ptr_eq(&self.store, &other.store) +    }  } -impl<K, V> Clone for StateStore<K, V> { +impl<K, V> Clone for ArcStateStore<K, V> {      fn clone(&self) -> Self {          Self { -            store: self.store.clone(), +            store: Arc::clone(&self.store)          }      }  } -impl<K, V> StateStore<K, V> { +impl<K, V> Eq for ArcStateStore<K, V> {} + +impl<K, V> ArcStateStore<K, V> {      pub fn new() -> Self {          Self { -            store: Rc::new(RefCell::new(HashMap::new())), +            store: Arc::new(RwLock::new(HashMap::new())),          }      }  } -impl<K: Eq + std::hash::Hash + Clone, V: Clone> StateStore<K, V> { -    pub fn store(&self, key: K, value: V) -> StateListener<K, V> { -        let mut store = self.store.borrow_mut(); -        if let Some((v, count)) = store.get_mut(&key) { -            *v = value.clone(); -            *count += 1; -            StateListener { -                value, -                cleaner: StateCleaner { key, _ty: PhantomData  }, -            } -        } else { -            store.insert(key.clone(), (value.clone(), 1)); -            StateListener { -                value, -                cleaner: StateCleaner { -                    key, -                    _ty: PhantomData, -                } -            } +#[derive(Debug)] +struct StateStore<K, V, S = SyncStorage> { +    inner: ArenaItem<ArcStateStore<K, V>, S> +} + +impl<K, V, S> Dispose for StateStore<K, V, S> { +    fn dispose(self) { +        self.inner.dispose() +    } +} + +impl<K, V> StateStore<K, V> where K: Send + Sync + 'static, V: Send + Sync + 'static { +    pub fn new() -> Self { +        Self::new_with_storage() +    } +} + +impl<K, V, S> StateStore<K, V, S> where K: 'static, V: 'static, S: Storage<ArcStateStore<K, V>> { +    pub fn new_with_storage() -> Self { +        Self { +            inner: ArenaItem::new_with_storage(ArcStateStore::new()),          }      } +} -    pub fn init() { +impl<K, V> StateStore<K, V, LocalStorage> where K: 'static, V: 'static { +    pub fn new_local() -> Self { +        Self::new_with_storage() +    } +} +impl<K: std::marker::Send + std::marker::Sync + 'static, V: std::marker::Send + std::marker::Sync + 'static> From<ArcStateStore<K, V>> for StateStore<K, V> { +    fn from(value: ArcStateStore<K, V>) -> Self { +        Self { +            inner: ArenaItem::new_with_storage(value), +        }      }  } -impl<K: Eq + std::hash::Hash , V> StateStore<K, V> { +impl<K: 'static, V: 'static> FromLocal<ArcStateStore<K, V>> for StateStore<K, V, LocalStorage> { +    fn from_local(value: ArcStateStore<K, V>) -> Self { +        Self { +            inner: ArenaItem::new_with_storage(value), +        } +    } +} + +impl<K, V, S> Copy for StateStore<K, V, S> {} + +impl<K, V, S> Clone for StateStore<K, V, S> { +    fn clone(&self) -> Self { +        *self +    } +} + +impl<K: Eq + std::hash::Hash + Clone, V: Clone> StateStore<K, V> where K: Send + Sync + 'static, V: Send + Sync + 'static { +    pub fn store(&self, key: K, value: V) -> StateListener<K, V> { +        { +            let store = self.inner.try_get_value().unwrap(); +            let mut store = store.store.write().unwrap(); +            debug!("got store"); +            debug!("got store 2"); +            if let Some((v, count)) = store.get_mut(&key) { +                *v = value.clone(); +                *count += 1; +            } else { +                store.insert(key.clone(), (value.clone(), 1)); +            } +        }; +        StateListener { +            value, +            cleaner: StateCleaner { key, state_store: self.clone() }, +        } +    } +} + +impl<K, V> StateStore<K, V> where K: Eq + std::hash::Hash + Send + Sync + 'static, V: Send + Sync + 'static {      pub fn update(&self, key: &K, value: V) { -        if let Some((v, _)) = self.store.borrow_mut().get_mut(key) { +        let store = self.inner.try_get_value().unwrap(); +        let mut store = store.store.write().unwrap(); +        if let Some((v, _)) = store.get_mut(key) {              *v = value;          }      }      pub fn modify(&self, key: &K, modify: impl Fn(&mut V)) { -        if let Some((v, _)) = self.store.borrow_mut().get_mut(key) { +        let store = self.inner.try_get_value().unwrap(); +        let mut store = store.store.write().unwrap(); +        if let Some((v, _)) = store.get_mut(key) {              modify(v);          }      }      fn remove(&self, key: &K) { -        let mut store = self.store.borrow_mut(); +        let store = self.inner.try_get_value().unwrap(); +        let mut store = store.store.write().unwrap();          if let Some((_v, count)) = store.get_mut(key) {              *count -= 1;              if *count == 0 {                  store.remove(key); +                debug!("dropped item from store");              }          }      }  }  #[derive(Clone)] -struct StateListener<K, V> where K: Eq + std::hash::Hash + 'static, V: 'static { +struct StateListener<K, V> where K: Eq + std::hash::Hash + 'static + std::marker::Send + std::marker::Sync, V: 'static + std::marker::Send + std::marker::Sync {      value: V,      cleaner: StateCleaner<K, V>  } -impl<K: std::cmp::Eq + std::hash::Hash, V> Deref for StateListener<K, V> { +impl<K: std::cmp::Eq + std::hash::Hash + std::marker::Send + std::marker::Sync, V: std::marker::Send + std::marker::Sync> Deref for StateListener<K, V> {      type Target = V;      fn deref(&self) -> &Self::Target { @@ -359,35 +491,41 @@ impl<K: std::cmp::Eq + std::hash::Hash, V> Deref for StateListener<K, V> {      }  } -impl<K: std::cmp::Eq + std::hash::Hash, V> DerefMut for StateListener<K, V> { +impl<K: std::cmp::Eq + std::hash::Hash + Send + Sync, V: Send + Sync> DerefMut for StateListener<K, V> {      fn deref_mut(&mut self) -> &mut Self::Target {          &mut self.value      }  } -struct StateCleaner<K, V> where K: Eq + std::hash::Hash + 'static, V: 'static { +struct ArcStateCleaner<K, V> { +    key: K, +    state_store: ArcStateStore<K, V>, +} + +struct StateCleaner<K, V> where K: Eq + std::hash::Hash + Send + Sync + 'static, V: Send + Sync + 'static {      key: K, -    _ty: PhantomData<V>, -    // state_store: StateStore<K, V>, +    state_store: StateStore<K, V>,  } -impl<K, V> Clone for StateCleaner<K, V> where K: Eq + std::hash::Hash + Clone { +impl<K, V> Clone for StateCleaner<K, V> where K: Eq + std::hash::Hash + Clone + Send + Sync, V: Send + Sync {      fn clone(&self) -> Self { -        let state_store  = use_context::<RwSignal<StateStore<K, V>, LocalStorage>>().unwrap(); -        if let Some((_v, count)) = state_store.read_untracked().store.borrow_mut().get_mut(&self.key) { -            *count += 1; +        { +            let store = self.state_store.inner.try_get_value().unwrap(); +            let mut store = store.store.write().unwrap(); +            if let Some((_v, count)) = store.get_mut(&self.key) { +                *count += 1; +            }          }          Self {              key: self.key.clone(), -            _ty: PhantomData, +            state_store: self.state_store.clone(),          }      }  } -impl<K: Eq + std::hash::Hash + 'static, V: 'static> Drop for StateCleaner<K, V> { +impl<K: Eq + std::hash::Hash + Send + Sync + 'static, V: Send + Sync + 'static> Drop for StateCleaner<K, V> {      fn drop(&mut self) { -        let state_store = use_context::<RwSignal<StateStore<K, V>, LocalStorage>>().unwrap(); -        state_store.read_untracked().remove(&self.key) +        self.state_store.remove(&self.key);      }  } @@ -399,10 +537,10 @@ struct MacawChat {  impl MacawChat {      fn got_chat_and_user(chat: Chat, user: User) -> Self { -        let chat_state_store: RwSignal<StateStore<JID, Store<Chat>>, LocalStorage> = use_context().expect("no chat state store"); -        let user_state_store: RwSignal<StateStore<JID, Store<User>>, LocalStorage> = use_context().expect("no user state store"); -        let user = user_state_store.read_untracked().store(user.jid.clone(), Store::new(user)); -        let chat = chat_state_store.read_untracked().store(chat.correspondent.clone(), Store::new(chat)); +        let chat_state_store: StateStore<JID, Store<Chat>> = use_context().expect("no chat state store"); +        let user_state_store: StateStore<JID, Store<User>> = use_context().expect("no user state store"); +        let user = user_state_store.store(user.jid.clone(), Store::new(user)); +        let chat = chat_state_store.store(chat.correspondent.clone(), Store::new(chat));          Self {              chat,              user, @@ -432,10 +570,10 @@ struct MacawMessage {  impl MacawMessage {      fn got_message_and_user(message: Message, user: User) -> Self { -        let message_state_store: RwSignal<StateStore<Uuid, Store<Message>>, LocalStorage> = use_context().expect("no message state store"); -        let user_state_store: RwSignal<StateStore<JID, Store<User>>, LocalStorage> = use_context().expect("no user state store"); -        let message = message_state_store.read_untracked().store(message.id, Store::new(message)); -        let user = user_state_store.read_untracked().store(user.jid.clone(), Store::new(user)); +        let message_state_store: StateStore<Uuid, Store<Message>> = use_context().expect("no message state store"); +        let user_state_store: StateStore<JID, Store<User>> = use_context().expect("no user state store"); +        let message = message_state_store.store(message.id, Store::new(message)); +        let user = user_state_store.store(user.jid.clone(), Store::new(user));          Self {              message,              user, @@ -477,7 +615,6 @@ impl DerefMut for MacawUser {  #[component]  fn ChatsList() -> impl IntoView { -    let client = use_context::<Client>().expect("client not in context");      let (chats, set_chats) = signal(IndexMap::new());      let load_chats = LocalResource::new(move || async move { @@ -497,25 +634,27 @@ fn ChatsList() -> impl IntoView {      });      // TODO: filter new messages signal -    let new_messages_signal: ReadSignal<Option<(JID, MacawMessage)>> = use_context().unwrap(); -    spawn_local(async move { -        let mut new_message = new_messages_signal.to_stream(); +    let new_messages_signal: RwSignal<MessageSubscriptions> = use_context().unwrap(); +    let _load_new_messages = LocalResource::new(move || async move {          load_chats.await; -        while let Some(new_message) = new_message.next().await { +        let mut new_messages = new_messages_signal.write().subscribe_all(); +        while let Some((to, new_message)) = new_messages.recv().await {              debug!("got new message in let"); -            if let Some((to, new_message)) = new_message { -                if let Some((chat, _latest_message)) = set_chats.write().shift_remove(&to) { -                    debug!("chat existed"); -                    set_chats.write().insert_before(0, to, (chat, new_message)); -                    debug!("done setting"); -                } else { -                    debug!("the chat didn't exist"); -                    let chat = client.get_chat(to.clone()).await.unwrap(); -                    let user = client.get_user(to.clone()).await.unwrap(); -                    let chat = MacawChat::got_chat_and_user(chat, user); -                    set_chats.write().insert_before(0, to, (chat, new_message)); -                    debug!("done setting"); -                } +            let mut chats = set_chats.write(); +            if let Some((chat, _latest_message)) = chats.shift_remove(&to) { +                debug!("chat existed"); +                chats.insert_before(0, to, (chat.clone(), new_message)); +                debug!("done setting"); +            } else { +                debug!("the chat didn't exist"); +                let client = use_context::<Client>().expect("client not in context"); +                let chat = client.get_chat(to.clone()).await.unwrap(); +                let user = client.get_user(to.clone()).await.unwrap(); +                debug!("before got chat"); +                let chat = MacawChat::got_chat_and_user(chat, user); +                debug!("after got chat"); +                chats.insert_before(0, to, (chat, new_message)); +                debug!("done setting");              }          }          debug!("set the new message");  | 
