aboutsummaryrefslogtreecommitdiffstats
path: root/filamento/src/logic/process_stanza.rs
diff options
context:
space:
mode:
authorLibravatar cel 🌸 <cel@bunny.garden>2025-04-08 10:38:18 +0100
committerLibravatar cel 🌸 <cel@bunny.garden>2025-04-08 10:38:18 +0100
commit5b644e2dc8712d56931b410b9c46dae1ef36e691 (patch)
tree2290b3ab2a5829c3b8406ce073a95474e146a680 /filamento/src/logic/process_stanza.rs
parentc24541b129a14a598afe00ed4244d49d27513310 (diff)
downloadluz-5b644e2dc8712d56931b410b9c46dae1ef36e691.tar.gz
luz-5b644e2dc8712d56931b410b9c46dae1ef36e691.tar.bz2
luz-5b644e2dc8712d56931b410b9c46dae1ef36e691.zip
feat(filamento): user avatar publishing and processing
Diffstat (limited to 'filamento/src/logic/process_stanza.rs')
-rw-r--r--filamento/src/logic/process_stanza.rs422
1 files changed, 354 insertions, 68 deletions
diff --git a/filamento/src/logic/process_stanza.rs b/filamento/src/logic/process_stanza.rs
index 11d7588..c383d70 100644
--- a/filamento/src/logic/process_stanza.rs
+++ b/filamento/src/logic/process_stanza.rs
@@ -1,7 +1,9 @@
use std::str::FromStr;
+use base64::{Engine, prelude::BASE64_STANDARD};
use chrono::Utc;
use lampada::{Connected, SupervisorSender};
+use sha1::{Digest, Sha1};
use stanza::{
client::{
Stanza,
@@ -17,14 +19,22 @@ use uuid::Uuid;
use crate::{
UpdateMessage, caps,
chat::{Body, Message},
- error::{DatabaseError, Error, IqError, MessageRecvError, PresenceError, RosterError},
+ error::{
+ AvatarUpdateError, DatabaseError, Error, IqError, MessageRecvError, PresenceError,
+ RosterError,
+ },
+ files::FileStore,
presence::{Offline, Online, Presence, PresenceType, Show},
roster::Contact,
};
use super::ClientLogic;
-pub async fn handle_stanza(logic: ClientLogic, stanza: Stanza, connection: Connected) {
+pub async fn handle_stanza<Fs: FileStore + Clone>(
+ logic: ClientLogic<Fs>,
+ stanza: Stanza,
+ connection: Connected,
+) {
let result = process_stanza(logic.clone(), stanza, connection).await;
match result {
Ok(u) => match u {
@@ -38,10 +48,10 @@ pub async fn handle_stanza(logic: ClientLogic, stanza: Stanza, connection: Conne
}
}
-pub async fn recv_message(
- logic: ClientLogic,
+pub async fn recv_message<Fs: FileStore + Clone>(
+ logic: ClientLogic<Fs>,
stanza_message: stanza::client::message::Message,
-) -> Result<Option<UpdateMessage>, MessageRecvError> {
+) -> Result<Option<UpdateMessage>, MessageRecvError<Fs>> {
if let Some(from) = stanza_message.from {
// TODO: don't ignore delay from. xep says SHOULD send error if incorrect.
let timestamp = stanza_message
@@ -50,6 +60,7 @@ pub async fn recv_message(
.unwrap_or_else(|| Utc::now());
// TODO: group chat messages
+ // body MUST be before user changes in order to avoid race condition where you e.g. get a nick update before the user is in the client state.
// if there is a body, should create chat message
if let Some(body) = stanza_message.body {
let message = Message {
@@ -67,16 +78,28 @@ pub async fn recv_message(
};
// save the message to the database
- logic.db().upsert_chat_and_user(&from).await?;
- if let Err(e) = logic
- .db()
- .create_message_with_user_resource(message.clone(), from.clone(), from.clone())
- .await
- {
- logic
- .handle_error(Error::MessageRecv(MessageRecvError::MessageHistory(e)))
- .await;
- }
+ match logic.db().upsert_chat_and_user(&from).await {
+ Ok(_) => {
+ if let Err(e) = logic
+ .db()
+ .create_message_with_user_resource(
+ message.clone(),
+ from.clone(),
+ from.clone(),
+ )
+ .await
+ {
+ logic
+ .handle_error(Error::MessageRecv(MessageRecvError::MessageHistory(e)))
+ .await;
+ }
+ }
+ Err(e) => {
+ logic
+ .handle_error(Error::MessageRecv(MessageRecvError::MessageHistory(e)))
+ .await;
+ }
+ };
// update the client with the new message
logic
@@ -89,70 +112,333 @@ pub async fn recv_message(
}
if let Some(nick) = stanza_message.nick {
- if let Err(e) = logic
- .db()
- .upsert_user_nick(from.as_bare(), nick.0.clone())
- .await
- {
- logic
- .handle_error(Error::MessageRecv(MessageRecvError::NickUpdate(e)))
- .await;
+ let nick = nick.0;
+ if nick.is_empty() {
+ match logic.db().delete_user_nick(from.as_bare()).await {
+ Ok(changed) => {
+ if changed {
+ logic
+ .update_sender()
+ .send(UpdateMessage::NickChanged {
+ jid: from.as_bare(),
+ nick: None,
+ })
+ .await;
+ }
+ }
+ Err(e) => {
+ logic
+ .handle_error(Error::MessageRecv(MessageRecvError::NickUpdate(e)))
+ .await;
+ // if failed, send user update anyway
+ logic
+ .update_sender()
+ .send(UpdateMessage::NickChanged {
+ jid: from.as_bare(),
+ nick: None,
+ })
+ .await;
+ }
+ }
+ } else {
+ match logic
+ .db()
+ .upsert_user_nick(from.as_bare(), nick.clone())
+ .await
+ {
+ Ok(changed) => {
+ if changed {
+ logic
+ .update_sender()
+ .send(UpdateMessage::NickChanged {
+ jid: from.as_bare(),
+ nick: Some(nick),
+ })
+ .await;
+ }
+ }
+ Err(e) => {
+ logic
+ .handle_error(Error::MessageRecv(MessageRecvError::NickUpdate(e)))
+ .await;
+ // if failed, send user update anyway
+ logic
+ .update_sender()
+ .send(UpdateMessage::NickChanged {
+ jid: from.as_bare(),
+ nick: Some(nick),
+ })
+ .await;
+ }
+ }
}
-
- logic
- .update_sender()
- .send(UpdateMessage::NickChanged {
- jid: from.as_bare(),
- nick: nick.0,
- })
- .await;
}
if let Some(event) = stanza_message.event {
match event {
- Event::Items(items) => match items.node.as_str() {
- "http://jabber.org/protocol/nick" => match items.items {
- ItemsType::Item(items) => {
- if let Some(item) = items.first() {
- match &item.item {
- Some(c) => match c {
- Content::Nick(nick) => {
- if let Err(e) = logic
- .db()
- .upsert_user_nick(from.as_bare(), nick.0.clone())
- .await
- {
- logic
- .handle_error(Error::MessageRecv(
- MessageRecvError::NickUpdate(e),
- ))
- .await;
+ Event::Items(items) => {
+ match items.node.as_str() {
+ "http://jabber.org/protocol/nick" => match items.items {
+ ItemsType::Item(items) => {
+ if let Some(item) = items.first() {
+ match &item.item {
+ Some(c) => match c {
+ Content::Nick(nick) => {
+ let nick = nick.0.clone();
+ if nick.is_empty() {
+ match logic
+ .db()
+ .delete_user_nick(from.as_bare())
+ .await
+ {
+ Ok(changed) => {
+ if changed {
+ logic
+ .update_sender()
+ .send(UpdateMessage::NickChanged {
+ jid: from.as_bare(),
+ nick: None,
+ })
+ .await;
+ }
+ }
+ Err(e) => {
+ logic
+ .handle_error(Error::MessageRecv(
+ MessageRecvError::NickUpdate(e),
+ ))
+ .await;
+ // if failed, send user update anyway
+ logic
+ .update_sender()
+ .send(UpdateMessage::NickChanged {
+ jid: from.as_bare(),
+ nick: None,
+ })
+ .await;
+ }
+ }
+ } else {
+ match logic
+ .db()
+ .upsert_user_nick(
+ from.as_bare(),
+ nick.clone(),
+ )
+ .await
+ {
+ Ok(changed) => {
+ if changed {
+ logic
+ .update_sender()
+ .send(UpdateMessage::NickChanged {
+ jid: from.as_bare(),
+ nick: Some(nick),
+ })
+ .await;
+ }
+ }
+ Err(e) => {
+ logic
+ .handle_error(Error::MessageRecv(
+ MessageRecvError::NickUpdate(e),
+ ))
+ .await;
+ // if failed, send user update anyway
+ logic
+ .update_sender()
+ .send(UpdateMessage::NickChanged {
+ jid: from.as_bare(),
+ nick: Some(nick),
+ })
+ .await;
+ }
+ }
+ }
}
+ _ => {}
+ },
+ None => {}
+ }
+ }
+ }
+ _ => {}
+ },
+ "urn:xmpp:avatar:metadata" => {
+ match items.items {
+ ItemsType::Item(items) => {
+ if let Some(item) = items.first() {
+ match &item.item {
+ Some(Content::AvatarMetadata(metadata)) => {
+ // check if user avatar has been deleted
+ if let Some(metadata) = metadata
+ .info
+ .iter()
+ .find(|info| info.url.is_none())
+ {
+ // check if user avatar has changed
+ match logic
+ .db()
+ .upsert_user_avatar(
+ from.as_bare(),
+ metadata.id.clone(),
+ )
+ .await
+ {
+ Ok((changed, old_avatar)) => {
+ if changed {
+ if let Some(old_avatar) = old_avatar
+ {
+ if let Err(e) = logic
+ .file_store()
+ .delete(&old_avatar)
+ .await.map_err(|err| AvatarUpdateError::FileStore(err)) {
+ logic.handle_error(MessageRecvError::AvatarUpdate(e).into()).await;
+ }
+ }
- logic
- .update_sender()
- .send(UpdateMessage::NickChanged {
- jid: from.as_bare(),
- nick: nick.0.clone(),
- })
- .await;
+ match logic
+ .file_store()
+ .is_stored(&metadata.id)
+ .await
+ .map_err(|err| {
+ AvatarUpdateError::<Fs>::FileStore(
+ err,
+ )
+ }) {
+ Ok(false) => {
+ // get data
+ let pep_item = logic.client().get_pep_item(Some(from.as_bare()), "urn:xmpp:avatar:data".to_string(), metadata.id.clone()).await.map_err(|err| Into::<AvatarUpdateError<Fs>>::into(err))?;
+ match pep_item {
+ crate::pep::Item::AvatarData(data) => {
+ let data = data.map(|data| data.data_b64).unwrap_or_default();
+ // TODO: these should all be in a separate avatarupdate function
+ match BASE64_STANDARD.decode(data) {
+ Ok(data) => {
+ let mut hasher = Sha1::new();
+ hasher.update(&data);
+ let received_data_hash = BASE64_STANDARD.encode(hasher.finalize());
+ if received_data_hash == metadata.id {
+ if let Err(e) = logic.file_store().store(&received_data_hash, &data).await {
+ logic.handle_error(Error::MessageRecv(MessageRecvError::AvatarUpdate(AvatarUpdateError::FileStore(e)))).await;
+ }
+ logic
+ .update_sender()
+ .send(
+ UpdateMessage::AvatarChanged {
+ jid: from.as_bare(),
+ id: Some(
+ metadata.id.clone(),
+ ),
+ },
+ )
+ .await;
+ }
+ },
+ Err(e) => {
+ logic.handle_error(Error::MessageRecv(MessageRecvError::AvatarUpdate(AvatarUpdateError::Base64(e)))).await;
+ },
+ }
+ },
+ _ => {
+ logic.handle_error(Error::MessageRecv(MessageRecvError::AvatarUpdate(AvatarUpdateError::MissingData))).await;
+ }
+ }
+ }
+ Ok(true) => {
+ // just send the update
+ logic
+ .update_sender()
+ .send(
+ UpdateMessage::AvatarChanged {
+ jid: from.as_bare(),
+ id: Some(
+ metadata.id.clone(),
+ ),
+ },
+ )
+ .await;
+ }
+ Err(e) => {
+ logic.handle_error(Error::MessageRecv(MessageRecvError::AvatarUpdate(e))).await;
+ }
+ }
+ }
+ }
+ Err(e) => {
+ logic
+ .handle_error(Error::MessageRecv(
+ MessageRecvError::AvatarUpdate(
+ AvatarUpdateError::Database(
+ e,
+ ),
+ ),
+ ))
+ .await;
+ }
+ }
+ } else {
+ // delete avatar
+ match logic
+ .db()
+ .delete_user_avatar(from.as_bare())
+ .await
+ {
+ Ok((changed, old_avatar)) => {
+ if changed {
+ if let Some(old_avatar) = old_avatar
+ {
+ if let Err(e) = logic
+ .file_store()
+ .delete(&old_avatar)
+ .await.map_err(|err| AvatarUpdateError::FileStore(err)) {
+ logic.handle_error(MessageRecvError::AvatarUpdate(e).into()).await;
+ }
+ }
+ logic
+ .update_sender()
+ .send(
+ UpdateMessage::AvatarChanged {
+ jid: from.as_bare(),
+ id: None,
+ },
+ )
+ .await;
+ }
+ }
+ Err(e) => {
+ logic
+ .handle_error(Error::MessageRecv(
+ MessageRecvError::AvatarUpdate(
+ AvatarUpdateError::Database(
+ e,
+ ),
+ ),
+ ))
+ .await;
+ }
+ }
+ }
+ // check if the new file is in the file store
+ // if not, retrieve from server and save in the file store (remember to check if the hash matches)
+ // send the avatar update
+ }
+ _ => {}
}
- Content::Unknown(element) => {}
- },
- None => {}
+ }
}
+ _ => {}
}
}
- ItemsType::Retract(retracts) => {}
- },
- _ => {}
- },
+ _ => {}
+ }
+ }
// Event::Collection(collection) => todo!(),
// Event::Configuration(configuration) => todo!(),
// Event::Delete(delete) => todo!(),
// Event::Purge(purge) => todo!(),
// Event::Subscription(subscription) => todo!(),
- _ => {}
+ _ => {} // TODO: catch these catch-alls in some way
}
}
@@ -240,8 +526,8 @@ pub async fn recv_presence(
}
}
-pub async fn recv_iq(
- logic: ClientLogic,
+pub async fn recv_iq<Fs: FileStore + Clone>(
+ logic: ClientLogic<Fs>,
connection: Connected,
iq: Iq,
) -> Result<Option<UpdateMessage>, IqError> {
@@ -469,11 +755,11 @@ pub async fn recv_iq(
}
}
-pub async fn process_stanza(
- logic: ClientLogic,
+pub async fn process_stanza<Fs: FileStore + Clone>(
+ logic: ClientLogic<Fs>,
stanza: Stanza,
connection: Connected,
-) -> Result<Option<UpdateMessage>, Error> {
+) -> Result<Option<UpdateMessage>, Error<Fs>> {
let update = match stanza {
Stanza::Message(stanza_message) => Ok(recv_message(logic, stanza_message).await?),
Stanza::Presence(presence) => Ok(recv_presence(presence).await?),