use chrono::{NaiveDateTime, TimeDelta}; use filamento::{chat::{Chat, ChatStoreFields, MessageStoreFields}, user::User}; use indexmap::IndexMap; use jid::BareJID; use leptos::prelude::*; use reactive_stores::{ArcStore, Store}; use tracing::{debug, error}; use uuid::Uuid; use crate::{chat::MacawChat, client::Client, components::message::Message, message::{ArcMacawMessage, MacawMessage}, message_subscriptions::MessageSubscriptions}; #[component] pub fn MessageHistoryBuffer(chat: MacawChat) -> impl IntoView { let (messages, set_messages) = arc_signal(IndexMap::new()); let load_set_messages = set_messages.clone(); let load_messages = LocalResource::new(move || { let load_set_messages = load_set_messages.clone(); async move { let client = use_context::().expect("client not in context"); let messages = client .get_messages_with_users(chat.get().correspondent().get()) .await .map_err(|e| e.to_string()); match messages { Ok(m) => { let messages = m .into_iter() .map(|(message, message_user)| { ( message.id, ArcMacawMessage::got_message_and_user(message, message_user), ) }) .collect::>(); load_set_messages.set(messages); } Err(err) => { error!("{err}") // TODO: show error message at top of chats list } } } }); // TODO: filter new messages signal let new_messages_signal: RwSignal = use_context().unwrap(); let (sub_id, set_sub_id) = signal(None); let load_new_messages_set = set_messages.clone(); let _load_new_messages = LocalResource::new(move || { let load_new_messages_set = load_new_messages_set.clone(); async move { load_messages.await; let (sub_id, mut new_messages) = new_messages_signal .write() .subscribe_chat(chat.get().correspondent().get()); set_sub_id.set(Some(sub_id)); while let Some(new_message) = new_messages.recv().await { debug!("got new message in let message buffer"); let mut messages = load_new_messages_set.write(); if let Some((_, last)) = messages.last() { if *last.get() .timestamp() .read() < *new_message.get() .timestamp() .read() { messages.insert( new_message.get() .id() .get(), new_message, ); debug!("set the new message in message buffer"); } else { let index = match messages.binary_search_by(|_, value| { value.get() .timestamp() .read() .cmp( &new_message.get() .timestamp() .read(), ) }) { Ok(i) => i, Err(i) => i, }; messages.insert_before( // TODO: check if this logic is correct index, new_message.get() .id() .get(), new_message, ); debug!("set the new message in message buffer"); } } else { messages.insert( new_message.get() .id() .get(), new_message, ); debug!("set the new message in message buffer"); } } } }); on_cleanup(move || { if let Some(sub_id) = sub_id.get_untracked() { new_messages_signal .write() .unsubscribe_chat(sub_id, chat.get().correspondent().get_untracked()); } }); let each = move || { let mut last_timestamp = NaiveDateTime::MIN; let mut last_user: Option = None; let mut messages = messages .get() .into_iter() .map(|(id, message)| { let message_timestamp = message.message.get() .timestamp() .read() .naive_local(); // TODO: mark new day // if message_timestamp.date() > last_timestamp.date() { // messages_view = messages_view.push(date(message_timestamp.date())); // } let major = if last_user.as_ref() != Some(&message.message.get().read().from) || message_timestamp - last_timestamp > TimeDelta::minutes(3) { true } else { false }; last_user = Some( message.get() .from() .get(), ); last_timestamp = message_timestamp; (id, (message, major, false)) }) .collect::>(); if let Some((_id, (_, _, last))) = messages.last_mut() { *last = true } messages.into_iter().rev() }; view! {
} }