// Leptos front-end for "Macaw Instant Messenger", built on the `filamento` client.
//
// NOTE(review): this copy of the file looks damaged. Generic parameter lists and the
// markup inside `view!` appear to be missing (e.g. `jid: Arc,`, `RwSignal)>>`,
// `signal(None::)`, `view! { }`), and many statements are collapsed onto one physical
// line, so a `//` comment swallows the code that follows it on the same line
// (e.g. `// initialise the client let db = ...`). Confirm against the canonical source
// before compiling, reformatting or editing. The code below is intentionally unchanged.
//
// Overview of the definitions in this file, as far as the visible code shows:
//
// * `AppState` - `LoggedOut` / `LoggedIn`.
// * `Client` - bundles a `filamento::Client` with a `jid` and a `FilesMem` file store;
//   `Deref`/`DerefMut` forward to the inner `filamento::Client`.
// * `App` - matches on the `AppState` signal. On `LoggedIn` it `take()`s the
//   `(client, updates)` pair out of the `client` signal; if nothing is there it sets the
//   state back to `LoggedOut`.
// * `LoginError` - `MissingPassword`, `MissingJID`, `InvalidJID` (from `jid::ParseError`),
//   `ConnectionError` (from `CommandError`).
// * `LoginPage` - the `login` action rejects an empty JID or password, parses the JID,
//   opens the database with `Db::create_connect_and_migrate("mem.db")`, builds the client,
//   connects when `connect_on_login` is set, then stores `(client, updates)` and sets
//   `AppState::LoggedIn`. `remember_me` is not used yet (see the TODO).
//   NOTE(review): the database call is `.unwrap()`ed, so a failure there panics instead of
//   producing a `LoginError`. `login_pending` is set to `true` at the start and only reset
//   to `false` on the error paths visible here - confirm whether that is intended.
// * `MessageSubscriptions` - `all` holds senders for every message, `subset` holds senders
//   per chat JID. `broadcast` sends a clone of the message to each sender and
//   `swap_remove`s any sender whose `send` fails; an emptied per-chat list is removed from
//   the map. `subscribe_all` / `subscribe_chat` create an `mpsc::channel(10)` and register
//   the sending half.
// * `Roster` - store of contacts keyed by JID.
// * `OpenChatsPanel`, `open_chat`, `OpenChatsPanel::open` - `chat_view` is the JID of the
//   chat being viewed and `chats` is an `IndexMap`. Opening a chat: if a chat is currently
//   viewed and present in `chats`, it is removed and the new chat is inserted at its index;
//   otherwise the new chat is appended. `chat_view` is then set to the new chat's JID.
// * `Macaw` - provides the client, roster, message subscriptions, the message/chat/user
//   state stores and the open-chats panel as context, then drains `updates`:
//   `Online` replaces the roster contacts; `RosterUpdate` updates or inserts a contact;
//   `RosterDelete` removes one; `Message` wraps the message with
//   `MacawMessage::got_message_and_user` and broadcasts it from a `spawn_local` task;
//   `MessageDelivery` sets the stored message's delivery; `NickChanged` / `AvatarChanged`
//   modify the stored user. `Offline`, `Presence` and `SubscriptionRequest` do nothing.
//   NOTE(review): `message_subscriptions.write_untracked().broadcast(..).await` appears to
//   keep the signal's write guard alive across the `.await` - confirm this cannot conflict
//   with `subscribe_all` being called from `ChatsList`.
// * `OpenChatsPanelView` - renders something only when `chat_view` is set and that JID is
//   present in `chats`.
// * `OpenChatView`, `ChatViewHeader`, `MessageHistoryBuffer` - view components whose markup
//   is not visible in this copy.
// * `Message` - `major` selects between two views; the `major` one also shows `name`.
//   `r#final` is not used yet (see the TODO).
// * `ChatViewMessageComposer` - `send_message` sends `Body { body: new_message }` through
//   the client, clears `new_message` on success and logs "message send error" otherwise.
//   An effect focuses the input node once it exists.
// * `ArcStateStore` / `StateStore` - a shared map from key to `(value, count)`.
//   `store` overwrites the value and increments the count for an existing key, or inserts
//   it with count 1, and returns a `StateListener`. `update` replaces the value of an
//   existing key; `modify` runs a closure on it. `remove` (private) decrements the count
//   and drops the entry when it reaches 0.
// * `StateListener` / `StateCleaner` - `StateListener` derefs to the stored value.
//   Cloning a `StateCleaner` increments the key's count; dropping it calls
//   `StateStore::remove`.
// * `MacawChat`, `MacawMessage`, `MacawUser`, `MacawContact` - wrappers pairing a store or
//   listener with the related user listener. The `got_*_and_user` constructors fetch the
//   state stores with `use_context()` and register the values via `StateStore::store`.
// * `ChatsList` - loads chats with `get_chats_ordered_with_latest_messages_and_users`, then
//   subscribes to all new messages. An incoming message moves its chat to index 0; for a
//   chat not yet in the list the chat and user are fetched from the client first.
//   NOTE(review): `get_chat` / `get_user` are `.unwrap()`ed, and the `set_chats.write()`
//   guard appears to be held across those `.await`s - confirm.
// * `get_avatar` - currently returns `NO_AVATAR` in both branches (avatar fetching is
//   disabled, see the TODO).
// * `get_name` - the roster contact's name if set, else the user's nick, else the JID
//   as a string.
// * `ChatsListItem` - shows avatar, name and latest message body; the `open_chat` handler
//   calls `OpenChatsPanel::open` with a clone of the chat.
use std::{ borrow::Borrow, cell::RefCell, collections::HashMap, marker::PhantomData, ops::{Deref, DerefMut}, rc::Rc, str::FromStr, sync::{atomic::AtomicUsize, Arc, RwLock}, thread::sleep, time::{self, Duration}, }; use filamento::{ chat::{Body, Chat, ChatStoreFields, Message, MessageStoreFields}, db::Db, error::{CommandError, ConnectionError, DatabaseError}, files::FilesMem, roster::{Contact, ContactStoreFields}, user::{User, UserStoreFields}, UpdateMessage }; use futures::stream::StreamExt; use indexmap::IndexMap; use jid::JID; use leptos::{ html::{self, Div, Input, Textarea}, prelude::*, tachys::dom::document, task::{spawn, spawn_local} }; use leptos_meta::Stylesheet; use leptos_use::{use_textarea_autosize, UseTextareaAutosizeReturn}; use reactive_stores::{Store, StoreField}; use stylance::import_style; use thiserror::Error; use tokio::sync::{mpsc::{self, Receiver}, Mutex}; use tracing::debug; use uuid::Uuid; const NO_AVATAR: &str = "/assets/no-avatar.png"; pub enum AppState { LoggedOut, LoggedIn, } #[derive(Clone)] pub struct Client { client: filamento::Client, jid: Arc, file_store: FilesMem, } impl Deref for Client { type Target = filamento::Client; fn deref(&self) -> &Self::Target { &self.client } } impl DerefMut for Client { fn deref_mut(&mut self) -> &mut Self::Target { &mut self.client } } #[component] pub fn App() -> impl IntoView { let (app, set_app) = signal(AppState::LoggedOut); let client: RwSignal)>> = RwSignal::new(None); view! { {move || match &*app.read() { AppState::LoggedOut => view! { }.into_any(), AppState::LoggedIn => { if let Some((client, updates)) = client.write_untracked().take() { view! { }.into_any() } else { set_app.set(AppState::LoggedOut); view! 
{ }.into_any() } } }} } } #[derive(Clone, Debug, Error)] pub enum LoginError { #[error("Missing Password")] MissingPassword, #[error("Missing JID")] MissingJID, #[error("Invalid JID: {0}")] InvalidJID(#[from] jid::ParseError), #[error("Connection Error: {0}")] ConnectionError(#[from] CommandError), } #[component] fn LoginPage(set_app: WriteSignal, set_client: RwSignal)>>) -> impl IntoView { let jid = RwSignal::new("".to_string()); let password = RwSignal::new("".to_string()); let remember_me = RwSignal::new(false); let connect_on_login = RwSignal::new(true); let (error, set_error) = signal(None::); let error_message = move || { error.with(|error| { if let Some(error) = error { view! {
{error.to_string()}
}.into_any() } else { view! {}.into_any() } }) }; let (login_pending, set_login_pending) = signal(false); let login = Action::new_local(move |_| { async move { set_login_pending.set(true); if jid.read_untracked().is_empty() { set_error.set(Some(LoginError::MissingJID)); set_login_pending.set(false); return } if password.read_untracked().is_empty() { set_error.set(Some(LoginError::MissingPassword)); set_login_pending.set(false); return } let jid = match JID::from_str(&jid.read_untracked()) { Ok(j) => j, Err(e) => { set_error.set(Some(e.into())); set_login_pending.set(false); return }, }; // initialise the client let db = Db::create_connect_and_migrate("mem.db").await.unwrap(); let files_mem = FilesMem::new(); let (client, updates) = filamento::Client::new(jid.clone(), password.read_untracked().clone(), db, files_mem.clone()); // TODO: remember_me let client = Client { client, jid: Arc::new(jid), file_store: files_mem, }; if *connect_on_login.read_untracked() { match client.connect().await { Ok(_) => {}, Err(e) => { set_error.set(Some(e.into())); set_login_pending.set(false); return }, } } // debug!("before setting app state"); set_client.set(Some((client, updates))); set_app.set(AppState::LoggedIn); } }); view! {

Macaw Instant Messenger

{error_message}
} } pub struct MessageSubscriptions { all: Vec>, subset: HashMap>>, } impl MessageSubscriptions { pub fn new() -> Self { Self { all: Vec::new(), subset: HashMap::new(), } } pub async fn broadcast( &mut self, to: JID, message: MacawMessage, ) { // subscriptions to all let mut index = 0; while let Some(sender) = self.all.get(index) { match sender.send((to.clone(), message.clone())).await { Ok(_) => { index += 1; } Err(_) => { self.all.swap_remove(index); } } } // subscriptions to specific chat if let Some(subscribers) = self.subset.get_mut(&to) { let mut index = 0; while let Some(sender) = subscribers.get(index) { match sender.send(message.clone()).await { Ok(_) => { index += 1; } Err(_) => { subscribers.swap_remove(index); } } } if subscribers.is_empty() { self.subset.remove(&to); } } } pub fn subscribe_all( &mut self, ) -> Receiver<(JID, MacawMessage)> { let (send, recv) = mpsc::channel(10); self.all.push(send); recv } pub fn subscribe_chat( &mut self, chat: JID, ) -> Receiver { let (send, recv) = mpsc::channel(10); if let Some(chat_subscribers) = self.subset.get_mut(&chat) { chat_subscribers.push(send); } else { self.subset.insert(chat, vec![send]); } recv } } #[derive(Store)] pub struct Roster { #[store(key: JID = |(jid, _)| jid.clone())] contacts: HashMap, } impl Roster { pub fn new() -> Self { Self { contacts: HashMap::new(), } } } // TODO: multiple panels // pub struct OpenChats { // panels: // } #[derive(Store, Default)] pub struct OpenChatsPanel { // jid must be a chat in the chats map chat_view: Option, #[store(key: JID = |(jid, _)| jid.clone())] chats: IndexMap, } pub fn open_chat(open_chats: Store, chat: MacawChat) { if let Some(jid) = &*open_chats.chat_view().read() { if let Some((index, _jid, entry)) = open_chats.chats().write().shift_remove_full(jid) { let new_jid = chat.chat.correspondent().read().clone(); open_chats.chats().write().insert_before(index, new_jid.clone(), chat); *open_chats.chat_view().write() = Some(new_jid); } else { let new_jid = 
chat.chat.correspondent().read().clone(); open_chats.chats().write().insert(new_jid.clone(), chat); *open_chats.chat_view().write() = Some(new_jid); } } else { let new_jid = chat.chat.correspondent().read().clone(); open_chats.chats().write().insert(new_jid.clone(), chat); *open_chats.chat_view().write() = Some(new_jid); } } impl OpenChatsPanel { pub fn open(&mut self, chat: MacawChat) { if let Some(jid) = &mut self.chat_view { if let Some((index, _jid, entry)) = self.chats.shift_remove_full(jid) { let new_jid = chat.chat.correspondent().read().clone(); self.chats.insert_before(index, new_jid.clone(), chat); *&mut self.chat_view = Some(new_jid); } else { let new_jid = chat.chat.correspondent().read().clone(); self.chats.insert(new_jid.clone(), chat); *&mut self.chat_view = Some(new_jid); } } else { let new_jid = chat.chat.correspondent().read().clone(); self.chats.insert(new_jid.clone(), chat); *&mut self.chat_view = Some(new_jid); } } // TODO: // pub fn open_in_new_tab_unfocused(&mut self) { // } // pub fn open_in_new_tab_focus(&mut self) { // } } #[component] fn Macaw( // TODO: logout // app_state: WriteSignal)>, LocalStorage>, client: Client, mut updates: Receiver, ) -> impl IntoView { provide_context(client); let roster = Store::new(Roster::new()); provide_context(roster); let message_subscriptions = RwSignal::new(MessageSubscriptions::new()); provide_context(message_subscriptions); let messages_store: StateStore> = StateStore::new(); provide_context(messages_store); let chats_store: StateStore> = StateStore::new(); provide_context(chats_store); let users_store: StateStore> = StateStore::new(); provide_context(users_store); let open_chats = Store::new(OpenChatsPanel::default()); provide_context(open_chats); // TODO: get cached contacts on login before getting the updated contacts OnceResource::new(async move { while let Some(update) = updates.recv().await { match update { UpdateMessage::Online(online, items) => { let contacts = items.into_iter().map(|(contact, 
user)| { (contact.user_jid.clone(), MacawContact::got_contact_and_user(contact, user)) }).collect(); roster.contacts().set(contacts); }, UpdateMessage::Offline(offline) => {}, UpdateMessage::RosterUpdate(contact, user) => { roster.contacts().update(|roster| { if let Some(macaw_contact) = roster.get_mut(&contact.user_jid) { macaw_contact.set(contact); } else { let jid = contact.user_jid.clone(); let contact = MacawContact::got_contact_and_user(contact, user); roster.insert(jid, contact); } }); }, UpdateMessage::RosterDelete(jid) => { roster.contacts().update(|roster| { roster.remove(&jid); }); }, UpdateMessage::Presence { from, presence } => {}, UpdateMessage::Message { to, from, message } => { // debug!("before got message"); let new_message = MacawMessage::got_message_and_user(message, from); // debug!("after got message"); spawn_local(async move { message_subscriptions.write_untracked().broadcast(to, new_message).await }); // debug!("after set message"); }, UpdateMessage::MessageDelivery { id, chat, delivery } => { messages_store.modify(&id, |message| message.delivery().set(Some(delivery))); }, UpdateMessage::SubscriptionRequest(jid) => {}, UpdateMessage::NickChanged { jid, nick } => { users_store.modify(&jid, |user| user.update(|user| *&mut user.nick = nick.clone())); }, UpdateMessage::AvatarChanged { jid, id } => { users_store.modify(&jid, |user| *&mut user.write().avatar = id.clone()); }, } } }); view! { } } #[component] pub fn OpenChatsPanelView() -> impl IntoView { let open_chats: Store = use_context().expect("no open chats panel in context"); // TODO: tabs // view! { // {move || { // if open_chats.chats().read().len() > 1 { // Some( // view! { // // }, // ) // } else { // None // } // }} // } view! {
{move || { if let Some(open_chat) = open_chats.chat_view().get() { if let Some(open_chat) = open_chats.chats().read().get(&open_chat) { view! { }.into_any() } else { view! {}.into_any() } } else { view! {}.into_any() } }}
} } #[component] pub fn OpenChatView(chat: MacawChat) -> impl IntoView { let chat_chat = *chat.chat; let chat_jid = move || chat_chat.correspondent().get(); view! {
} } #[component] pub fn ChatViewHeader(chat: MacawChat) -> impl IntoView { let chat_user = *chat.user; let avatar = move || get_avatar(chat_user); let name = move || get_name(chat_user); let jid = move || chat_user.jid().read().to_string(); view! {
} } #[component] pub fn MessageHistoryBuffer(chat: MacawChat) -> impl IntoView { view! {} } #[component] pub fn Message(message: MacawMessage, major: bool, r#final: bool) -> impl IntoView { let message_message = *message.message; let message_user = *message.user; let avatar = move || get_avatar(message_user); let name = move || get_name(message_user); // TODO: chrono-humanize? // TODO: if final, show delivery not only on hover. if major { view! {
{name}
{move || message_message.timestamp().read().to_string()}
{move || message_message.body().read().body.clone()}
}.into_any() } else { view! {
{move || message_message.timestamp().read().to_string()}
{move || message_message.body().read().body.clone()}
}.into_any() } } #[component] pub fn ChatViewMessageComposer(chat: JID) -> impl IntoView { let message_input: NodeRef
= NodeRef::new(); // TODO: load last message draft let new_message = RwSignal::new("".to_string()); let client: Client = use_context().expect("no client in context"); let client = RwSignal::new(client); let send_message = Action::new(move |_| { let value = chat.clone(); async move { spawn_local(async move { match client.read_untracked().send_message(value, Body { body: new_message.get_untracked() }).await { Ok(_) => { new_message.set("".to_string()); }, Err(e) => tracing::error!("message send error: {}", e), }}) } }); let _focus = Effect::new(move |_| { if let Some(input) = message_input.get() { let _ = input.focus(); // input.style("height: 0"); // let height = input.scroll_height(); // input.style(format!("height: {}px", height)); } }); // TODO: placeholder view! {
} } // V has to be an arc signal #[derive(Debug)] struct ArcStateStore { store: Arc>>, } impl PartialEq for ArcStateStore { fn eq(&self, other: &Self) -> bool { Arc::ptr_eq(&self.store, &other.store) } } impl Clone for ArcStateStore { fn clone(&self) -> Self { Self { store: Arc::clone(&self.store) } } } impl Eq for ArcStateStore {} impl ArcStateStore { pub fn new() -> Self { Self { store: Arc::new(RwLock::new(HashMap::new())), } } } #[derive(Debug)] struct StateStore { inner: ArenaItem, S> } impl Dispose for StateStore { fn dispose(self) { self.inner.dispose() } } impl StateStore where K: Send + Sync + 'static, V: Send + Sync + 'static { pub fn new() -> Self { Self::new_with_storage() } } impl StateStore where K: 'static, V: 'static, S: Storage> { pub fn new_with_storage() -> Self { Self { inner: ArenaItem::new_with_storage(ArcStateStore::new()), } } } impl StateStore where K: 'static, V: 'static { pub fn new_local() -> Self { Self::new_with_storage() } } impl From> for StateStore { fn from(value: ArcStateStore) -> Self { Self { inner: ArenaItem::new_with_storage(value), } } } impl FromLocal> for StateStore { fn from_local(value: ArcStateStore) -> Self { Self { inner: ArenaItem::new_with_storage(value), } } } impl Copy for StateStore {} impl Clone for StateStore { fn clone(&self) -> Self { *self } } impl StateStore where K: Send + Sync + 'static, V: Send + Sync + 'static { pub fn store(&self, key: K, value: V) -> StateListener { { let store = self.inner.try_get_value().unwrap(); let mut store = store.store.write().unwrap(); debug!("got store"); debug!("got store 2"); if let Some((v, count)) = store.get_mut(&key) { *v = value.clone(); *count += 1; } else { store.insert(key.clone(), (value.clone(), 1)); } }; StateListener { value, cleaner: StateCleaner { key, state_store: self.clone() }, } } } impl StateStore where K: Eq + std::hash::Hash + Send + Sync + 'static, V: Send + Sync + 'static { pub fn update(&self, key: &K, value: V) { let store = 
self.inner.try_get_value().unwrap(); let mut store = store.store.write().unwrap(); if let Some((v, _)) = store.get_mut(key) { *v = value; } } pub fn modify(&self, key: &K, modify: impl Fn(&mut V)) { let store = self.inner.try_get_value().unwrap(); let mut store = store.store.write().unwrap(); if let Some((v, _)) = store.get_mut(key) { modify(v); } } fn remove(&self, key: &K) { let store = self.inner.try_get_value().unwrap(); let mut store = store.store.write().unwrap(); if let Some((_v, count)) = store.get_mut(key) { *count -= 1; if *count == 0 { store.remove(key); debug!("dropped item from store"); } } } } #[derive(Clone)] struct StateListener where K: Eq + std::hash::Hash + 'static + std::marker::Send + std::marker::Sync, V: 'static + std::marker::Send + std::marker::Sync { value: V, cleaner: StateCleaner } impl Deref for StateListener { type Target = V; fn deref(&self) -> &Self::Target { &self.value } } impl DerefMut for StateListener { fn deref_mut(&mut self) -> &mut Self::Target { &mut self.value } } struct ArcStateCleaner { key: K, state_store: ArcStateStore, } struct StateCleaner where K: Eq + std::hash::Hash + Send + Sync + 'static, V: Send + Sync + 'static { key: K, state_store: StateStore, } impl Clone for StateCleaner where K: Eq + std::hash::Hash + Clone + Send + Sync, V: Send + Sync { fn clone(&self) -> Self { { let store = self.state_store.inner.try_get_value().unwrap(); let mut store = store.store.write().unwrap(); if let Some((_v, count)) = store.get_mut(&self.key) { *count += 1; } } Self { key: self.key.clone(), state_store: self.state_store.clone(), } } } impl Drop for StateCleaner { fn drop(&mut self) { self.state_store.remove(&self.key); } } #[derive(Clone)] struct MacawChat { chat: StateListener>, user: StateListener>, } impl MacawChat { fn got_chat_and_user(chat: Chat, user: User) -> Self { let chat_state_store: StateStore> = use_context().expect("no chat state store"); let user_state_store: StateStore> = use_context().expect("no user state 
store"); let user = user_state_store.store(user.jid.clone(), Store::new(user)); let chat = chat_state_store.store(chat.correspondent.clone(), Store::new(chat)); Self { chat, user, } } } impl Deref for MacawChat { type Target = StateListener>; fn deref(&self) -> &Self::Target { &self.chat } } impl DerefMut for MacawChat { fn deref_mut(&mut self) -> &mut Self::Target { &mut self.chat } } #[derive(Clone)] struct MacawMessage { message: StateListener>, user: StateListener>, } impl MacawMessage { fn got_message_and_user(message: Message, user: User) -> Self { let message_state_store: StateStore> = use_context().expect("no message state store"); let user_state_store: StateStore> = use_context().expect("no user state store"); let message = message_state_store.store(message.id, Store::new(message)); let user = user_state_store.store(user.jid.clone(), Store::new(user)); Self { message, user, } } } impl Deref for MacawMessage { type Target = StateListener>; fn deref(&self) -> &Self::Target { &self.message } } impl DerefMut for MacawMessage { fn deref_mut(&mut self) -> &mut Self::Target { &mut self.message } } struct MacawUser { user: StateListener>, } impl Deref for MacawUser { type Target = StateListener>; fn deref(&self) -> &Self::Target { &self.user } } impl DerefMut for MacawUser { fn deref_mut(&mut self) -> &mut Self::Target { &mut self.user } } struct MacawContact { contact: Store, user: StateListener>, } impl MacawContact { fn got_contact_and_user(contact: Contact, user: User) -> Self { let contact = Store::new(contact); let user_state_store: StateStore> = use_context().expect("no user state store"); let user = user_state_store.store(user.jid.clone(), Store::new(user)); Self { contact, user, } } } impl Deref for MacawContact { type Target = Store; fn deref(&self) -> &Self::Target { &self.contact } } impl DerefMut for MacawContact { fn deref_mut(&mut self) -> &mut Self::Target { &mut self.contact } } #[component] fn ChatsList() -> impl IntoView { let (chats, set_chats) 
= signal(IndexMap::new()); let load_chats = LocalResource::new(move || async move { let client = use_context::().expect("client not in context"); let chats = client.get_chats_ordered_with_latest_messages_and_users().await.map_err(|e| e.to_string()); match chats { Ok(c) => { let chats = c.into_iter().map(|((chat, chat_user), (message, message_user))| { (chat.correspondent.clone(), (MacawChat::got_chat_and_user(chat, chat_user), MacawMessage::got_message_and_user(message, message_user))) }).collect::>(); set_chats.set(chats); }, Err(_) => { // TODO: show error message at top of chats list }, } }); // TODO: filter new messages signal let new_messages_signal: RwSignal = use_context().unwrap(); let _load_new_messages = LocalResource::new(move || async move { load_chats.await; let mut new_messages = new_messages_signal.write().subscribe_all(); while let Some((to, new_message)) = new_messages.recv().await { debug!("got new message in let"); let mut chats = set_chats.write(); if let Some((chat, _latest_message)) = chats.shift_remove(&to) { debug!("chat existed"); debug!("new message: {}", new_message.read().body.body); chats.insert_before(0, to, (chat.clone(), new_message)); debug!("done setting"); } else { debug!("the chat didn't exist"); let client = use_context::().expect("client not in context"); let chat = client.get_chat(to.clone()).await.unwrap(); let user = client.get_user(to.clone()).await.unwrap(); debug!("before got chat"); let chat = MacawChat::got_chat_and_user(chat, user); debug!("after got chat"); chats.insert_before(0, to, (chat, new_message)); debug!("done setting"); } } debug!("set the new message"); }); view! {

Chats

} } pub fn get_avatar(user: Store) -> String { if let Some(avatar) = &user.read().avatar { NO_AVATAR.to_string() // TODO: enable avatar fetching // format!("/files/{}", avatar) } else { NO_AVATAR.to_string() } } pub fn get_name(user: Store) -> String { let roster: Store = use_context().expect("no roster in context"); if let Some(name) = roster .contacts() .read() .get(&user.read().jid) .map(|contact| contact.read().name.clone()) .unwrap_or_default() { name.to_string() } else if let Some(nick) = &user.read().nick { nick.to_string() } else { user.read().jid.to_string() } } #[component] fn ChatsListItem(chat: MacawChat, message: MacawMessage) -> impl IntoView { let chat_user = *chat.user; let avatar = move || get_avatar(chat_user); let name = move || get_name(chat_user); // TODO: store fine-grained reactivity let latest_message_body = move || message.get().body.body; let open_chats: Store = use_context().expect("no open chats panel store in context"); let open_chat = move |_| { debug!("opening chat"); open_chats.update(|open_chats| open_chats.open(chat.clone())); }; view! {

{name}

{latest_message_body}

} }