use std::{ops::{Deref, DerefMut}, str::FromStr, sync::Arc, thread::sleep, time::{self, Duration}}; use filamento::{chat::{Chat, Message}, db::Db, error::{CommandError, ConnectionError, DatabaseError}, files::FilesMem, user::User, UpdateMessage}; use indexmap::IndexMap; use jid::JID; use leptos::{prelude::*, task::spawn_local}; use futures::stream::StreamExt; use leptos_meta::Stylesheet; use leptos_query::{create_query, provide_query_client, provide_query_client_with_options, DefaultQueryOptions, QueryOptions, QueryResult, QueryScope}; use leptos_reactive::{SignalGetUntracked, SignalStream}; use stylance::import_style; use thiserror::Error; use tokio::{sync::mpsc::Receiver}; use tracing::debug; use uuid::Uuid; pub enum AppState { LoggedOut, LoggedIn, } #[derive(Clone)] pub struct Client { client: filamento::Client, jid: Arc, file_store: FilesMem, } impl Deref for Client { type Target = filamento::Client; fn deref(&self) -> &Self::Target { &self.client } } impl DerefMut for Client { fn deref_mut(&mut self) -> &mut Self::Target { &mut self.client } } #[component] pub fn App() -> impl IntoView { let (app, set_app) = signal(AppState::LoggedOut); let client: RwSignal)>, LocalStorage> = RwSignal::new_local(None); view! { {move || match &*app.read() { AppState::LoggedOut => view! { }.into_any(), AppState::LoggedIn => { if let Some((client, updates)) = client.write_untracked().take() { view! { }.into_any() } else { set_app.set(AppState::LoggedOut); view! { }.into_any() } } }} } } #[derive(Clone, Debug, Error)] pub enum LoginError { #[error("Missing Password")] MissingPassword, #[error("Missing JID")] MissingJID, #[error("Invalid JID: {0}")] InvalidJID(#[from] jid::ParseError), #[error("Connection Error: {0}")] ConnectionError(#[from] CommandError), } #[component] fn LoginPage(set_app: WriteSignal, set_client: RwSignal)>, LocalStorage>) -> impl IntoView { let jid = RwSignal::new("".to_string()); let password = RwSignal::new("".to_string()); let remember_me = RwSignal::new(false); let connect_on_login = RwSignal::new(true); let (error, set_error) = signal_local(None::); let error_message = move || { error.with(|error| { if let Some(error) = error { view! {
{error.to_string()}
}.into_any() } else { view! {}.into_any() } }) }; let (login_pending, set_login_pending) = signal(false); let login = Action::new_local(move |_| { async move { set_login_pending.set(true); if jid.read_untracked().is_empty() { set_error.set(Some(LoginError::MissingJID)); set_login_pending.set(false); return } if password.read_untracked().is_empty() { set_error.set(Some(LoginError::MissingPassword)); set_login_pending.set(false); return } let jid = match JID::from_str(&jid.read_untracked()) { Ok(j) => j, Err(e) => { set_error.set(Some(e.into())); set_login_pending.set(false); return }, }; // initialise the client let db = Db::create_connect_and_migrate("mem.db").await.unwrap(); let files_mem = FilesMem::new(); let (client, updates) = filamento::Client::new(jid.clone(), password.read_untracked().clone(), db, files_mem.clone()); // TODO: remember_me let client = Client { client, jid: Arc::new(jid), file_store: files_mem, }; // TODO: connect on login if *connect_on_login.read_untracked() { match client.connect().await { Ok(_) => {}, Err(e) => { set_error.set(Some(e.into())); set_login_pending.set(false); return }, } } // debug!("before setting app state"); set_client.set(Some((client, updates))); set_app.set(AppState::LoggedIn); } }); view! {

Macaw Instant Messenger

{error_message}
} } #[component] fn Macaw( // TODO: logout // app_state: WriteSignal)>, LocalStorage>, client: Client, mut updates: Receiver, ) -> impl IntoView { // TODO: is there some kind of context_local? let client = RwSignal::new_local(client); provide_context(client); let (new_messages, set_new_messages) = signal(None::<(JID, MacawMessage)>); provide_context(new_messages); provide_query_client_with_options(DefaultQueryOptions { resource_option: leptos_query::ResourceOption::Local, ..Default::default() }); let updates_routine = OnceResource::new(async move { while let Some(update) = updates.recv().await { match update { UpdateMessage::Online(online, items) => {}, UpdateMessage::Offline(offline) => {}, UpdateMessage::RosterUpdate(contact, user) => {}, UpdateMessage::RosterDelete(jid) => {}, UpdateMessage::Presence { from, presence } => {}, UpdateMessage::Message { to, from, message } => { let new_message = MacawMessage::got_message_and_user(message, from); set_new_messages.set(Some((to, new_message))); }, UpdateMessage::MessageDelivery { id, chat, delivery } => {}, UpdateMessage::SubscriptionRequest(jid) => {}, UpdateMessage::NickChanged { jid, nick } => {}, UpdateMessage::AvatarChanged { jid, id } => {}, } } }); view! { "logged in" } } fn chat_query() -> QueryScope> { create_query(get_chat, QueryOptions::default()) } async fn get_chat(jid: JID) -> Result { let client: Client = use_context::>().unwrap().read_untracked().clone(); client.get_chat(jid).await.map_err(|e| e.to_string()) } fn user_query() -> QueryScope> { create_query(get_user, QueryOptions::default()) } async fn get_user(jid: JID) -> Result { let client: Client = use_context::>().unwrap().read_untracked().clone(); client.get_user(jid).await.map_err(|e| e.to_string()) } fn message_query() -> QueryScope> { create_query(get_message, QueryOptions::default()) } async fn get_message(id: Uuid) -> Result { let client: Client = use_context::>().unwrap().read_untracked().clone(); client.get_message(id).await.map_err(|e| e.to_string()) } // fn got_chat(chat: Chat) -> QueryScope { // let fetcher = move |_| { // let chat = (&chat).clone(); // async { // MacawChat { // chat // } // }}; // create_query(fetcher, QueryOptions::default()) // } #[derive(Clone)] struct MacawChat { chat: ReadSignal>>>, user: ReadSignal>>>, } impl MacawChat { fn got_chat_and_user(chat: Chat, user: User) -> Self { let correspondent = chat.correspondent.clone(); let chat_query = chat_query(); chat_query.set_query_data(correspondent.clone(), Ok(chat)); let chat = chat_query.use_query(move || correspondent.clone()); let jid = user.jid.clone(); let user_query = user_query(); user_query.set_query_data(jid.clone(), Ok(user)); let user = user_query.use_query(move || jid.clone()); Self { chat: ReadSignal::from_stream_unsync(chat.data.to_stream()), user: ReadSignal::from_stream_unsync(user.data.to_stream()), } } fn get(jid: JID) -> Self { let jid1 = jid.clone(); let chat = chat_query().use_query(move || (&jid1).clone()); let user = user_query().use_query(move || (&jid).clone()); Self { chat: ReadSignal::from_stream_unsync(chat.data.to_stream()), user: ReadSignal::from_stream_unsync(user.data.to_stream()), } } } impl Deref for MacawChat { type Target = ReadSignal>>>; fn deref(&self) -> &Self::Target { &self.chat } } impl DerefMut for MacawChat { fn deref_mut(&mut self) -> &mut Self::Target { &mut self.chat } } #[derive(Clone)] struct MacawMessage { message: ReadSignal>>>, user: ReadSignal>>>, } impl MacawMessage { fn got_message_and_user(message: Message, user: User) -> Self { debug!("executing the got message"); let message_id = message.id; let message_query = message_query(); message_query.set_query_data(message.id, Ok(message)); let message = message_query.use_query(move || message_id); let jid = user.jid.clone(); let user_query = user_query(); user_query.set_query_data(jid.clone(), Ok(user)); let user = user_query.use_query(move || jid.clone()); Self { message: ReadSignal::from_stream_unsync(message.data.to_stream()), user: ReadSignal::from_stream_unsync(user.data.to_stream()), } } } impl Deref for MacawMessage { type Target = ReadSignal>>>; fn deref(&self) -> &Self::Target { &self.message } } impl DerefMut for MacawMessage { fn deref_mut(&mut self) -> &mut Self::Target { &mut self.message } } fn ChatsList() -> impl IntoView { let chats: LocalResource, LocalStorage>, WriteSignal, LocalStorage>), String>> = LocalResource::new(move || async move || -> Result<_, _> { let client: Client = use_context::>().unwrap().read().clone(); let chats = client.get_chats_ordered_with_latest_messages_and_users().await.map_err(|e| e.to_string())?; let chats = chats.into_iter().map(|((chat, chat_user), (message, message_user))| { (chat.correspondent.clone(), (MacawChat::got_chat_and_user(chat, chat_user), MacawMessage::got_message_and_user(message, message_user))) }).collect::>(); let (chats, set_chats) = signal_local(chats); Ok((chats, set_chats)) }()); // TODO: filter new messages signal let new_messages_signal: ReadSignal> = use_context().unwrap(); OnceResource::new(async move { let mut new_message = new_messages_signal.to_stream(); match chats.await { Ok((c, set_c)) => { while let Some(new_message) = new_message.next().await { debug!("got new message in let"); if let Some((to, new_message)) = new_message { if let Some((chat, _latest_message)) = set_c.write().shift_remove(&to) { debug!("chat existed"); set_c.write().insert_before(0, to, (chat, new_message)); debug!("done setting"); } else { debug!("the chat didn't exist"); let chat = MacawChat::get(to.clone()); set_c.write().insert_before(0, to, (chat, new_message)); debug!("done setting"); } } } debug!("set the new message"); } Err(_) => {}, } }); view! {

Chats

{move || { if let Some(chats) = &*chats.read() { match &**chats { Ok((chats, _)) => { let chats = chats.clone(); view! {

{chat.0.to_string()}

} .into_any() } Err(e) => { view! {
{format!("{}", e)}
}.into_any() } } } else { None::.into_any() } }}
} }