diff options
| -rw-r--r-- | src/lib.rs | 181 | 
1 files changed, 106 insertions, 75 deletions
@@ -24,7 +24,7 @@ use leptos::{  };  use leptos_meta::Stylesheet;  use leptos_use::{use_textarea_autosize, UseTextareaAutosizeReturn}; -use reactive_stores::{Store, StoreField}; +use reactive_stores::{ArcStore, Store, StoreField};  use stylance::import_style;  use thiserror::Error;  use tokio::sync::{mpsc::{self, Receiver}, Mutex}; @@ -224,14 +224,14 @@ fn LoginPage(set_app: WriteSignal<AppState>, set_client: RwSignal<Option<(Client  }  pub struct MessageSubscriptions { -    all: Vec<mpsc::Sender<(JID, MacawMessage)>>, -    subset: HashMap<JID, Vec<mpsc::Sender<MacawMessage>>>, +    all: HashMap<Uuid, mpsc::Sender<(JID, MacawMessage)>>, +    subset: HashMap<JID, HashMap<Uuid, mpsc::Sender<MacawMessage>>>,  }  impl MessageSubscriptions {      pub fn new() -> Self {          Self { -            all: Vec::new(), +            all: HashMap::new(),              subset: HashMap::new(),          }      } @@ -242,31 +242,35 @@ impl MessageSubscriptions {          message: MacawMessage,      ) {          // subscriptions to all -        let mut index = 0; -        while let Some(sender) = self.all.get(index) { +        let mut removals = Vec::new(); +        for (id, sender) in &self.all {              match sender.send((to.clone(), message.clone())).await {                  Ok(_) => { -                    index += 1;                  }                  Err(_) => { -                    self.all.swap_remove(index); +                    removals.push(*id);                  }              }          } +        for removal in removals { +            self.all.remove(&removal); +        }          // subscriptions to specific chat          if let Some(subscribers) = self.subset.get_mut(&to) { -            let mut index = 0; -            while let Some(sender) = subscribers.get(index) { +            let mut removals = Vec::new(); +            for (id, sender) in &*subscribers {                  match sender.send(message.clone()).await {                      Ok(_) => { -                        index += 1;                      }                      Err(_) => { -                        subscribers.swap_remove(index); +                        removals.push(*id);                      }                  }              } +            for removal in removals { +                subscribers.remove(&removal); +            }              if subscribers.is_empty() {                  self.subset.remove(&to);              } @@ -275,23 +279,36 @@ impl MessageSubscriptions {      pub fn subscribe_all(          &mut self, -    ) -> Receiver<(JID, MacawMessage)> { +    ) -> (Uuid, Receiver<(JID, MacawMessage)>) {          let (send, recv) = mpsc::channel(10); -        self.all.push(send); -        recv +        let id = Uuid::new_v4(); +        self.all.insert(id, send); +        (id, recv)      }      pub fn subscribe_chat(          &mut self,          chat: JID, -    ) -> Receiver<MacawMessage> { +    ) -> (Uuid, Receiver<MacawMessage>) {          let (send, recv) = mpsc::channel(10); +        let id = Uuid::new_v4();          if let Some(chat_subscribers) = self.subset.get_mut(&chat) { -            chat_subscribers.push(send); +            chat_subscribers.insert(id, send);          } else { -            self.subset.insert(chat, vec![send]); +            let hash_map = HashMap::from([(id, send)]); +            self.subset.insert(chat, hash_map); +        } +        (id, recv) +    } + +    pub fn unsubscribe_all(&mut self, sub_id: Uuid) { +        self.all.remove(&sub_id); +    } + +    pub fn unsubscribe_chat(&mut self, sub_id: Uuid, chat: JID) { +        if let Some(chat_subs) = self.subset.get_mut(&chat) { +            chat_subs.remove(&sub_id);          } -        recv      }  } @@ -325,16 +342,16 @@ pub struct OpenChatsPanel {  pub fn open_chat(open_chats: Store<OpenChatsPanel>, chat: MacawChat) {      if let Some(jid) = &*open_chats.chat_view().read() {          if let Some((index, _jid, entry)) = open_chats.chats().write().shift_remove_full(jid) { -            let new_jid = chat.chat.correspondent().read().clone(); +            let new_jid = <ArcStore<filamento::chat::Chat> as Clone>::clone(&chat.chat).correspondent().read().clone();              open_chats.chats().write().insert_before(index, new_jid.clone(), chat);              *open_chats.chat_view().write() = Some(new_jid);          } else { -            let new_jid = chat.chat.correspondent().read().clone(); +            let new_jid = <ArcStore<filamento::chat::Chat> as Clone>::clone(&chat.chat).correspondent().read().clone();              open_chats.chats().write().insert(new_jid.clone(), chat);              *open_chats.chat_view().write() = Some(new_jid);          }      } else { -        let new_jid = chat.chat.correspondent().read().clone(); +        let new_jid = <ArcStore<filamento::chat::Chat> as Clone>::clone(&chat.chat).correspondent().read().clone();          open_chats.chats().write().insert(new_jid.clone(), chat);          *open_chats.chat_view().write() = Some(new_jid);      } @@ -345,16 +362,16 @@ impl OpenChatsPanel {          if let Some(jid) = &mut self.chat_view {              debug!("a chat was already open");              if let Some((index, _jid, entry)) = self.chats.shift_remove_full(jid) { -                let new_jid = chat.chat.correspondent().read().clone(); +                let new_jid = <ArcStore<filamento::chat::Chat> as Clone>::clone(&chat.chat).correspondent().read().clone();                  self.chats.insert_before(index, new_jid.clone(), chat);                  *&mut self.chat_view = Some(new_jid);              } else { -                let new_jid = chat.chat.correspondent().read().clone(); +                let new_jid = <ArcStore<filamento::chat::Chat> as Clone>::clone(&chat.chat).correspondent().read().clone();                  self.chats.insert(new_jid.clone(), chat);                  *&mut self.chat_view = Some(new_jid);              }          } else { -            let new_jid = chat.chat.correspondent().read().clone(); +            let new_jid = <ArcStore<filamento::chat::Chat> as Clone>::clone(&chat.chat).correspondent().read().clone();              self.chats.insert(new_jid.clone(), chat);              *&mut self.chat_view = Some(new_jid);          } @@ -386,11 +403,11 @@ fn Macaw(      let message_subscriptions = RwSignal::new(MessageSubscriptions::new());      provide_context(message_subscriptions); -    let messages_store: StateStore<Uuid, Store<Message>> = StateStore::new(); +    let messages_store: StateStore<Uuid, ArcStore<Message>> = StateStore::new();      provide_context(messages_store); -    let chats_store: StateStore<JID, Store<Chat>> = StateStore::new(); +    let chats_store: StateStore<JID, ArcStore<Chat>> = StateStore::new();      provide_context(chats_store); -    let users_store: StateStore<JID, Store<User>> = StateStore::new(); +    let users_store: StateStore<JID, ArcStore<User>> = StateStore::new();      provide_context(users_store);      let open_chats = Store::new(OpenChatsPanel::default()); @@ -426,14 +443,14 @@ fn Macaw(                  },                  UpdateMessage::Presence { from, presence } => {},                  UpdateMessage::Message { to, from, message } => { -                    // debug!("before got message"); +                    debug!("before got message");                      let new_message = MacawMessage::got_message_and_user(message, from); -                    // debug!("after got message"); +                    debug!("after got message");                      spawn_local(async move { message_subscriptions.write_untracked().broadcast(to, new_message).await }); -                    // debug!("after set message"); +                    debug!("after set message");                  },                  UpdateMessage::MessageDelivery { id, chat, delivery } => { -                    messages_store.modify(&id, |message| message.delivery().set(Some(delivery))); +                    messages_store.modify(&id, |message| <ArcStore<filamento::chat::Message> as Clone>::clone(&message).delivery().set(Some(delivery)));                  },                  UpdateMessage::SubscriptionRequest(jid) => {},                  UpdateMessage::NickChanged { jid, nick } => { @@ -493,7 +510,7 @@ pub fn OpenChatsPanelView() -> impl IntoView {  #[component]  pub fn OpenChatView(chat: MacawChat) -> impl IntoView { -    let chat_chat = *chat.chat; +    let chat_chat: Store<Chat> = <ArcStore<filamento::chat::Chat> as Clone>::clone(&chat.chat).into();      let chat_jid = move || chat_chat.correspondent().get();      view! { @@ -507,7 +524,7 @@ pub fn OpenChatView(chat: MacawChat) -> impl IntoView {  #[component]  pub fn ChatViewHeader(chat: MacawChat) -> impl IntoView { -    let chat_user = *chat.user; +    let chat_user = <ArcStore<filamento::user::User> as Clone>::clone(&chat.user).into();      let avatar = LocalResource::new(move || get_avatar(chat_user));      let name = move || get_name(chat_user);      let jid = move || chat_user.jid().read().to_string(); @@ -528,8 +545,8 @@ pub fn ChatViewHeader(chat: MacawChat) -> impl IntoView {  #[component]  pub fn MessageHistoryBuffer(chat: MacawChat) -> impl IntoView {      let (messages, set_messages) = signal(IndexMap::new()); -    let chat_chat = *chat.chat; -    let chat_user = *chat.user; +    let chat_chat: Store<Chat> = <ArcStore<filamento::chat::Chat> as Clone>::clone(&chat.chat).into(); +    let chat_user: Store<User> = <ArcStore<filamento::user::User> as Clone>::clone(&chat.user).into();      let load_messages = LocalResource::new(move || async move {          let client = use_context::<Client>().expect("client not in context"); @@ -538,7 +555,7 @@ pub fn MessageHistoryBuffer(chat: MacawChat) -> impl IntoView {              Ok(m) => {                  let messages = m.into_iter().map(|(message, message_user)| {                      (message.id, MacawMessage::got_message_and_user(message, message_user)) -                }).rev().collect::<IndexMap<Uuid, _>>(); +                }).collect::<IndexMap<Uuid, _>>();                  set_messages.set(messages);              },              Err(_) => { @@ -549,20 +566,22 @@ pub fn MessageHistoryBuffer(chat: MacawChat) -> impl IntoView {      // TODO: filter new messages signal      let new_messages_signal: RwSignal<MessageSubscriptions> = use_context().unwrap(); +    let (sub_id, set_sub_id) = signal(None);      let _load_new_messages = LocalResource::new(move || async move {          load_messages.await; -        let mut new_messages = new_messages_signal.write().subscribe_chat(chat_chat.correspondent().get_untracked()); +        let (sub_id, mut new_messages) = new_messages_signal.write().subscribe_chat(chat_chat.correspondent().get_untracked()); +        set_sub_id.set(Some(sub_id));          while let Some(new_message) = new_messages.recv().await {              debug!("got new message in let message buffer");              let mut messages= set_messages.write();              if let Some((_, last)) = messages.last() { -                if *last.message.timestamp().read_untracked() < *new_message.timestamp().read_untracked() { +                if *<ArcStore<filamento::chat::Message> as Clone>::clone(&last.message).timestamp().read_untracked() < *<ArcStore<filamento::chat::Message> as Clone>::clone(&new_message).timestamp().read_untracked() {                      messages -                        .insert(new_message.message.id().get_untracked(), new_message); +                        .insert(<ArcStore<filamento::chat::Message> as Clone>::clone(&new_message.message).id().get_untracked(), new_message);                      debug!("set the new message in message buffer");                  } else {                      let index = match messages.binary_search_by(|_, value| { -                        value.message.timestamp().read_untracked().cmp(&new_message.message.timestamp().read_untracked()) +                        <ArcStore<filamento::chat::Message> as Clone>::clone(&value.message).timestamp().read_untracked().cmp(&<ArcStore<filamento::chat::Message> as Clone>::clone(&new_message.message).timestamp().read_untracked())                      }) {                          Ok(i) => i,                          Err(i) => i, @@ -570,24 +589,29 @@ pub fn MessageHistoryBuffer(chat: MacawChat) -> impl IntoView {                      messages.insert_before(                          // TODO: check if this logic is correct                          index, -                        new_message.message.id().get_untracked(), +                        <ArcStore<filamento::chat::Message> as Clone>::clone(&new_message.message).id().get_untracked(),                          new_message,                      );                      debug!("set the new message in message buffer");                  }              } else {                  messages -                    .insert(new_message.message.id().get_untracked(), new_message); +                    .insert(<ArcStore<filamento::chat::Message> as Clone>::clone(&new_message.message).id().get_untracked(), new_message);                  debug!("set the new message in message buffer");              }          }      }); +    on_cleanup(move || { +        if let Some(sub_id) = sub_id.get() { +            new_messages_signal.write().unsubscribe_chat(sub_id, chat_chat.correspondent().get()); +        } +    });      let each = move || {          let mut last_timestamp = NaiveDateTime::MIN;          let mut last_user: Option<JID> = None;          let mut messages = messages.get().into_iter().map(|(id, message)| { -            let message_timestamp = message.message.timestamp().read().naive_local(); +            let message_timestamp = <ArcStore<filamento::chat::Message> as Clone>::clone(&message.message).timestamp().read().naive_local();              // if message_timestamp.date() > last_timestamp.date() {              //     messages_view = messages_view.push(date(message_timestamp.date()));              // } @@ -598,7 +622,7 @@ pub fn MessageHistoryBuffer(chat: MacawChat) -> impl IntoView {              } else {                  false              }; -            last_user = Some(message.message.from().get()); +            last_user = Some(<ArcStore<filamento::chat::Message> as Clone>::clone(&message.message).from().get());              last_timestamp = message_timestamp;              (id, (message, major, false))          }).collect::<Vec<_>>(); @@ -716,8 +740,8 @@ pub fn Delivery(delivery: Delivery) -> impl IntoView {  #[component]  pub fn Message(message: MacawMessage, major: bool, r#final: bool) -> impl IntoView { -    let message_message = *message.message; -    let message_user = *message.user; +    let message_message: Store<Message> = <ArcStore<filamento::chat::Message> as Clone>::clone(&message.message).into(); +    let message_user = <ArcStore<filamento::user::User> as Clone>::clone(&message.user).into();      let avatar = LocalResource::new(move || get_avatar(message_user));      let name = move || get_name(message_user); @@ -954,15 +978,15 @@ impl<K, V> StateStore<K, V> where K: Eq + std::hash::Hash + Send + Sync + 'stati      }      fn remove(&self, key: &K) { -        let store = self.inner.try_get_value().unwrap(); -        let mut store = store.store.write().unwrap(); -        if let Some((_v, count)) = store.get_mut(key) { -            *count -= 1; -            if *count == 0 { -                store.remove(key); -                debug!("dropped item from store"); -            } -        } +        // let store = self.inner.try_get_value().unwrap(); +        // let mut store = store.store.write().unwrap(); +        // if let Some((_v, count)) = store.get_mut(key) { +        //     *count -= 1; +        //     if *count == 0 { +        //         store.remove(key); +        //         debug!("dropped item from store"); +        //     } +        // }      }  } @@ -1020,16 +1044,16 @@ impl<K: Eq + std::hash::Hash + Send + Sync + 'static, V: Send + Sync + 'static>  #[derive(Clone)]  struct MacawChat { -    chat: StateListener<JID, Store<Chat>>, -    user: StateListener<JID, Store<User>>, +    chat: StateListener<JID, ArcStore<Chat>>, +    user: StateListener<JID, ArcStore<User>>,  }  impl MacawChat {      fn got_chat_and_user(chat: Chat, user: User) -> Self { -        let chat_state_store: StateStore<JID, Store<Chat>> = use_context().expect("no chat state store"); -        let user_state_store: StateStore<JID, Store<User>> = use_context().expect("no user state store"); -        let user = user_state_store.store(user.jid.clone(), Store::new(user)); -        let chat = chat_state_store.store(chat.correspondent.clone(), Store::new(chat)); +        let chat_state_store: StateStore<JID, ArcStore<Chat>> = use_context().expect("no chat state store"); +        let user_state_store: StateStore<JID, ArcStore<User>> = use_context().expect("no user state store"); +        let user = user_state_store.store(user.jid.clone(), ArcStore::new(user)); +        let chat = chat_state_store.store(chat.correspondent.clone(), ArcStore::new(chat));          Self {              chat,              user, @@ -1038,7 +1062,7 @@ impl MacawChat {  }  impl Deref for MacawChat { -    type Target = StateListener<JID, Store<Chat>>; +    type Target = StateListener<JID, ArcStore<Chat>>;      fn deref(&self) -> &Self::Target {          &self.chat @@ -1053,16 +1077,16 @@ impl DerefMut for MacawChat {  #[derive(Clone)]  struct MacawMessage { -    message: StateListener<Uuid, Store<Message>>, -    user: StateListener<JID, Store<User>>, +    message: StateListener<Uuid, ArcStore<Message>>, +    user: StateListener<JID, ArcStore<User>>,  }  impl MacawMessage {      fn got_message_and_user(message: Message, user: User) -> Self { -        let message_state_store: StateStore<Uuid, Store<Message>> = use_context().expect("no message state store"); -        let user_state_store: StateStore<JID, Store<User>> = use_context().expect("no user state store"); -        let message = message_state_store.store(message.id, Store::new(message)); -        let user = user_state_store.store(user.jid.clone(), Store::new(user)); +        let message_state_store: StateStore<Uuid, ArcStore<Message>> = use_context().expect("no message state store"); +        let user_state_store: StateStore<JID, ArcStore<User>> = use_context().expect("no user state store"); +        let message = message_state_store.store(message.id, ArcStore::new(message)); +        let user = user_state_store.store(user.jid.clone(), ArcStore::new(user));          Self {              message,              user, @@ -1071,7 +1095,7 @@ impl MacawMessage {  }  impl Deref for MacawMessage { -    type Target = StateListener<Uuid, Store<Message>>; +    type Target = StateListener<Uuid, ArcStore<Message>>;      fn deref(&self) -> &Self::Target {          &self.message @@ -1104,14 +1128,14 @@ impl DerefMut for MacawUser {  struct MacawContact {      contact: Store<Contact>, -    user: StateListener<JID, Store<User>>, +    user: StateListener<JID, ArcStore<User>>,  }  impl MacawContact {      fn got_contact_and_user(contact: Contact, user: User) -> Self {          let contact = Store::new(contact); -        let user_state_store: StateStore<JID, Store<User>> = use_context().expect("no user state store"); -        let user = user_state_store.store(user.jid.clone(), Store::new(user)); +        let user_state_store: StateStore<JID, ArcStore<User>> = use_context().expect("no user state store"); +        let user = user_state_store.store(user.jid.clone(), ArcStore::new(user));          Self {              contact,              user, @@ -1155,9 +1179,11 @@ fn ChatsList() -> impl IntoView {      // TODO: filter new messages signal      let new_messages_signal: RwSignal<MessageSubscriptions> = use_context().unwrap(); +    let (sub_id, set_sub_id) = signal(None);      let _load_new_messages = LocalResource::new(move || async move {          load_chats.await; -        let mut new_messages = new_messages_signal.write().subscribe_all(); +        let (sub_id, mut new_messages) = new_messages_signal.write().subscribe_all(); +        set_sub_id.set(Some(sub_id));          while let Some((to, new_message)) = new_messages.recv().await {              debug!("got new message in let");              let mut chats = set_chats.write(); @@ -1181,6 +1207,11 @@ fn ChatsList() -> impl IntoView {          }          debug!("set the new message");      }); +    on_cleanup(move || { +        if let Some(sub_id) = sub_id.get() { +            new_messages_signal.write().unsubscribe_all(sub_id); +        } +    });      view! {          <div class="chats-list panel"> @@ -1229,7 +1260,7 @@ pub fn get_name(user: Store<User>) -> String {  #[component]  fn ChatsListItem(chat: MacawChat, message: MacawMessage) -> impl IntoView { -    let chat_user = *chat.user; +    let chat_user: Store<User> = <ArcStore<filamento::user::User> as Clone>::clone(&chat.user).into();      let avatar = LocalResource::new(move || get_avatar(chat_user));      let name = move || get_name(chat_user);  | 
