From 91f1994af940085d5d475a97820900ebbf0eb553 Mon Sep 17 00:00:00 2001 From: cel 🌸 Date: Thu, 3 Apr 2025 03:41:38 +0100 Subject: feat: better message handling, pep publish, xep_0172: nick --- filamento/src/logic/mod.rs | 14 +- filamento/src/logic/offline.rs | 64 +++++- filamento/src/logic/online.rs | 355 +++++++++++++++++++++------------- filamento/src/logic/process_stanza.rs | 86 +++++--- 4 files changed, 348 insertions(+), 171 deletions(-) (limited to 'filamento/src/logic') diff --git a/filamento/src/logic/mod.rs b/filamento/src/logic/mod.rs index 15c2d12..1ddd7d3 100644 --- a/filamento/src/logic/mod.rs +++ b/filamento/src/logic/mod.rs @@ -1,5 +1,6 @@ use std::{collections::HashMap, sync::Arc}; +use jid::JID; use lampada::{Connected, Logic, error::ReadError}; use stanza::client::Stanza; use tokio::sync::{Mutex, mpsc, oneshot}; @@ -8,7 +9,7 @@ use tracing::{error, info, warn}; use crate::{ Client, Command, UpdateMessage, db::Db, - error::{Error, RequestError, ResponseError}, + error::{Error, IqRequestError, ResponseError}, }; mod abort; @@ -23,6 +24,7 @@ mod process_stanza; #[derive(Clone)] pub struct ClientLogic { client: Client, + bare_jid: JID, db: Db, pending: Pending, update_sender: mpsc::Sender, @@ -41,7 +43,7 @@ impl Pending { connection: &Connected, request: Stanza, id: String, - ) -> Result { + ) -> Result { let (send, recv) = oneshot::channel(); { self.0.lock().await.insert(id, send); @@ -74,12 +76,18 @@ impl Pending { } impl ClientLogic { - pub fn new(client: Client, db: Db, update_sender: mpsc::Sender) -> Self { + pub fn new( + client: Client, + bare_jid: JID, + db: Db, + update_sender: mpsc::Sender, + ) -> Self { Self { db, pending: Pending::new(), update_sender, client, + bare_jid, } } diff --git a/filamento/src/logic/offline.rs b/filamento/src/logic/offline.rs index bc2666a..6399cf7 100644 --- a/filamento/src/logic/offline.rs +++ b/filamento/src/logic/offline.rs @@ -1,8 +1,14 @@ +use chrono::Utc; use lampada::error::WriteError; +use uuid::Uuid; use crate::{ Command, - error::{DatabaseError, DiscoError, Error, RosterError, StatusError}, + chat::{Delivery, Message}, + error::{ + DatabaseError, DiscoError, Error, IqRequestError, MessageSendError, NickError, RosterError, + StatusError, + }, presence::Online, roster::Contact, }; @@ -76,16 +82,16 @@ pub async fn handle_offline_result(logic: &ClientLogic, command: Command) -> Res sender.send(Err(RosterError::Write(WriteError::Disconnected))); } Command::BuddyRequest(_jid, sender) => { - sender.send(Err(WriteError::Disconnected)); + sender.send(Err(WriteError::Disconnected.into())); } Command::SubscriptionRequest(_jid, sender) => { - sender.send(Err(WriteError::Disconnected)); + sender.send(Err(WriteError::Disconnected.into())); } Command::AcceptBuddyRequest(_jid, sender) => { - sender.send(Err(WriteError::Disconnected)); + sender.send(Err(WriteError::Disconnected.into())); } Command::AcceptSubscriptionRequest(_jid, sender) => { - sender.send(Err(WriteError::Disconnected)); + sender.send(Err(WriteError::Disconnected.into())); } Command::UnsubscribeFromContact(_jid, sender) => { sender.send(Err(WriteError::Disconnected)); @@ -107,8 +113,42 @@ pub async fn handle_offline_result(logic: &ClientLogic, command: Command) -> Res sender.send(result); } // TODO: offline message queue - Command::SendMessage(_jid, _body, sender) => { - sender.send(Err(WriteError::Disconnected)); + Command::SendMessage(jid, body) => { + let id = Uuid::new_v4(); + let timestamp = Utc::now(); + + let message = Message { + id, + from: logic.bare_jid.clone(), + // TODO: failure reason + delivery: Some(Delivery::Failed), + timestamp, + body, + }; + // try to store in message history that there is a new message that is sending. if client is quit mid-send then can mark as failed and re-send + // TODO: mark these as potentially failed upon client launch + if let Err(e) = logic + .db() + .create_message_with_self_resource( + message.clone(), + jid.clone(), + // TODO: when message is queued and sent, the from must also be updated with the correct resource + logic.bare_jid.clone(), + ) + .await + { + // TODO: should these really be handle_error or just the error macro? + logic + .handle_error(MessageSendError::MessageHistory(e.into()).into()) + .await; + } + logic + .update_sender() + .send(crate::UpdateMessage::Message { + to: jid.as_bare(), + message, + }) + .await; } Command::SendPresence(_jid, _presence, sender) => { sender.send(Err(WriteError::Disconnected)); @@ -119,6 +159,16 @@ pub async fn handle_offline_result(logic: &ClientLogic, command: Command) -> Res Command::DiscoItems(_jid, _node, sender) => { sender.send(Err(DiscoError::Write(WriteError::Disconnected))); } + Command::Publish { + item: _, + node: _, + sender, + } => { + sender.send(Err(IqRequestError::Write(WriteError::Disconnected).into())); + } + Command::ChangeNick(_, sender) => { + sender.send(Err(NickError::Disconnected)); + } } Ok(()) } diff --git a/filamento/src/logic/online.rs b/filamento/src/logic/online.rs index 63a4aa3..d32f527 100644 --- a/filamento/src/logic/online.rs +++ b/filamento/src/logic/online.rs @@ -3,10 +3,11 @@ use jid::JID; use lampada::{Connected, WriteMessage, error::WriteError}; use stanza::{ client::{ - Stanza, - iq::{self, Iq, IqType, Query}, + iq::{self, Iq, IqType, Query}, Stanza }, xep_0030::{info, items}, + xep_0060::pubsub::{self, Pubsub}, + xep_0172::{self, Nick}, xep_0203::Delay, }; use tokio::sync::oneshot; @@ -14,12 +15,9 @@ use tracing::{debug, error, info}; use uuid::Uuid; use crate::{ - Command, UpdateMessage, - chat::{Body, Message}, - disco::{Info, Items}, - error::{DatabaseError, DiscoError, Error, MessageSendError, RosterError, StatusError}, - presence::{Online, Presence, PresenceType}, - roster::{Contact, ContactUpdate}, + chat::{Body, Chat, Delivery, Message}, disco::{Info, Items}, error::{ + DatabaseError, DiscoError, Error, IqRequestError, MessageSendError, NickError, PublishError, RosterError, StatusError, SubscribeError + }, pep, presence::{Online, Presence, PresenceType}, roster::{Contact, ContactUpdate}, Command, UpdateMessage }; use super::{ @@ -156,105 +154,82 @@ pub async fn handle_add_contact( } } -pub async fn handle_buddy_request(connection: Connected, jid: JID) -> Result<(), WriteError> { +pub async fn handle_buddy_request( + logic: &ClientLogic, + connection: Connected, + jid: JID, +) -> Result<(), SubscribeError> { + let client_user = logic.db.read_user(logic.bare_jid.clone()).await?; + let nick = client_user.nick.map(|nick| Nick(nick)); let presence = Stanza::Presence(stanza::client::presence::Presence { - from: None, - id: None, to: Some(jid.clone()), r#type: Some(stanza::client::presence::PresenceType::Subscribe), - lang: None, - show: None, - status: None, - priority: None, - errors: Vec::new(), - delay: None, + nick, + ..Default::default() }); connection.write_handle().write(presence).await?; let presence = Stanza::Presence(stanza::client::presence::Presence { - from: None, - id: None, to: Some(jid), r#type: Some(stanza::client::presence::PresenceType::Subscribed), - lang: None, - show: None, - status: None, - priority: None, - errors: Vec::new(), - delay: None, + ..Default::default() }); connection.write_handle().write(presence).await?; Ok(()) } pub async fn handle_subscription_request( + logic: &ClientLogic, connection: Connected, jid: JID, -) -> Result<(), WriteError> { +) -> Result<(), SubscribeError> { // TODO: i should probably have builders + let client_user = logic.db.read_user(logic.bare_jid.clone()).await?; + let nick = client_user.nick.map(|nick| Nick(nick)); let presence = Stanza::Presence(stanza::client::presence::Presence { - from: None, - id: None, to: Some(jid), r#type: Some(stanza::client::presence::PresenceType::Subscribe), - lang: None, - show: None, - status: None, - priority: None, - errors: Vec::new(), - delay: None, + nick, + ..Default::default() }); connection.write_handle().write(presence).await?; Ok(()) } pub async fn handle_accept_buddy_request( + logic: &ClientLogic, connection: Connected, jid: JID, -) -> Result<(), WriteError> { +) -> Result<(), SubscribeError> { let presence = Stanza::Presence(stanza::client::presence::Presence { - from: None, - id: None, to: Some(jid.clone()), r#type: Some(stanza::client::presence::PresenceType::Subscribed), - lang: None, - show: None, - status: None, - priority: None, - errors: Vec::new(), - delay: None, + ..Default::default() }); connection.write_handle().write(presence).await?; + let client_user = logic.db.read_user(logic.bare_jid.clone()).await?; + let nick = client_user.nick.map(|nick| Nick(nick)); let presence = Stanza::Presence(stanza::client::presence::Presence { - from: None, - id: None, to: Some(jid), r#type: Some(stanza::client::presence::PresenceType::Subscribe), - lang: None, - show: None, - status: None, - priority: None, - errors: Vec::new(), - delay: None, + nick, + ..Default::default() }); connection.write_handle().write(presence).await?; Ok(()) } pub async fn handle_accept_subscription_request( + logic: &ClientLogic, connection: Connected, jid: JID, -) -> Result<(), WriteError> { +) -> Result<(), SubscribeError> { + let client_user = logic.db.read_user(logic.bare_jid.clone()).await?; + let nick = client_user.nick.map(|nick| Nick(nick)); let presence = Stanza::Presence(stanza::client::presence::Presence { - from: None, - id: None, to: Some(jid), r#type: Some(stanza::client::presence::PresenceType::Subscribe), - lang: None, - show: None, - status: None, - priority: None, - errors: Vec::new(), - delay: None, + nick, + ..Default::default() }); connection.write_handle().write(presence).await?; Ok(()) @@ -265,16 +240,9 @@ pub async fn handle_unsubscribe_from_contact( jid: JID, ) -> Result<(), WriteError> { let presence = Stanza::Presence(stanza::client::presence::Presence { - from: None, - id: None, to: Some(jid), r#type: Some(stanza::client::presence::PresenceType::Unsubscribe), - lang: None, - show: None, - status: None, - priority: None, - errors: Vec::new(), - delay: None, + ..Default::default() }); connection.write_handle().write(presence).await?; Ok(()) @@ -282,16 +250,9 @@ pub async fn handle_unsubscribe_from_contact( pub async fn handle_unsubscribe_contact(connection: Connected, jid: JID) -> Result<(), WriteError> { let presence = Stanza::Presence(stanza::client::presence::Presence { - from: None, - id: None, to: Some(jid), r#type: Some(stanza::client::presence::PresenceType::Unsubscribed), - lang: None, - show: None, - status: None, - priority: None, - errors: Vec::new(), - delay: None, + ..Default::default() }); connection.write_handle().write(presence).await?; Ok(()) @@ -299,29 +260,15 @@ pub async fn handle_unsubscribe_contact(connection: Connected, jid: JID) -> Resu pub async fn handle_unfriend_contact(connection: Connected, jid: JID) -> Result<(), WriteError> { let presence = Stanza::Presence(stanza::client::presence::Presence { - from: None, - id: None, to: Some(jid.clone()), r#type: Some(stanza::client::presence::PresenceType::Unsubscribe), - lang: None, - show: None, - status: None, - priority: None, - errors: Vec::new(), - delay: None, + ..Default::default() }); connection.write_handle().write(presence).await?; let presence = Stanza::Presence(stanza::client::presence::Presence { - from: None, - id: None, to: Some(jid), r#type: Some(stanza::client::presence::PresenceType::Unsubscribed), - lang: None, - show: None, - status: None, - priority: None, - errors: Vec::new(), - delay: None, + ..Default::default() }); connection.write_handle().write(presence).await?; Ok(()) @@ -464,60 +411,119 @@ pub async fn handle_set_status( Ok(()) } -pub async fn handle_send_message( - logic: &ClientLogic, - connection: Connected, - jid: JID, - body: Body, -) -> Result<(), WriteError> { +pub async fn handle_send_message(logic: &ClientLogic, connection: Connected, jid: JID, body: Body) { + // upsert the chat and user the message will be delivered to. if there is a conflict, it will return whatever was there, otherwise it will return false by default. + let have_chatted = logic.db().upsert_chat_and_user(&jid).await.unwrap_or(false); + + let nick; + let mark_chat_as_chatted; + if have_chatted == false { + match logic.db.read_user(logic.bare_jid.clone()).await { + Ok(u) => { + nick = u.nick.map(|nick| Nick(nick)); + mark_chat_as_chatted = true; + } + Err(e) => { + logic + .handle_error(MessageSendError::GetUserDetails(e.into()).into()) + .await; + nick = None; + mark_chat_as_chatted = false; + } + } + } else { + nick = None; + mark_chat_as_chatted = false; + } + + // generate message struct let id = Uuid::new_v4(); let timestamp = Utc::now(); - let message = Stanza::Message(stanza::client::message::Message { + let message = Message { + id, + from: connection.jid().as_bare(), + body: body.clone(), + timestamp, + delivery: Some(Delivery::Sending), + }; + + // try to store in message history that there is a new message that is sending. if client is quit mid-send then can mark as failed and re-send + // TODO: mark these as potentially failed upon client launch + if let Err(e) = logic + .db() + .create_message_with_self_resource(message.clone(), jid.clone(), connection.jid().clone()) + .await + { + // TODO: should these really be handle_error or just the error macro? + logic + .handle_error(MessageSendError::MessageHistory(e.into()).into()) + .await; + } + + // tell the client a message is being sent + logic + .update_sender() + .send(UpdateMessage::Message { + to: jid.as_bare(), + message, + }) + .await; + + // prepare the message stanza + let message_stanza = Stanza::Message(stanza::client::message::Message { from: Some(connection.jid().clone()), id: Some(id.to_string()), to: Some(jid.clone()), // TODO: specify message type r#type: stanza::client::message::MessageType::Chat, // TODO: lang ? - lang: None, - subject: None, body: Some(stanza::client::message::Body { lang: None, body: Some(body.body.clone()), }), - thread: None, // include delay to have a consistent timestamp between server and client delay: Some(Delay { from: None, stamp: timestamp, }), + nick, + ..Default::default() }); - connection.write_handle().write(message).await?; - let mut message = Message { - id, - from: connection.jid().clone(), - body, - timestamp, - }; - info!("sent message: {:?}", message); - if let Err(e) = logic - .db() - .create_message_with_self_resource_and_chat(message.clone(), jid.clone()) - .await - { - // TODO: should these really be handle_error or just the error macro? - logic - .handle_error(MessageSendError::MessageHistory(e.into()).into()) - .await; - } - // TODO: don't do this, have separate from from details - message.from = message.from.as_bare(); - let _ = logic - .update_sender() - .send(UpdateMessage::Message { to: jid, message }) + + // send the message + let result = connection + .write_handle() + .write(message_stanza.clone()) .await; - Ok(()) - // TODO: refactor this to send a sending updatemessage, then update or something like that + match result { + Ok(_) => { + info!("sent message: {:?}", message_stanza); + logic + .update_sender() + .send(UpdateMessage::MessageDelivery { + id, + delivery: Delivery::Written, + }) + .await; + if mark_chat_as_chatted { + if let Err(e) = logic.db.mark_chat_as_chatted(jid).await { + logic + .handle_error(MessageSendError::MarkChatAsChatted(e.into()).into()) + .await; + } + } + } + Err(e) => { + logic + .update_sender() + .send(UpdateMessage::MessageDelivery { + id, + delivery: Delivery::Failed, + }) + .await; + logic.handle_error(MessageSendError::Write(e).into()).await; + } + } } pub async fn handle_send_presence( @@ -673,6 +679,78 @@ pub async fn handle_disco_items( } } +pub async fn handle_publish( + logic: &ClientLogic, + connection: Connected, + item: pep::Item, + node: String, +) -> Result<(), PublishError> { + let id = Uuid::new_v4().to_string(); + let publish = match item { + pep::Item::Nick(n) => pubsub::Publish { + node, + items: vec![pubsub::Item { + item: Some(pubsub::Content::Nick(Nick(n))), + ..Default::default() + }], + }, + }; + let request = Iq { + from: Some(connection.jid().clone()), + id: id.clone(), + to: None, + r#type: IqType::Set, + lang: None, + query: Some(Query::Pubsub(Pubsub::Publish(publish, None))), + errors: Vec::new(), + }; + match logic + .pending() + .request(&connection, Stanza::Iq(request), id) + .await? { + + Stanza::Iq(Iq { + from, + r#type, + query, + errors, + .. + // TODO: maybe abstract a bunch of these different errors related to iqs into an iq error thing? as in like call iq.result(), get the query from inside, error otherwise. + }) if r#type == IqType::Result || r#type == IqType::Error => { + if from == None || + from == Some(connection.jid().as_bare()) + { + match r#type { + IqType::Result => { + if let Some(query) = query { + match query { + Query::Pubsub(_) => Ok(()), + q => Err(PublishError::MismatchedQuery(q)), + } + } else { + Err(PublishError::MissingQuery) + } + } + IqType::Error => { + Err(PublishError::StanzaErrors(errors)) + } + _ => unreachable!(), + } + } else { + Err(PublishError::IncorrectEntity( + from.unwrap_or_else(|| connection.jid().as_bare()), + )) + } + } + s => Err(PublishError::UnexpectedStanza(s)), + } +} + +pub async fn handle_change_nick(logic: &ClientLogic, nick: String) -> Result<(), NickError> { + logic.client().publish(pep::Item::Nick(nick), xep_0172::XMLNS.to_string()).await?; + Ok(()) +} + // TODO: could probably macro-ise? pub async fn handle_online_result( logic: &ClientLogic, @@ -716,25 +794,24 @@ pub async fn handle_online_result( let user = handle_get_user(logic, jid).await; let _ = sender.send(user); } - // TODO: offline queue to modify roster Command::AddContact(jid, sender) => { let result = handle_add_contact(logic, connection, jid).await; let _ = sender.send(result); } Command::BuddyRequest(jid, sender) => { - let result = handle_buddy_request(connection, jid).await; + let result = handle_buddy_request(logic, connection, jid).await; let _ = sender.send(result); } Command::SubscriptionRequest(jid, sender) => { - let result = handle_subscription_request(connection, jid).await; + let result = handle_subscription_request(logic, connection, jid).await; let _ = sender.send(result); } Command::AcceptBuddyRequest(jid, sender) => { - let result = handle_accept_buddy_request(connection, jid).await; + let result = handle_accept_buddy_request(logic, connection, jid).await; let _ = sender.send(result); } Command::AcceptSubscriptionRequest(jid, sender) => { - let result = handle_accept_subscription_request(connection, jid).await; + let result = handle_accept_subscription_request(logic, connection, jid).await; let _ = sender.send(result); } Command::UnsubscribeFromContact(jid, sender) => { @@ -761,10 +838,8 @@ pub async fn handle_online_result( let result = handle_set_status(logic, connection, online).await; let _ = sender.send(result); } - // TODO: offline message queue - Command::SendMessage(jid, body, sender) => { - let result = handle_send_message(logic, connection, jid, body).await; - let _ = sender.send(result); + Command::SendMessage(jid, body) => { + handle_send_message(logic, connection, jid, body).await; } Command::SendPresence(jid, presence, sender) => { let result = handle_send_presence(connection, jid, presence).await; @@ -778,6 +853,14 @@ pub async fn handle_online_result( let result = handle_disco_items(logic, connection, jid, node).await; let _ = sender.send(result); } + Command::Publish { item, node, sender } => { + let result = handle_publish(logic, connection, item, node).await; + let _ = sender.send(result); + } + Command::ChangeNick(nick, sender) => { + let result = handle_change_nick(logic, nick).await; + let _ = sender.send(result); + } } Ok(()) } diff --git a/filamento/src/logic/process_stanza.rs b/filamento/src/logic/process_stanza.rs index b1bc830..2f6644e 100644 --- a/filamento/src/logic/process_stanza.rs +++ b/filamento/src/logic/process_stanza.rs @@ -41,38 +41,74 @@ pub async fn recv_message( logic: ClientLogic, stanza_message: stanza::client::message::Message, ) -> Result, MessageRecvError> { - if let Some(mut from) = stanza_message.from { + if let Some(from) = stanza_message.from { // TODO: don't ignore delay from. xep says SHOULD send error if incorrect. let timestamp = stanza_message .delay .map(|delay| delay.stamp) .unwrap_or_else(|| Utc::now()); // TODO: group chat messages - let mut message = Message { - id: stanza_message - .id - // TODO: proper id storage - .map(|id| Uuid::from_str(&id).unwrap_or_else(|_| Uuid::new_v4())) - .unwrap_or_else(|| Uuid::new_v4()), - from: from.clone(), - timestamp, - body: Body { - // TODO: should this be an option? - body: stanza_message - .body - .map(|body| body.body) - .unwrap_or_default() - .unwrap_or_default(), - }, - }; + + // if there is a body, should create chat message + if let Some(body) = stanza_message.body { + let message = Message { + id: stanza_message + .id + // TODO: proper id xep + .map(|id| Uuid::from_str(&id).unwrap_or_else(|_| Uuid::new_v4())) + .unwrap_or_else(|| Uuid::new_v4()), + from: from.as_bare(), + timestamp, + body: Body { + body: body.body.unwrap_or_default(), + }, + delivery: None, + }; + + // save the message to the database + logic.db().upsert_chat_and_user(&from).await?; + if let Err(e) = logic + .db() + .create_message_with_user_resource(message.clone(), from.clone(), from.clone()) + .await + { + logic + .handle_error(Error::MessageRecv(MessageRecvError::MessageHistory(e))) + .await; + } + + // update the client with the new message + logic + .update_sender() + .send(UpdateMessage::Message { + to: from.as_bare(), + message, + }) + .await; + } + + if let Some(nick) = stanza_message.nick { + if let Err(e) = logic + .db() + .upsert_user_nick(from.as_bare(), nick.0.clone()) + .await + { + logic + .handle_error(Error::MessageRecv(MessageRecvError::NickUpdate(e))) + .await; + } + + logic + .update_sender() + .send(UpdateMessage::NickChanged { + jid: from.as_bare(), + nick: nick.0, + }) + .await; + } + + Ok(None) // TODO: can this be more efficient? - logic - .db() - .create_message_with_user_resource_and_chat(message.clone(), from.clone()) - .await?; - message.from = message.from.as_bare(); - from = from.as_bare(); - Ok(Some(UpdateMessage::Message { to: from, message })) } else { Err(MessageRecvError::MissingFrom) } -- cgit