diff options
author | 2025-04-08 10:38:18 +0100 | |
---|---|---|
committer | 2025-04-08 10:38:18 +0100 | |
commit | 5b644e2dc8712d56931b410b9c46dae1ef36e691 (patch) | |
tree | 2290b3ab2a5829c3b8406ce073a95474e146a680 /filamento/src/logic/process_stanza.rs | |
parent | c24541b129a14a598afe00ed4244d49d27513310 (diff) | |
download | luz-5b644e2dc8712d56931b410b9c46dae1ef36e691.tar.gz luz-5b644e2dc8712d56931b410b9c46dae1ef36e691.tar.bz2 luz-5b644e2dc8712d56931b410b9c46dae1ef36e691.zip |
feat(filamento): user avatar publishing and processing
Diffstat (limited to '')
-rw-r--r-- | filamento/src/logic/process_stanza.rs | 422 |
1 files changed, 354 insertions, 68 deletions
diff --git a/filamento/src/logic/process_stanza.rs b/filamento/src/logic/process_stanza.rs index 11d7588..c383d70 100644 --- a/filamento/src/logic/process_stanza.rs +++ b/filamento/src/logic/process_stanza.rs @@ -1,7 +1,9 @@ use std::str::FromStr; +use base64::{Engine, prelude::BASE64_STANDARD}; use chrono::Utc; use lampada::{Connected, SupervisorSender}; +use sha1::{Digest, Sha1}; use stanza::{ client::{ Stanza, @@ -17,14 +19,22 @@ use uuid::Uuid; use crate::{ UpdateMessage, caps, chat::{Body, Message}, - error::{DatabaseError, Error, IqError, MessageRecvError, PresenceError, RosterError}, + error::{ + AvatarUpdateError, DatabaseError, Error, IqError, MessageRecvError, PresenceError, + RosterError, + }, + files::FileStore, presence::{Offline, Online, Presence, PresenceType, Show}, roster::Contact, }; use super::ClientLogic; -pub async fn handle_stanza(logic: ClientLogic, stanza: Stanza, connection: Connected) { +pub async fn handle_stanza<Fs: FileStore + Clone>( + logic: ClientLogic<Fs>, + stanza: Stanza, + connection: Connected, +) { let result = process_stanza(logic.clone(), stanza, connection).await; match result { Ok(u) => match u { @@ -38,10 +48,10 @@ pub async fn handle_stanza(logic: ClientLogic, stanza: Stanza, connection: Conne } } -pub async fn recv_message( - logic: ClientLogic, +pub async fn recv_message<Fs: FileStore + Clone>( + logic: ClientLogic<Fs>, stanza_message: stanza::client::message::Message, -) -> Result<Option<UpdateMessage>, MessageRecvError> { +) -> Result<Option<UpdateMessage>, MessageRecvError<Fs>> { if let Some(from) = stanza_message.from { // TODO: don't ignore delay from. xep says SHOULD send error if incorrect. let timestamp = stanza_message @@ -50,6 +60,7 @@ pub async fn recv_message( .unwrap_or_else(|| Utc::now()); // TODO: group chat messages + // body MUST be before user changes in order to avoid race condition where you e.g. get a nick update before the user is in the client state. // if there is a body, should create chat message if let Some(body) = stanza_message.body { let message = Message { @@ -67,16 +78,28 @@ pub async fn recv_message( }; // save the message to the database - logic.db().upsert_chat_and_user(&from).await?; - if let Err(e) = logic - .db() - .create_message_with_user_resource(message.clone(), from.clone(), from.clone()) - .await - { - logic - .handle_error(Error::MessageRecv(MessageRecvError::MessageHistory(e))) - .await; - } + match logic.db().upsert_chat_and_user(&from).await { + Ok(_) => { + if let Err(e) = logic + .db() + .create_message_with_user_resource( + message.clone(), + from.clone(), + from.clone(), + ) + .await + { + logic + .handle_error(Error::MessageRecv(MessageRecvError::MessageHistory(e))) + .await; + } + } + Err(e) => { + logic + .handle_error(Error::MessageRecv(MessageRecvError::MessageHistory(e))) + .await; + } + }; // update the client with the new message logic @@ -89,70 +112,333 @@ pub async fn recv_message( } if let Some(nick) = stanza_message.nick { - if let Err(e) = logic - .db() - .upsert_user_nick(from.as_bare(), nick.0.clone()) - .await - { - logic - .handle_error(Error::MessageRecv(MessageRecvError::NickUpdate(e))) - .await; + let nick = nick.0; + if nick.is_empty() { + match logic.db().delete_user_nick(from.as_bare()).await { + Ok(changed) => { + if changed { + logic + .update_sender() + .send(UpdateMessage::NickChanged { + jid: from.as_bare(), + nick: None, + }) + .await; + } + } + Err(e) => { + logic + .handle_error(Error::MessageRecv(MessageRecvError::NickUpdate(e))) + .await; + // if failed, send user update anyway + logic + .update_sender() + .send(UpdateMessage::NickChanged { + jid: from.as_bare(), + nick: None, + }) + .await; + } + } + } else { + match logic + .db() + .upsert_user_nick(from.as_bare(), nick.clone()) + .await + { + Ok(changed) => { + if changed { + logic + .update_sender() + .send(UpdateMessage::NickChanged { + jid: from.as_bare(), + nick: Some(nick), + }) + .await; + } + } + Err(e) => { + logic + .handle_error(Error::MessageRecv(MessageRecvError::NickUpdate(e))) + .await; + // if failed, send user update anyway + logic + .update_sender() + .send(UpdateMessage::NickChanged { + jid: from.as_bare(), + nick: Some(nick), + }) + .await; + } + } } - - logic - .update_sender() - .send(UpdateMessage::NickChanged { - jid: from.as_bare(), - nick: nick.0, - }) - .await; } if let Some(event) = stanza_message.event { match event { - Event::Items(items) => match items.node.as_str() { - "http://jabber.org/protocol/nick" => match items.items { - ItemsType::Item(items) => { - if let Some(item) = items.first() { - match &item.item { - Some(c) => match c { - Content::Nick(nick) => { - if let Err(e) = logic - .db() - .upsert_user_nick(from.as_bare(), nick.0.clone()) - .await - { - logic - .handle_error(Error::MessageRecv( - MessageRecvError::NickUpdate(e), - )) - .await; + Event::Items(items) => { + match items.node.as_str() { + "http://jabber.org/protocol/nick" => match items.items { + ItemsType::Item(items) => { + if let Some(item) = items.first() { + match &item.item { + Some(c) => match c { + Content::Nick(nick) => { + let nick = nick.0.clone(); + if nick.is_empty() { + match logic + .db() + .delete_user_nick(from.as_bare()) + .await + { + Ok(changed) => { + if changed { + logic + .update_sender() + .send(UpdateMessage::NickChanged { + jid: from.as_bare(), + nick: None, + }) + .await; + } + } + Err(e) => { + logic + .handle_error(Error::MessageRecv( + MessageRecvError::NickUpdate(e), + )) + .await; + // if failed, send user update anyway + logic + .update_sender() + .send(UpdateMessage::NickChanged { + jid: from.as_bare(), + nick: None, + }) + .await; + } + } + } else { + match logic + .db() + .upsert_user_nick( + from.as_bare(), + nick.clone(), + ) + .await + { + Ok(changed) => { + if changed { + logic + .update_sender() + .send(UpdateMessage::NickChanged { + jid: from.as_bare(), + nick: Some(nick), + }) + .await; + } + } + Err(e) => { + logic + .handle_error(Error::MessageRecv( + MessageRecvError::NickUpdate(e), + )) + .await; + // if failed, send user update anyway + logic + .update_sender() + .send(UpdateMessage::NickChanged { + jid: from.as_bare(), + nick: Some(nick), + }) + .await; + } + } + } } + _ => {} + }, + None => {} + } + } + } + _ => {} + }, + "urn:xmpp:avatar:metadata" => { + match items.items { + ItemsType::Item(items) => { + if let Some(item) = items.first() { + match &item.item { + Some(Content::AvatarMetadata(metadata)) => { + // check if user avatar has been deleted + if let Some(metadata) = metadata + .info + .iter() + .find(|info| info.url.is_none()) + { + // check if user avatar has changed + match logic + .db() + .upsert_user_avatar( + from.as_bare(), + metadata.id.clone(), + ) + .await + { + Ok((changed, old_avatar)) => { + if changed { + if let Some(old_avatar) = old_avatar + { + if let Err(e) = logic + .file_store() + .delete(&old_avatar) + .await.map_err(|err| AvatarUpdateError::FileStore(err)) { + logic.handle_error(MessageRecvError::AvatarUpdate(e).into()).await; + } + } - logic - .update_sender() - .send(UpdateMessage::NickChanged { - jid: from.as_bare(), - nick: nick.0.clone(), - }) - .await; + match logic + .file_store() + .is_stored(&metadata.id) + .await + .map_err(|err| { + AvatarUpdateError::<Fs>::FileStore( + err, + ) + }) { + Ok(false) => { + // get data + let pep_item = logic.client().get_pep_item(Some(from.as_bare()), "urn:xmpp:avatar:data".to_string(), metadata.id.clone()).await.map_err(|err| Into::<AvatarUpdateError<Fs>>::into(err))?; + match pep_item { + crate::pep::Item::AvatarData(data) => { + let data = data.map(|data| data.data_b64).unwrap_or_default(); + // TODO: these should all be in a separate avatarupdate function + match BASE64_STANDARD.decode(data) { + Ok(data) => { + let mut hasher = Sha1::new(); + hasher.update(&data); + let received_data_hash = BASE64_STANDARD.encode(hasher.finalize()); + if received_data_hash == metadata.id { + if let Err(e) = logic.file_store().store(&received_data_hash, &data).await { + logic.handle_error(Error::MessageRecv(MessageRecvError::AvatarUpdate(AvatarUpdateError::FileStore(e)))).await; + } + logic + .update_sender() + .send( + UpdateMessage::AvatarChanged { + jid: from.as_bare(), + id: Some( + metadata.id.clone(), + ), + }, + ) + .await; + } + }, + Err(e) => { + logic.handle_error(Error::MessageRecv(MessageRecvError::AvatarUpdate(AvatarUpdateError::Base64(e)))).await; + }, + } + }, + _ => { + logic.handle_error(Error::MessageRecv(MessageRecvError::AvatarUpdate(AvatarUpdateError::MissingData))).await; + } + } + } + Ok(true) => { + // just send the update + logic + .update_sender() + .send( + UpdateMessage::AvatarChanged { + jid: from.as_bare(), + id: Some( + metadata.id.clone(), + ), + }, + ) + .await; + } + Err(e) => { + logic.handle_error(Error::MessageRecv(MessageRecvError::AvatarUpdate(e))).await; + } + } + } + } + Err(e) => { + logic + .handle_error(Error::MessageRecv( + MessageRecvError::AvatarUpdate( + AvatarUpdateError::Database( + e, + ), + ), + )) + .await; + } + } + } else { + // delete avatar + match logic + .db() + .delete_user_avatar(from.as_bare()) + .await + { + Ok((changed, old_avatar)) => { + if changed { + if let Some(old_avatar) = old_avatar + { + if let Err(e) = logic + .file_store() + .delete(&old_avatar) + .await.map_err(|err| AvatarUpdateError::FileStore(err)) { + logic.handle_error(MessageRecvError::AvatarUpdate(e).into()).await; + } + } + logic + .update_sender() + .send( + UpdateMessage::AvatarChanged { + jid: from.as_bare(), + id: None, + }, + ) + .await; + } + } + Err(e) => { + logic + .handle_error(Error::MessageRecv( + MessageRecvError::AvatarUpdate( + AvatarUpdateError::Database( + e, + ), + ), + )) + .await; + } + } + } + // check if the new file is in the file store + // if not, retrieve from server and save in the file store (remember to check if the hash matches) + // send the avatar update + } + _ => {} } - Content::Unknown(element) => {} - }, - None => {} + } } + _ => {} } } - ItemsType::Retract(retracts) => {} - }, - _ => {} - }, + _ => {} + } + } // Event::Collection(collection) => todo!(), // Event::Configuration(configuration) => todo!(), // Event::Delete(delete) => todo!(), // Event::Purge(purge) => todo!(), // Event::Subscription(subscription) => todo!(), - _ => {} + _ => {} // TODO: catch these catch-alls in some way } } @@ -240,8 +526,8 @@ pub async fn recv_presence( } } -pub async fn recv_iq( - logic: ClientLogic, +pub async fn recv_iq<Fs: FileStore + Clone>( + logic: ClientLogic<Fs>, connection: Connected, iq: Iq, ) -> Result<Option<UpdateMessage>, IqError> { @@ -469,11 +755,11 @@ pub async fn recv_iq( } } -pub async fn process_stanza( - logic: ClientLogic, +pub async fn process_stanza<Fs: FileStore + Clone>( + logic: ClientLogic<Fs>, stanza: Stanza, connection: Connected, -) -> Result<Option<UpdateMessage>, Error> { +) -> Result<Option<UpdateMessage>, Error<Fs>> { let update = match stanza { Stanza::Message(stanza_message) => Ok(recv_message(logic, stanza_message).await?), Stanza::Presence(presence) => Ok(recv_presence(presence).await?), |