summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/lib.rs181
1 files changed, 106 insertions, 75 deletions
diff --git a/src/lib.rs b/src/lib.rs
index 0be757e..f16e4da 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -24,7 +24,7 @@ use leptos::{
};
use leptos_meta::Stylesheet;
use leptos_use::{use_textarea_autosize, UseTextareaAutosizeReturn};
-use reactive_stores::{Store, StoreField};
+use reactive_stores::{ArcStore, Store, StoreField};
use stylance::import_style;
use thiserror::Error;
use tokio::sync::{mpsc::{self, Receiver}, Mutex};
@@ -224,14 +224,14 @@ fn LoginPage(set_app: WriteSignal<AppState>, set_client: RwSignal<Option<(Client
}
pub struct MessageSubscriptions {
- all: Vec<mpsc::Sender<(JID, MacawMessage)>>,
- subset: HashMap<JID, Vec<mpsc::Sender<MacawMessage>>>,
+ all: HashMap<Uuid, mpsc::Sender<(JID, MacawMessage)>>,
+ subset: HashMap<JID, HashMap<Uuid, mpsc::Sender<MacawMessage>>>,
}
impl MessageSubscriptions {
pub fn new() -> Self {
Self {
- all: Vec::new(),
+ all: HashMap::new(),
subset: HashMap::new(),
}
}
@@ -242,31 +242,35 @@ impl MessageSubscriptions {
message: MacawMessage,
) {
// subscriptions to all
- let mut index = 0;
- while let Some(sender) = self.all.get(index) {
+ let mut removals = Vec::new();
+ for (id, sender) in &self.all {
match sender.send((to.clone(), message.clone())).await {
Ok(_) => {
- index += 1;
}
Err(_) => {
- self.all.swap_remove(index);
+ removals.push(*id);
}
}
}
+ for removal in removals {
+ self.all.remove(&removal);
+ }
// subscriptions to specific chat
if let Some(subscribers) = self.subset.get_mut(&to) {
- let mut index = 0;
- while let Some(sender) = subscribers.get(index) {
+ let mut removals = Vec::new();
+ for (id, sender) in &*subscribers {
match sender.send(message.clone()).await {
Ok(_) => {
- index += 1;
}
Err(_) => {
- subscribers.swap_remove(index);
+ removals.push(*id);
}
}
}
+ for removal in removals {
+ subscribers.remove(&removal);
+ }
if subscribers.is_empty() {
self.subset.remove(&to);
}
@@ -275,23 +279,36 @@ impl MessageSubscriptions {
pub fn subscribe_all(
&mut self,
- ) -> Receiver<(JID, MacawMessage)> {
+ ) -> (Uuid, Receiver<(JID, MacawMessage)>) {
let (send, recv) = mpsc::channel(10);
- self.all.push(send);
- recv
+ let id = Uuid::new_v4();
+ self.all.insert(id, send);
+ (id, recv)
}
pub fn subscribe_chat(
&mut self,
chat: JID,
- ) -> Receiver<MacawMessage> {
+ ) -> (Uuid, Receiver<MacawMessage>) {
let (send, recv) = mpsc::channel(10);
+ let id = Uuid::new_v4();
if let Some(chat_subscribers) = self.subset.get_mut(&chat) {
- chat_subscribers.push(send);
+ chat_subscribers.insert(id, send);
} else {
- self.subset.insert(chat, vec![send]);
+ let hash_map = HashMap::from([(id, send)]);
+ self.subset.insert(chat, hash_map);
+ }
+ (id, recv)
+ }
+
+ pub fn unsubscribe_all(&mut self, sub_id: Uuid) {
+ self.all.remove(&sub_id);
+ }
+
+ pub fn unsubscribe_chat(&mut self, sub_id: Uuid, chat: JID) {
+ if let Some(chat_subs) = self.subset.get_mut(&chat) {
+ chat_subs.remove(&sub_id);
}
- recv
}
}
@@ -325,16 +342,16 @@ pub struct OpenChatsPanel {
pub fn open_chat(open_chats: Store<OpenChatsPanel>, chat: MacawChat) {
if let Some(jid) = &*open_chats.chat_view().read() {
if let Some((index, _jid, entry)) = open_chats.chats().write().shift_remove_full(jid) {
- let new_jid = chat.chat.correspondent().read().clone();
+ let new_jid = <ArcStore<filamento::chat::Chat> as Clone>::clone(&chat.chat).correspondent().read().clone();
open_chats.chats().write().insert_before(index, new_jid.clone(), chat);
*open_chats.chat_view().write() = Some(new_jid);
} else {
- let new_jid = chat.chat.correspondent().read().clone();
+ let new_jid = <ArcStore<filamento::chat::Chat> as Clone>::clone(&chat.chat).correspondent().read().clone();
open_chats.chats().write().insert(new_jid.clone(), chat);
*open_chats.chat_view().write() = Some(new_jid);
}
} else {
- let new_jid = chat.chat.correspondent().read().clone();
+ let new_jid = <ArcStore<filamento::chat::Chat> as Clone>::clone(&chat.chat).correspondent().read().clone();
open_chats.chats().write().insert(new_jid.clone(), chat);
*open_chats.chat_view().write() = Some(new_jid);
}
@@ -345,16 +362,16 @@ impl OpenChatsPanel {
if let Some(jid) = &mut self.chat_view {
debug!("a chat was already open");
if let Some((index, _jid, entry)) = self.chats.shift_remove_full(jid) {
- let new_jid = chat.chat.correspondent().read().clone();
+ let new_jid = <ArcStore<filamento::chat::Chat> as Clone>::clone(&chat.chat).correspondent().read().clone();
self.chats.insert_before(index, new_jid.clone(), chat);
*&mut self.chat_view = Some(new_jid);
} else {
- let new_jid = chat.chat.correspondent().read().clone();
+ let new_jid = <ArcStore<filamento::chat::Chat> as Clone>::clone(&chat.chat).correspondent().read().clone();
self.chats.insert(new_jid.clone(), chat);
*&mut self.chat_view = Some(new_jid);
}
} else {
- let new_jid = chat.chat.correspondent().read().clone();
+ let new_jid = <ArcStore<filamento::chat::Chat> as Clone>::clone(&chat.chat).correspondent().read().clone();
self.chats.insert(new_jid.clone(), chat);
*&mut self.chat_view = Some(new_jid);
}
@@ -386,11 +403,11 @@ fn Macaw(
let message_subscriptions = RwSignal::new(MessageSubscriptions::new());
provide_context(message_subscriptions);
- let messages_store: StateStore<Uuid, Store<Message>> = StateStore::new();
+ let messages_store: StateStore<Uuid, ArcStore<Message>> = StateStore::new();
provide_context(messages_store);
- let chats_store: StateStore<JID, Store<Chat>> = StateStore::new();
+ let chats_store: StateStore<JID, ArcStore<Chat>> = StateStore::new();
provide_context(chats_store);
- let users_store: StateStore<JID, Store<User>> = StateStore::new();
+ let users_store: StateStore<JID, ArcStore<User>> = StateStore::new();
provide_context(users_store);
let open_chats = Store::new(OpenChatsPanel::default());
@@ -426,14 +443,14 @@ fn Macaw(
},
UpdateMessage::Presence { from, presence } => {},
UpdateMessage::Message { to, from, message } => {
- // debug!("before got message");
+ debug!("before got message");
let new_message = MacawMessage::got_message_and_user(message, from);
- // debug!("after got message");
+ debug!("after got message");
spawn_local(async move { message_subscriptions.write_untracked().broadcast(to, new_message).await });
- // debug!("after set message");
+ debug!("after set message");
},
UpdateMessage::MessageDelivery { id, chat, delivery } => {
- messages_store.modify(&id, |message| message.delivery().set(Some(delivery)));
+ messages_store.modify(&id, |message| <ArcStore<filamento::chat::Message> as Clone>::clone(&message).delivery().set(Some(delivery)));
},
UpdateMessage::SubscriptionRequest(jid) => {},
UpdateMessage::NickChanged { jid, nick } => {
@@ -493,7 +510,7 @@ pub fn OpenChatsPanelView() -> impl IntoView {
#[component]
pub fn OpenChatView(chat: MacawChat) -> impl IntoView {
- let chat_chat = *chat.chat;
+ let chat_chat: Store<Chat> = <ArcStore<filamento::chat::Chat> as Clone>::clone(&chat.chat).into();
let chat_jid = move || chat_chat.correspondent().get();
view! {
@@ -507,7 +524,7 @@ pub fn OpenChatView(chat: MacawChat) -> impl IntoView {
#[component]
pub fn ChatViewHeader(chat: MacawChat) -> impl IntoView {
- let chat_user = *chat.user;
+ let chat_user = <ArcStore<filamento::user::User> as Clone>::clone(&chat.user).into();
let avatar = LocalResource::new(move || get_avatar(chat_user));
let name = move || get_name(chat_user);
let jid = move || chat_user.jid().read().to_string();
@@ -528,8 +545,8 @@ pub fn ChatViewHeader(chat: MacawChat) -> impl IntoView {
#[component]
pub fn MessageHistoryBuffer(chat: MacawChat) -> impl IntoView {
let (messages, set_messages) = signal(IndexMap::new());
- let chat_chat = *chat.chat;
- let chat_user = *chat.user;
+ let chat_chat: Store<Chat> = <ArcStore<filamento::chat::Chat> as Clone>::clone(&chat.chat).into();
+ let chat_user: Store<User> = <ArcStore<filamento::user::User> as Clone>::clone(&chat.user).into();
let load_messages = LocalResource::new(move || async move {
let client = use_context::<Client>().expect("client not in context");
@@ -538,7 +555,7 @@ pub fn MessageHistoryBuffer(chat: MacawChat) -> impl IntoView {
Ok(m) => {
let messages = m.into_iter().map(|(message, message_user)| {
(message.id, MacawMessage::got_message_and_user(message, message_user))
- }).rev().collect::<IndexMap<Uuid, _>>();
+ }).collect::<IndexMap<Uuid, _>>();
set_messages.set(messages);
},
Err(_) => {
@@ -549,20 +566,22 @@ pub fn MessageHistoryBuffer(chat: MacawChat) -> impl IntoView {
// TODO: filter new messages signal
let new_messages_signal: RwSignal<MessageSubscriptions> = use_context().unwrap();
+ let (sub_id, set_sub_id) = signal(None);
let _load_new_messages = LocalResource::new(move || async move {
load_messages.await;
- let mut new_messages = new_messages_signal.write().subscribe_chat(chat_chat.correspondent().get_untracked());
+ let (sub_id, mut new_messages) = new_messages_signal.write().subscribe_chat(chat_chat.correspondent().get_untracked());
+ set_sub_id.set(Some(sub_id));
while let Some(new_message) = new_messages.recv().await {
debug!("got new message in let message buffer");
let mut messages= set_messages.write();
if let Some((_, last)) = messages.last() {
- if *last.message.timestamp().read_untracked() < *new_message.timestamp().read_untracked() {
+ if *<ArcStore<filamento::chat::Message> as Clone>::clone(&last.message).timestamp().read_untracked() < *<ArcStore<filamento::chat::Message> as Clone>::clone(&new_message).timestamp().read_untracked() {
messages
- .insert(new_message.message.id().get_untracked(), new_message);
+ .insert(<ArcStore<filamento::chat::Message> as Clone>::clone(&new_message.message).id().get_untracked(), new_message);
debug!("set the new message in message buffer");
} else {
let index = match messages.binary_search_by(|_, value| {
- value.message.timestamp().read_untracked().cmp(&new_message.message.timestamp().read_untracked())
+ <ArcStore<filamento::chat::Message> as Clone>::clone(&value.message).timestamp().read_untracked().cmp(&<ArcStore<filamento::chat::Message> as Clone>::clone(&new_message.message).timestamp().read_untracked())
}) {
Ok(i) => i,
Err(i) => i,
@@ -570,24 +589,29 @@ pub fn MessageHistoryBuffer(chat: MacawChat) -> impl IntoView {
messages.insert_before(
// TODO: check if this logic is correct
index,
- new_message.message.id().get_untracked(),
+ <ArcStore<filamento::chat::Message> as Clone>::clone(&new_message.message).id().get_untracked(),
new_message,
);
debug!("set the new message in message buffer");
}
} else {
messages
- .insert(new_message.message.id().get_untracked(), new_message);
+ .insert(<ArcStore<filamento::chat::Message> as Clone>::clone(&new_message.message).id().get_untracked(), new_message);
debug!("set the new message in message buffer");
}
}
});
+ on_cleanup(move || {
+ if let Some(sub_id) = sub_id.get() {
+ new_messages_signal.write().unsubscribe_chat(sub_id, chat_chat.correspondent().get());
+ }
+ });
let each = move || {
let mut last_timestamp = NaiveDateTime::MIN;
let mut last_user: Option<JID> = None;
let mut messages = messages.get().into_iter().map(|(id, message)| {
- let message_timestamp = message.message.timestamp().read().naive_local();
+ let message_timestamp = <ArcStore<filamento::chat::Message> as Clone>::clone(&message.message).timestamp().read().naive_local();
// if message_timestamp.date() > last_timestamp.date() {
// messages_view = messages_view.push(date(message_timestamp.date()));
// }
@@ -598,7 +622,7 @@ pub fn MessageHistoryBuffer(chat: MacawChat) -> impl IntoView {
} else {
false
};
- last_user = Some(message.message.from().get());
+ last_user = Some(<ArcStore<filamento::chat::Message> as Clone>::clone(&message.message).from().get());
last_timestamp = message_timestamp;
(id, (message, major, false))
}).collect::<Vec<_>>();
@@ -716,8 +740,8 @@ pub fn Delivery(delivery: Delivery) -> impl IntoView {
#[component]
pub fn Message(message: MacawMessage, major: bool, r#final: bool) -> impl IntoView {
- let message_message = *message.message;
- let message_user = *message.user;
+ let message_message: Store<Message> = <ArcStore<filamento::chat::Message> as Clone>::clone(&message.message).into();
+ let message_user = <ArcStore<filamento::user::User> as Clone>::clone(&message.user).into();
let avatar = LocalResource::new(move || get_avatar(message_user));
let name = move || get_name(message_user);
@@ -954,15 +978,15 @@ impl<K, V> StateStore<K, V> where K: Eq + std::hash::Hash + Send + Sync + 'stati
}
fn remove(&self, key: &K) {
- let store = self.inner.try_get_value().unwrap();
- let mut store = store.store.write().unwrap();
- if let Some((_v, count)) = store.get_mut(key) {
- *count -= 1;
- if *count == 0 {
- store.remove(key);
- debug!("dropped item from store");
- }
- }
+ // let store = self.inner.try_get_value().unwrap();
+ // let mut store = store.store.write().unwrap();
+ // if let Some((_v, count)) = store.get_mut(key) {
+ // *count -= 1;
+ // if *count == 0 {
+ // store.remove(key);
+ // debug!("dropped item from store");
+ // }
+ // }
}
}
@@ -1020,16 +1044,16 @@ impl<K: Eq + std::hash::Hash + Send + Sync + 'static, V: Send + Sync + 'static>
#[derive(Clone)]
struct MacawChat {
- chat: StateListener<JID, Store<Chat>>,
- user: StateListener<JID, Store<User>>,
+ chat: StateListener<JID, ArcStore<Chat>>,
+ user: StateListener<JID, ArcStore<User>>,
}
impl MacawChat {
fn got_chat_and_user(chat: Chat, user: User) -> Self {
- let chat_state_store: StateStore<JID, Store<Chat>> = use_context().expect("no chat state store");
- let user_state_store: StateStore<JID, Store<User>> = use_context().expect("no user state store");
- let user = user_state_store.store(user.jid.clone(), Store::new(user));
- let chat = chat_state_store.store(chat.correspondent.clone(), Store::new(chat));
+ let chat_state_store: StateStore<JID, ArcStore<Chat>> = use_context().expect("no chat state store");
+ let user_state_store: StateStore<JID, ArcStore<User>> = use_context().expect("no user state store");
+ let user = user_state_store.store(user.jid.clone(), ArcStore::new(user));
+ let chat = chat_state_store.store(chat.correspondent.clone(), ArcStore::new(chat));
Self {
chat,
user,
@@ -1038,7 +1062,7 @@ impl MacawChat {
}
impl Deref for MacawChat {
- type Target = StateListener<JID, Store<Chat>>;
+ type Target = StateListener<JID, ArcStore<Chat>>;
fn deref(&self) -> &Self::Target {
&self.chat
@@ -1053,16 +1077,16 @@ impl DerefMut for MacawChat {
#[derive(Clone)]
struct MacawMessage {
- message: StateListener<Uuid, Store<Message>>,
- user: StateListener<JID, Store<User>>,
+ message: StateListener<Uuid, ArcStore<Message>>,
+ user: StateListener<JID, ArcStore<User>>,
}
impl MacawMessage {
fn got_message_and_user(message: Message, user: User) -> Self {
- let message_state_store: StateStore<Uuid, Store<Message>> = use_context().expect("no message state store");
- let user_state_store: StateStore<JID, Store<User>> = use_context().expect("no user state store");
- let message = message_state_store.store(message.id, Store::new(message));
- let user = user_state_store.store(user.jid.clone(), Store::new(user));
+ let message_state_store: StateStore<Uuid, ArcStore<Message>> = use_context().expect("no message state store");
+ let user_state_store: StateStore<JID, ArcStore<User>> = use_context().expect("no user state store");
+ let message = message_state_store.store(message.id, ArcStore::new(message));
+ let user = user_state_store.store(user.jid.clone(), ArcStore::new(user));
Self {
message,
user,
@@ -1071,7 +1095,7 @@ impl MacawMessage {
}
impl Deref for MacawMessage {
- type Target = StateListener<Uuid, Store<Message>>;
+ type Target = StateListener<Uuid, ArcStore<Message>>;
fn deref(&self) -> &Self::Target {
&self.message
@@ -1104,14 +1128,14 @@ impl DerefMut for MacawUser {
struct MacawContact {
contact: Store<Contact>,
- user: StateListener<JID, Store<User>>,
+ user: StateListener<JID, ArcStore<User>>,
}
impl MacawContact {
fn got_contact_and_user(contact: Contact, user: User) -> Self {
let contact = Store::new(contact);
- let user_state_store: StateStore<JID, Store<User>> = use_context().expect("no user state store");
- let user = user_state_store.store(user.jid.clone(), Store::new(user));
+ let user_state_store: StateStore<JID, ArcStore<User>> = use_context().expect("no user state store");
+ let user = user_state_store.store(user.jid.clone(), ArcStore::new(user));
Self {
contact,
user,
@@ -1155,9 +1179,11 @@ fn ChatsList() -> impl IntoView {
// TODO: filter new messages signal
let new_messages_signal: RwSignal<MessageSubscriptions> = use_context().unwrap();
+ let (sub_id, set_sub_id) = signal(None);
let _load_new_messages = LocalResource::new(move || async move {
load_chats.await;
- let mut new_messages = new_messages_signal.write().subscribe_all();
+ let (sub_id, mut new_messages) = new_messages_signal.write().subscribe_all();
+ set_sub_id.set(Some(sub_id));
while let Some((to, new_message)) = new_messages.recv().await {
debug!("got new message in let");
let mut chats = set_chats.write();
@@ -1181,6 +1207,11 @@ fn ChatsList() -> impl IntoView {
}
debug!("set the new message");
});
+ on_cleanup(move || {
+ if let Some(sub_id) = sub_id.get() {
+ new_messages_signal.write().unsubscribe_all(sub_id);
+ }
+ });
view! {
<div class="chats-list panel">
@@ -1229,7 +1260,7 @@ pub fn get_name(user: Store<User>) -> String {
#[component]
fn ChatsListItem(chat: MacawChat, message: MacawMessage) -> impl IntoView {
- let chat_user = *chat.user;
+ let chat_user: Store<User> = <ArcStore<filamento::user::User> as Clone>::clone(&chat.user).into();
let avatar = LocalResource::new(move || get_avatar(chat_user));
let name = move || get_name(chat_user);