diff options
author | 2025-04-08 10:38:18 +0100 | |
---|---|---|
committer | 2025-04-08 10:38:18 +0100 | |
commit | 5b644e2dc8712d56931b410b9c46dae1ef36e691 (patch) | |
tree | 2290b3ab2a5829c3b8406ce073a95474e146a680 /filamento/src/logic | |
parent | c24541b129a14a598afe00ed4244d49d27513310 (diff) | |
download | luz-5b644e2dc8712d56931b410b9c46dae1ef36e691.tar.gz luz-5b644e2dc8712d56931b410b9c46dae1ef36e691.tar.bz2 luz-5b644e2dc8712d56931b410b9c46dae1ef36e691.zip |
feat(filamento): user avatar publishing and processing
Diffstat (limited to '')
-rw-r--r-- | filamento/src/logic/abort.rs | 4 | ||||
-rw-r--r-- | filamento/src/logic/connect.rs | 6 | ||||
-rw-r--r-- | filamento/src/logic/connection_error.rs | 7 | ||||
-rw-r--r-- | filamento/src/logic/disconnect.rs | 7 | ||||
-rw-r--r-- | filamento/src/logic/local_only.rs | 37 | ||||
-rw-r--r-- | filamento/src/logic/mod.rs | 27 | ||||
-rw-r--r-- | filamento/src/logic/offline.rs | 45 | ||||
-rw-r--r-- | filamento/src/logic/online.rs | 362 | ||||
-rw-r--r-- | filamento/src/logic/process_stanza.rs | 422 |
9 files changed, 762 insertions, 155 deletions
diff --git a/filamento/src/logic/abort.rs b/filamento/src/logic/abort.rs index df82655..3588b13 100644 --- a/filamento/src/logic/abort.rs +++ b/filamento/src/logic/abort.rs @@ -1,7 +1,9 @@ use lampada::error::ReadError; +use crate::files::FileStore; + use super::ClientLogic; -pub async fn on_abort(logic: ClientLogic) { +pub async fn on_abort<Fs: FileStore + Clone>(logic: ClientLogic<Fs>) { logic.pending().drain().await; } diff --git a/filamento/src/logic/connect.rs b/filamento/src/logic/connect.rs index dc05448..37cdad5 100644 --- a/filamento/src/logic/connect.rs +++ b/filamento/src/logic/connect.rs @@ -5,12 +5,16 @@ use tracing::debug; use crate::{ Command, UpdateMessage, error::{ConnectionJobError, Error, RosterError}, + files::FileStore, presence::{Online, PresenceType}, }; use super::ClientLogic; -pub async fn handle_connect(logic: ClientLogic, connection: Connected) { +pub async fn handle_connect<Fs: FileStore + Clone + Send + Sync>( + logic: ClientLogic<Fs>, + connection: Connected, +) { let (send, recv) = oneshot::channel(); debug!("getting roster"); logic diff --git a/filamento/src/logic/connection_error.rs b/filamento/src/logic/connection_error.rs index 081900b..36c1cef 100644 --- a/filamento/src/logic/connection_error.rs +++ b/filamento/src/logic/connection_error.rs @@ -1,7 +1,12 @@ use lampada::error::ConnectionError; +use crate::files::FileStore; + use super::ClientLogic; -pub async fn handle_connection_error(logic: ClientLogic, error: ConnectionError) { +pub async fn handle_connection_error<Fs: FileStore + Clone>( + logic: ClientLogic<Fs>, + error: ConnectionError, +) { logic.handle_error(error.into()).await; } diff --git a/filamento/src/logic/disconnect.rs b/filamento/src/logic/disconnect.rs index 241c3e6..ebcfd4f 100644 --- a/filamento/src/logic/disconnect.rs +++ b/filamento/src/logic/disconnect.rs @@ -1,11 +1,14 @@ use lampada::Connected; use stanza::client::Stanza; -use crate::{UpdateMessage, presence::Offline}; +use crate::{UpdateMessage, files::FileStore, presence::Offline}; use super::ClientLogic; -pub async fn handle_disconnect(logic: ClientLogic, connection: Connected) { +pub async fn handle_disconnect<Fs: FileStore + Clone>( + logic: ClientLogic<Fs>, + connection: Connected, +) { // TODO: be able to set offline status message let offline_presence: stanza::client::presence::Presence = Offline::default().into_stanza(None); let stanza = Stanza::Presence(offline_presence); diff --git a/filamento/src/logic/local_only.rs b/filamento/src/logic/local_only.rs index 3f6fe8d..cabbef4 100644 --- a/filamento/src/logic/local_only.rs +++ b/filamento/src/logic/local_only.rs @@ -4,44 +4,61 @@ use uuid::Uuid; use crate::{ chat::{Chat, Message}, error::DatabaseError, + files::FileStore, user::User, }; use super::ClientLogic; -pub async fn handle_get_chats(logic: &ClientLogic) -> Result<Vec<Chat>, DatabaseError> { +pub async fn handle_get_chats<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, +) -> Result<Vec<Chat>, DatabaseError> { Ok(logic.db().read_chats().await?) } -pub async fn handle_get_chats_ordered(logic: &ClientLogic) -> Result<Vec<Chat>, DatabaseError> { +pub async fn handle_get_chats_ordered<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, +) -> Result<Vec<Chat>, DatabaseError> { Ok(logic.db().read_chats_ordered().await?) } -pub async fn handle_get_chats_ordered_with_latest_messages( - logic: &ClientLogic, +pub async fn handle_get_chats_ordered_with_latest_messages<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, ) -> Result<Vec<(Chat, Message)>, DatabaseError> { Ok(logic.db().read_chats_ordered_with_latest_messages().await?) } -pub async fn handle_get_chat(logic: &ClientLogic, jid: JID) -> Result<Chat, DatabaseError> { +pub async fn handle_get_chat<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, + jid: JID, +) -> Result<Chat, DatabaseError> { Ok(logic.db().read_chat(jid).await?) } -pub async fn handle_get_messages( - logic: &ClientLogic, +pub async fn handle_get_messages<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, jid: JID, ) -> Result<Vec<Message>, DatabaseError> { Ok(logic.db().read_message_history(jid).await?) } -pub async fn handle_delete_chat(logic: &ClientLogic, jid: JID) -> Result<(), DatabaseError> { +pub async fn handle_delete_chat<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, + jid: JID, +) -> Result<(), DatabaseError> { Ok(logic.db().delete_chat(jid).await?) } -pub async fn handle_delete_messaage(logic: &ClientLogic, uuid: Uuid) -> Result<(), DatabaseError> { +pub async fn handle_delete_messaage<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, + uuid: Uuid, +) -> Result<(), DatabaseError> { Ok(logic.db().delete_message(uuid).await?) } -pub async fn handle_get_user(logic: &ClientLogic, jid: JID) -> Result<User, DatabaseError> { +pub async fn handle_get_user<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, + jid: JID, +) -> Result<User, DatabaseError> { Ok(logic.db().read_user(jid).await?) } diff --git a/filamento/src/logic/mod.rs b/filamento/src/logic/mod.rs index 1ddd7d3..5e05dac 100644 --- a/filamento/src/logic/mod.rs +++ b/filamento/src/logic/mod.rs @@ -4,12 +4,13 @@ use jid::JID; use lampada::{Connected, Logic, error::ReadError}; use stanza::client::Stanza; use tokio::sync::{Mutex, mpsc, oneshot}; -use tracing::{error, info, warn}; +use tracing::{error, info}; use crate::{ Client, Command, UpdateMessage, db::Db, error::{Error, IqRequestError, ResponseError}, + files::FileStore, }; mod abort; @@ -22,12 +23,13 @@ mod online; mod process_stanza; #[derive(Clone)] -pub struct ClientLogic { - client: Client, +pub struct ClientLogic<Fs: FileStore> { + client: Client<Fs>, bare_jid: JID, db: Db, pending: Pending, update_sender: mpsc::Sender<UpdateMessage>, + file_store: Fs, } #[derive(Clone)] @@ -75,12 +77,13 @@ impl Pending { } } -impl ClientLogic { +impl<Fs: FileStore> ClientLogic<Fs> { pub fn new( - client: Client, + client: Client<Fs>, bare_jid: JID, db: Db, update_sender: mpsc::Sender<UpdateMessage>, + file_store: Fs, ) -> Self { Self { db, @@ -88,10 +91,11 @@ impl ClientLogic { update_sender, client, bare_jid, + file_store, } } - pub fn client(&self) -> &Client { + pub fn client(&self) -> &Client<Fs> { &self.client } @@ -103,6 +107,10 @@ impl ClientLogic { &self.pending } + pub fn file_store(&self) -> &Fs { + &self.file_store + } + pub fn update_sender(&self) -> &mpsc::Sender<UpdateMessage> { &self.update_sender } @@ -113,13 +121,14 @@ impl ClientLogic { self.update_sender().send(update).await; } - pub async fn handle_error(&self, e: Error) { + // TODO: delete this + pub async fn handle_error(&self, e: Error<Fs>) { error!("{}", e); } } -impl Logic for ClientLogic { - type Cmd = Command; +impl<Fs: FileStore + Clone + Send + Sync> Logic for ClientLogic<Fs> { + type Cmd = Command<Fs>; // pub async fn handle_stream_error(self, error) {} // stanza errors (recoverable) diff --git a/filamento/src/logic/offline.rs b/filamento/src/logic/offline.rs index 6399cf7..566972c 100644 --- a/filamento/src/logic/offline.rs +++ b/filamento/src/logic/offline.rs @@ -1,3 +1,5 @@ +use std::process::id; + use chrono::Utc; use lampada::error::WriteError; use uuid::Uuid; @@ -6,9 +8,10 @@ use crate::{ Command, chat::{Delivery, Message}, error::{ - DatabaseError, DiscoError, Error, IqRequestError, MessageSendError, NickError, RosterError, - StatusError, + AvatarPublishError, DatabaseError, DiscoError, Error, IqRequestError, MessageSendError, + NickError, PEPError, RosterError, StatusError, }, + files::FileStore, presence::Online, roster::Contact, }; @@ -22,7 +25,7 @@ use super::{ }, }; -pub async fn handle_offline(logic: ClientLogic, command: Command) { +pub async fn handle_offline<Fs: FileStore + Clone>(logic: ClientLogic<Fs>, command: Command<Fs>) { let result = handle_offline_result(&logic, command).await; match result { Ok(_) => {} @@ -30,16 +33,24 @@ pub async fn handle_offline(logic: ClientLogic, command: Command) { } } -pub async fn handle_set_status(logic: &ClientLogic, online: Online) -> Result<(), StatusError> { +pub async fn handle_set_status<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, + online: Online, +) -> Result<(), StatusError> { logic.db().upsert_cached_status(online).await?; Ok(()) } -pub async fn handle_get_roster(logic: &ClientLogic) -> Result<Vec<Contact>, RosterError> { +pub async fn handle_get_roster<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, +) -> Result<Vec<Contact>, RosterError> { Ok(logic.db().read_cached_roster().await?) } -pub async fn handle_offline_result(logic: &ClientLogic, command: Command) -> Result<(), Error> { +pub async fn handle_offline_result<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, + command: Command<Fs>, +) -> Result<(), Error<Fs>> { match command { Command::GetRoster(sender) => { let roster = handle_get_roster(logic).await; @@ -77,7 +88,6 @@ pub async fn handle_offline_result(logic: &ClientLogic, command: Command) -> Res let user = handle_get_user(logic, jid).await; sender.send(user); } - // TODO: offline queue to modify roster Command::AddContact(_jid, sender) => { sender.send(Err(RosterError::Write(WriteError::Disconnected))); } @@ -112,7 +122,6 @@ pub async fn handle_offline_result(logic: &ClientLogic, command: Command) -> Res let result = handle_set_status(logic, online).await; sender.send(result); } - // TODO: offline message queue Command::SendMessage(jid, body) => { let id = Uuid::new_v4(); let timestamp = Utc::now(); @@ -159,7 +168,7 @@ pub async fn handle_offline_result(logic: &ClientLogic, command: Command) -> Res Command::DiscoItems(_jid, _node, sender) => { sender.send(Err(DiscoError::Write(WriteError::Disconnected))); } - Command::Publish { + Command::PublishPEPItem { item: _, node: _, sender, @@ -169,6 +178,24 @@ pub async fn handle_offline_result(logic: &ClientLogic, command: Command) -> Res Command::ChangeNick(_, sender) => { sender.send(Err(NickError::Disconnected)); } + Command::ChangeAvatar(_items, sender) => { + sender.send(Err(AvatarPublishError::Disconnected)); + } + Command::DeletePEPNode { node: _, sender } => { + sender.send(Err(PEPError::IqResponse(IqRequestError::Write( + WriteError::Disconnected, + )))); + } + Command::GetPEPItem { + node: _, + sender, + jid: _, + id: _, + } => { + sender.send(Err(PEPError::IqResponse(IqRequestError::Write( + WriteError::Disconnected, + )))); + } } Ok(()) } diff --git a/filamento/src/logic/online.rs b/filamento/src/logic/online.rs index b069f59..745adc1 100644 --- a/filamento/src/logic/online.rs +++ b/filamento/src/logic/online.rs @@ -1,23 +1,24 @@ +use std::io::Cursor; + +use base64::{prelude::BASE64_STANDARD, Engine}; use chrono::Utc; +use image::ImageReader; use jid::JID; use lampada::{Connected, WriteMessage, error::WriteError}; +use sha1::{Digest, Sha1}; use stanza::{ client::{ iq::{self, Iq, IqType, Query}, Stanza - }, - xep_0030::{info, items}, - xep_0060::pubsub::{self, Pubsub}, - xep_0172::{self, Nick}, - xep_0203::Delay, + }, xep_0030::{info, items}, xep_0060::{self, owner, pubsub::{self, Pubsub}}, xep_0084, xep_0172::{self, Nick}, xep_0203::Delay }; use tokio::sync::oneshot; use tracing::{debug, error, info}; use uuid::Uuid; use crate::{ - chat::{Body, Chat, Delivery, Message}, disco::{Info, Items}, error::{ - DatabaseError, DiscoError, Error, IqRequestError, MessageSendError, NickError, PublishError, RosterError, StatusError, SubscribeError - }, pep, presence::{Online, Presence, PresenceType}, roster::{Contact, ContactUpdate}, Command, UpdateMessage + avatar, chat::{Body, Chat, Delivery, Message}, disco::{Info, Items}, error::{ + AvatarPublishError, DatabaseError, DiscoError, Error, IqRequestError, MessageSendError, NickError, PEPError, RosterError, StatusError, SubscribeError + }, files::FileStore, pep, presence::{Online, Presence, PresenceType}, roster::{Contact, ContactUpdate}, Command, UpdateMessage }; use super::{ @@ -29,7 +30,7 @@ use super::{ }, }; -pub async fn handle_online(logic: ClientLogic, command: Command, connection: Connected) { +pub async fn handle_online<Fs: FileStore + Clone>(logic: ClientLogic<Fs>, command: Command<Fs>, connection: Connected) { let result = handle_online_result(&logic, command, connection).await; match result { Ok(_) => {} @@ -37,8 +38,8 @@ pub async fn handle_online(logic: ClientLogic, command: Command, connection: Con } } -pub async fn handle_get_roster( - logic: &ClientLogic, +pub async fn handle_get_roster<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, connection: Connected, ) -> Result<Vec<Contact>, RosterError> { let iq_id = Uuid::new_v4().to_string(); @@ -96,8 +97,8 @@ pub async fn handle_get_roster( } } -pub async fn handle_add_contact( - logic: &ClientLogic, +pub async fn handle_add_contact<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, connection: Connected, jid: JID, ) -> Result<(), RosterError> { @@ -154,8 +155,8 @@ pub async fn handle_add_contact( } } -pub async fn handle_buddy_request( - logic: &ClientLogic, +pub async fn handle_buddy_request<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, connection: Connected, jid: JID, ) -> Result<(), SubscribeError> { @@ -177,8 +178,8 @@ pub async fn handle_buddy_request( Ok(()) } -pub async fn handle_subscription_request( - logic: &ClientLogic, +pub async fn handle_subscription_request<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, connection: Connected, jid: JID, ) -> Result<(), SubscribeError> { @@ -195,8 +196,8 @@ pub async fn handle_subscription_request( Ok(()) } -pub async fn handle_accept_buddy_request( - logic: &ClientLogic, +pub async fn handle_accept_buddy_request<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, connection: Connected, jid: JID, ) -> Result<(), SubscribeError> { @@ -218,8 +219,8 @@ pub async fn handle_accept_buddy_request( Ok(()) } -pub async fn handle_accept_subscription_request( - logic: &ClientLogic, +pub async fn handle_accept_subscription_request<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, connection: Connected, jid: JID, ) -> Result<(), SubscribeError> { @@ -274,8 +275,8 @@ pub async fn handle_unfriend_contact(connection: Connected, jid: JID) -> Result< Ok(()) } -pub async fn handle_delete_contact( - logic: &ClientLogic, +pub async fn handle_delete_contact<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, connection: Connected, jid: JID, ) -> Result<(), RosterError> { @@ -333,8 +334,8 @@ pub async fn handle_delete_contact( } } -pub async fn handle_update_contact( - logic: &ClientLogic, +pub async fn handle_update_contact<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, connection: Connected, jid: JID, contact_update: ContactUpdate, @@ -398,8 +399,8 @@ pub async fn handle_update_contact( } } -pub async fn handle_set_status( - logic: &ClientLogic, +pub async fn handle_set_status<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, connection: Connected, online: Online, ) -> Result<(), StatusError> { @@ -411,9 +412,18 @@ pub async fn handle_set_status( Ok(()) } -pub async fn handle_send_message(logic: &ClientLogic, connection: Connected, jid: JID, body: Body) { +pub async fn handle_send_message<Fs: FileStore + Clone>(logic: &ClientLogic<Fs>, connection: Connected, jid: JID, body: Body) { // upsert the chat and user the message will be delivered to. if there is a conflict, it will return whatever was there, otherwise it will return false by default. - let have_chatted = logic.db().upsert_chat_and_user(&jid).await.unwrap_or(false); + // let have_chatted = logic.db().upsert_chat_and_user(&jid).await.unwrap_or(false); + let have_chatted = match logic.db().upsert_chat_and_user(&jid).await { + Ok(have_chatted) => { + have_chatted + }, + Err(e) => { + error!("{}", e); + false + }, + }; let nick; let mark_chat_as_chatted; @@ -506,6 +516,7 @@ pub async fn handle_send_message(logic: &ClientLogic, connection: Connected, jid }) .await; if mark_chat_as_chatted { + debug!("marking chat as chatted"); if let Err(e) = logic.db.mark_chat_as_chatted(jid).await { logic .handle_error(MessageSendError::MarkChatAsChatted(e.into()).into()) @@ -540,8 +551,8 @@ pub async fn handle_send_presence( Ok(()) } -pub async fn handle_disco_info( - logic: &ClientLogic, +pub async fn handle_disco_info<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, connection: Connected, jid: Option<JID>, node: Option<String>, @@ -611,8 +622,8 @@ pub async fn handle_disco_info( } } -pub async fn handle_disco_items( - logic: &ClientLogic, +pub async fn handle_disco_items<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, connection: Connected, jid: Option<JID>, node: Option<String>, @@ -680,20 +691,60 @@ pub async fn handle_disco_items( } } -pub async fn handle_publish( - logic: &ClientLogic, +pub async fn handle_publish_pep_item<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, connection: Connected, item: pep::Item, node: String, -) -> Result<(), PublishError> { +) -> Result<(), PEPError> { let id = Uuid::new_v4().to_string(); let publish = match item { - pep::Item::Nick(n) => pubsub::Publish { - node, - items: vec![pubsub::Item { - item: Some(pubsub::Content::Nick(Nick(n))), - ..Default::default() - }], + pep::Item::Nick(n) => { + if let Some(n) = n { + pubsub::Publish { + node, + items: vec![pubsub::Item { + item: Some(pubsub::Content::Nick(Nick(n))), + ..Default::default() + }], + } + } else { + pubsub::Publish { + node, + items: vec![pubsub::Item { + item: Some(pubsub::Content::Nick(Nick("".to_string()))), + ..Default::default() + }] + } + } + }, + pep::Item::AvatarMetadata(metadata) => { + if let Some(metadata) = metadata { + pubsub::Publish { node, items: vec![pubsub::Item { + item: Some(pubsub::Content::AvatarMetadata(xep_0084::Metadata { info: vec![xep_0084::Info { bytes: metadata.bytes, height: None, id: metadata.hash.clone(), r#type: metadata.r#type, url: None, width: None }], pointers: Vec::new() })), + id: Some(metadata.hash), + ..Default::default() + }]} + } else { + pubsub::Publish { node, items: vec![pubsub::Item { + item: Some(pubsub::Content::AvatarMetadata(xep_0084::Metadata { info: Vec::new(), pointers: Vec::new() })), + ..Default::default() + }]} + } + }, + pep::Item::AvatarData(data) => { + if let Some(data) = data { + pubsub::Publish { node, items: vec![pubsub::Item { + item: Some(pubsub::Content::AvatarData(xep_0084::Data(data.data_b64))), + id: Some(data.hash), + ..Default::default() + }] } + } else { + pubsub::Publish { node, items: vec![pubsub::Item { + item: Some(pubsub::Content::AvatarData(xep_0084::Data("".to_string()))), + ..Default::default() + }]} + } }, }; let request = Iq { @@ -726,38 +777,229 @@ pub async fn handle_publish( if let Some(query) = query { match query { Query::Pubsub(_) => Ok(()), - q => Err(PublishError::MismatchedQuery(q)), + q => Err(PEPError::MismatchedQuery(q)), } } else { - Err(PublishError::MissingQuery) + Err(PEPError::MissingQuery) } } IqType::Error => { - Err(PublishError::StanzaErrors(errors)) + Err(PEPError::StanzaErrors(errors)) } _ => unreachable!(), } } else { - Err(PublishError::IncorrectEntity( + Err(PEPError::IncorrectEntity( from.unwrap_or_else(|| connection.jid().as_bare()), )) } } - s => Err(PublishError::UnexpectedStanza(s)), + s => Err(PEPError::UnexpectedStanza(s)), } } -pub async fn handle_change_nick(logic: &ClientLogic, nick: String) -> Result<(), NickError> { +pub async fn handle_get_pep_item<Fs: FileStore + Clone>(logic: &ClientLogic<Fs>, connection: Connected, jid: Option<JID>, node: String, id: String) -> Result<pep::Item, PEPError> { + let stanza_id = Uuid::new_v4().to_string(); + let request = Iq { + from: Some(connection.jid().clone()), + id: stanza_id.clone(), + to: jid.clone(), + r#type: IqType::Get, + lang: None, + query: Some(Query::Pubsub(Pubsub::Items(pubsub::Items { + max_items: None, + node, + subid: None, + items: vec![pubsub::Item { id: Some(id.clone()), publisher: None, item: None }], + }))), + errors: Vec::new(), + }; + match logic + .pending() + .request(&connection, Stanza::Iq(request), stanza_id) + .await? { + + Stanza::Iq(Iq { + from, + r#type, + query, + errors, + .. + // TODO: maybe abstract a bunch of these different errors related to iqs into an iq error thing? as in like call iq.result(), get the query from inside, error otherwise. + }) if r#type == IqType::Result || r#type == IqType::Error => { + if from == jid || { + if jid == None { + from == Some(connection.jid().as_bare()) + } else { + false + } + } { + match r#type { + IqType::Result => { + if let Some(query) = query { + match query { + Query::Pubsub(Pubsub::Items(mut items)) => { + if let Some(item) = items.items.pop() { + if item.id == Some(id.clone()) { + match item.item.ok_or(PEPError::MissingItem)? { + pubsub::Content::Nick(nick) => { + if nick.0.is_empty() { + Ok(pep::Item::Nick(None)) + } else { + Ok(pep::Item::Nick(Some(nick.0))) + + } + }, + pubsub::Content::AvatarData(data) => Ok(pep::Item::AvatarData(Some(avatar::Data { hash: id, data_b64: data.0 }))), + pubsub::Content::AvatarMetadata(metadata) => Ok(pep::Item::AvatarMetadata(metadata.info.into_iter().find(|info| info.url.is_none()).map(|info| info.into()))), + pubsub::Content::Unknown(_element) => Err(PEPError::UnsupportedItem), + } + } else { + Err(PEPError::IncorrectItemID(id, item.id.unwrap_or_else(|| "missing id".to_string()))) + } + } else { + Err(PEPError::MissingItem) + } + }, + q => Err(PEPError::MismatchedQuery(q)), + } + } else { + Err(PEPError::MissingQuery) + } + } + IqType::Error => { + Err(PEPError::StanzaErrors(errors)) + } + _ => unreachable!(), + } + } else { + // TODO: include expected entity + Err(PEPError::IncorrectEntity( + from.unwrap_or_else(|| connection.jid().as_bare()), + )) + } + } + s => Err(PEPError::UnexpectedStanza(s)), + } +} + +pub async fn handle_change_nick<Fs: FileStore + Clone>(logic: &ClientLogic<Fs>, nick: Option<String>) -> Result<(), NickError> { logic.client().publish(pep::Item::Nick(nick), xep_0172::XMLNS.to_string()).await?; Ok(()) } +pub async fn handle_change_avatar<Fs: FileStore + Clone>(logic: &ClientLogic<Fs>, img_data: Option<Vec<u8>>) -> Result<(), AvatarPublishError<Fs>> { + match img_data { + // set avatar + Some(data) => { + // load the image data and guess the format + let image = ImageReader::new(Cursor::new(data)).with_guessed_format()?.decode()?; + + // convert the image to png; + let mut data_png = Vec::new(); + image.write_to(&mut Cursor::new(&mut data_png), image::ImageFormat::Png)?; + + // calculate the length of the data in bytes. + let bytes = data_png.len().try_into()?; + + // calculate sha1 hash of the data + let mut sha1 = Sha1::new(); + sha1.update(&data_png); + let sha1_result = sha1.finalize(); + let hash = BASE64_STANDARD.encode(sha1_result); + + // encode the image data as base64 + let data_b64 = BASE64_STANDARD.encode(data_png.clone()); + + // publish the data to the data node + logic.client().publish(pep::Item::AvatarData(Some(avatar::Data { hash: hash.clone(), data_b64 })), "urn:xmpp:avatar:data".to_string()).await?; + + // publish the metadata to the metadata node + logic.client().publish(pep::Item::AvatarMetadata(Some(avatar::Metadata { bytes, hash: hash.clone(), r#type: "image/png".to_string() })), "urn:xmpp:avatar:metadata".to_string()).await?; + + // if everything went well, save the data to the disk. + + if !logic.file_store().is_stored(&hash).await.map_err(|err| AvatarPublishError::FileStore(err))? { + logic.file_store().store(&hash, &data_png).await.map_err(|err| AvatarPublishError::FileStore(err))? + } + // when the client receives the updated metadata notification from the pep node, it will already have it saved on the disk so will not require a retrieval. + // TODO: should the node be purged? + + Ok(()) + }, + // remove avatar + None => { + logic.client().delete_pep_node("urn:xmpp:avatar:data".to_string()).await?; + logic.client().publish(pep::Item::AvatarMetadata(None), "urn:xmpp:avatar:metadata".to_string(), ).await?; + Ok(()) + }, + } +} + +pub async fn handle_delete_pep_node<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, + connection: Connected, + node: String, +) -> Result<(), PEPError> { + let id = Uuid::new_v4().to_string(); + let request = Iq { + from: Some(connection.jid().clone()), + id: id.clone(), + to: None, + r#type: IqType::Set, + lang: None, + query: Some(Query::PubsubOwner(xep_0060::owner::Pubsub::Delete(owner::Delete{ node, redirect: None }))), + errors: Vec::new(), + }; + match logic + .pending() + .request(&connection, Stanza::Iq(request), id) + .await? { + + Stanza::Iq(Iq { + from, + r#type, + query, + errors, + .. + // TODO: maybe abstract a bunch of these different errors related to iqs into an iq error thing? as in like call iq.result(), get the query from inside, error otherwise. + }) if r#type == IqType::Result || r#type == IqType::Error => { + if from == None || + from == Some(connection.jid().as_bare()) + { + match r#type { + IqType::Result => { + if let Some(query) = query { + match query { + Query::PubsubOwner(_) => Ok(()), + q => Err(PEPError::MismatchedQuery(q)), + } + } else { + // Err(PEPError::MissingQuery) + Ok(()) + } + } + IqType::Error => { + Err(PEPError::StanzaErrors(errors)) + } + _ => unreachable!(), + } + } else { + Err(PEPError::IncorrectEntity( + from.unwrap_or_else(|| connection.jid().as_bare()), + )) + } + } + s => Err(PEPError::UnexpectedStanza(s)), + } +} + // TODO: could probably macro-ise? -pub async fn handle_online_result( - logic: &ClientLogic, - command: Command, +pub async fn handle_online_result<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, + command: Command<Fs>, connection: Connected, -) -> Result<(), Error> { +) -> Result<(), Error<Fs>> { match command { Command::GetRoster(result_sender) => { let roster = handle_get_roster(logic, connection).await; @@ -854,14 +1096,26 @@ pub async fn handle_online_result( let result = handle_disco_items(logic, connection, jid, node).await; let _ = sender.send(result); } - Command::Publish { item, node, sender } => { - let result = handle_publish(logic, connection, item, node).await; + Command::PublishPEPItem { item, node, sender } => { + let result = handle_publish_pep_item(logic, connection, item, node).await; let _ = sender.send(result); } Command::ChangeNick(nick, sender) => { let result = handle_change_nick(logic, nick).await; let _ = sender.send(result); } + Command::ChangeAvatar(img_data, sender) => { + let result = handle_change_avatar(logic, img_data).await; + let _ = sender.send(result); + }, + Command::DeletePEPNode { node, sender } => { + let result = handle_delete_pep_node(logic, connection, node).await; + let _ = sender.send(result); + }, + Command::GetPEPItem { node, sender, jid, id } => { + let result = handle_get_pep_item(logic, connection, jid, node, id).await; + let _ = sender.send(result); + }, } Ok(()) } diff --git a/filamento/src/logic/process_stanza.rs b/filamento/src/logic/process_stanza.rs index 11d7588..c383d70 100644 --- a/filamento/src/logic/process_stanza.rs +++ b/filamento/src/logic/process_stanza.rs @@ -1,7 +1,9 @@ use std::str::FromStr; +use base64::{Engine, prelude::BASE64_STANDARD}; use chrono::Utc; use lampada::{Connected, SupervisorSender}; +use sha1::{Digest, Sha1}; use stanza::{ client::{ Stanza, @@ -17,14 +19,22 @@ use uuid::Uuid; use crate::{ UpdateMessage, caps, chat::{Body, Message}, - error::{DatabaseError, Error, IqError, MessageRecvError, PresenceError, RosterError}, + error::{ + AvatarUpdateError, DatabaseError, Error, IqError, MessageRecvError, PresenceError, + RosterError, + }, + files::FileStore, presence::{Offline, Online, Presence, PresenceType, Show}, roster::Contact, }; use super::ClientLogic; -pub async fn handle_stanza(logic: ClientLogic, stanza: Stanza, connection: Connected) { +pub async fn handle_stanza<Fs: FileStore + Clone>( + logic: ClientLogic<Fs>, + stanza: Stanza, + connection: Connected, +) { let result = process_stanza(logic.clone(), stanza, connection).await; match result { Ok(u) => match u { @@ -38,10 +48,10 @@ pub async fn handle_stanza(logic: ClientLogic, stanza: Stanza, connection: Conne } } -pub async fn recv_message( - logic: ClientLogic, +pub async fn recv_message<Fs: FileStore + Clone>( + logic: ClientLogic<Fs>, stanza_message: stanza::client::message::Message, -) -> Result<Option<UpdateMessage>, MessageRecvError> { +) -> Result<Option<UpdateMessage>, MessageRecvError<Fs>> { if let Some(from) = stanza_message.from { // TODO: don't ignore delay from. xep says SHOULD send error if incorrect. let timestamp = stanza_message @@ -50,6 +60,7 @@ pub async fn recv_message( .unwrap_or_else(|| Utc::now()); // TODO: group chat messages + // body MUST be before user changes in order to avoid race condition where you e.g. get a nick update before the user is in the client state. // if there is a body, should create chat message if let Some(body) = stanza_message.body { let message = Message { @@ -67,16 +78,28 @@ pub async fn recv_message( }; // save the message to the database - logic.db().upsert_chat_and_user(&from).await?; - if let Err(e) = logic - .db() - .create_message_with_user_resource(message.clone(), from.clone(), from.clone()) - .await - { - logic - .handle_error(Error::MessageRecv(MessageRecvError::MessageHistory(e))) - .await; - } + match logic.db().upsert_chat_and_user(&from).await { + Ok(_) => { + if let Err(e) = logic + .db() + .create_message_with_user_resource( + message.clone(), + from.clone(), + from.clone(), + ) + .await + { + logic + .handle_error(Error::MessageRecv(MessageRecvError::MessageHistory(e))) + .await; + } + } + Err(e) => { + logic + .handle_error(Error::MessageRecv(MessageRecvError::MessageHistory(e))) + .await; + } + }; // update the client with the new message logic @@ -89,70 +112,333 @@ pub async fn recv_message( } if let Some(nick) = stanza_message.nick { - if let Err(e) = logic - .db() - .upsert_user_nick(from.as_bare(), nick.0.clone()) - .await - { - logic - .handle_error(Error::MessageRecv(MessageRecvError::NickUpdate(e))) - .await; + let nick = nick.0; + if nick.is_empty() { + match logic.db().delete_user_nick(from.as_bare()).await { + Ok(changed) => { + if changed { + logic + .update_sender() + .send(UpdateMessage::NickChanged { + jid: from.as_bare(), + nick: None, + }) + .await; + } + } + Err(e) => { + logic + .handle_error(Error::MessageRecv(MessageRecvError::NickUpdate(e))) + .await; + // if failed, send user update anyway + logic + .update_sender() + .send(UpdateMessage::NickChanged { + jid: from.as_bare(), + nick: None, + }) + .await; + } + } + } else { + match logic + .db() + .upsert_user_nick(from.as_bare(), nick.clone()) + .await + { + Ok(changed) => { + if changed { + logic + .update_sender() + .send(UpdateMessage::NickChanged { + jid: from.as_bare(), + nick: Some(nick), + }) + .await; + } + } + Err(e) => { + logic + .handle_error(Error::MessageRecv(MessageRecvError::NickUpdate(e))) + .await; + // if failed, send user update anyway + logic + .update_sender() + .send(UpdateMessage::NickChanged { + jid: from.as_bare(), + nick: Some(nick), + }) + .await; + } + } } - - logic - .update_sender() - .send(UpdateMessage::NickChanged { - jid: from.as_bare(), - nick: nick.0, - }) - .await; } if let Some(event) = stanza_message.event { match event { - Event::Items(items) => match items.node.as_str() { - "http://jabber.org/protocol/nick" => match items.items { - ItemsType::Item(items) => { - if let Some(item) = items.first() { - match &item.item { - Some(c) => match c { - Content::Nick(nick) => { - if let Err(e) = logic - .db() - .upsert_user_nick(from.as_bare(), nick.0.clone()) - .await - { - logic - .handle_error(Error::MessageRecv( - MessageRecvError::NickUpdate(e), - )) - .await; + Event::Items(items) => { + match items.node.as_str() { + "http://jabber.org/protocol/nick" => match items.items { + ItemsType::Item(items) => { + if let Some(item) = items.first() { + match &item.item { + Some(c) => match c { + Content::Nick(nick) => { + let nick = nick.0.clone(); + if nick.is_empty() { + match logic + .db() + .delete_user_nick(from.as_bare()) + .await + { + Ok(changed) => { + if changed { + logic + .update_sender() + .send(UpdateMessage::NickChanged { + jid: from.as_bare(), + nick: None, + }) + .await; + } + } + Err(e) => { + logic + .handle_error(Error::MessageRecv( + MessageRecvError::NickUpdate(e), + )) + .await; + // if failed, send user update anyway + logic + .update_sender() + .send(UpdateMessage::NickChanged { + jid: from.as_bare(), + nick: None, + }) + .await; + } + } + } else { + match logic + .db() + .upsert_user_nick( + from.as_bare(), + nick.clone(), + ) + .await + { + Ok(changed) => { + if changed { + logic + .update_sender() + .send(UpdateMessage::NickChanged { + jid: from.as_bare(), + nick: Some(nick), + }) + .await; + } + } + Err(e) => { + logic + .handle_error(Error::MessageRecv( + MessageRecvError::NickUpdate(e), + )) + .await; + // if failed, send user update anyway + logic + .update_sender() + .send(UpdateMessage::NickChanged { + jid: from.as_bare(), + nick: Some(nick), + }) + .await; + } + } + } } + _ => {} + }, + None => {} + } + } + } + _ => {} + }, + "urn:xmpp:avatar:metadata" => { + match items.items { + ItemsType::Item(items) => { + if let Some(item) = items.first() { + match &item.item { + Some(Content::AvatarMetadata(metadata)) => { + // check if user avatar has been deleted + if let Some(metadata) = metadata + .info + .iter() + .find(|info| info.url.is_none()) + { + // check if user avatar has changed + match logic + .db() + .upsert_user_avatar( + from.as_bare(), + metadata.id.clone(), + ) + .await + { + Ok((changed, old_avatar)) => { + if changed { + if let Some(old_avatar) = old_avatar + { + if let Err(e) = logic + .file_store() + .delete(&old_avatar) + .await.map_err(|err| AvatarUpdateError::FileStore(err)) { + logic.handle_error(MessageRecvError::AvatarUpdate(e).into()).await; + } + } - logic - .update_sender() - .send(UpdateMessage::NickChanged { - jid: from.as_bare(), - nick: nick.0.clone(), - }) - .await; + match logic + .file_store() + .is_stored(&metadata.id) + .await + .map_err(|err| { + AvatarUpdateError::<Fs>::FileStore( + err, + ) + }) { + Ok(false) => { + // get data + let pep_item = logic.client().get_pep_item(Some(from.as_bare()), "urn:xmpp:avatar:data".to_string(), metadata.id.clone()).await.map_err(|err| Into::<AvatarUpdateError<Fs>>::into(err))?; + match pep_item { + crate::pep::Item::AvatarData(data) => { + let data = data.map(|data| data.data_b64).unwrap_or_default(); + // TODO: these should all be in a separate avatarupdate function + match BASE64_STANDARD.decode(data) { + Ok(data) => { + let mut hasher = Sha1::new(); + hasher.update(&data); + let received_data_hash = BASE64_STANDARD.encode(hasher.finalize()); + if received_data_hash == metadata.id { + if let Err(e) = logic.file_store().store(&received_data_hash, &data).await { + logic.handle_error(Error::MessageRecv(MessageRecvError::AvatarUpdate(AvatarUpdateError::FileStore(e)))).await; + } + logic + .update_sender() + .send( + UpdateMessage::AvatarChanged { + jid: from.as_bare(), + id: Some( + metadata.id.clone(), + ), + }, + ) + .await; + } + }, + Err(e) => { + logic.handle_error(Error::MessageRecv(MessageRecvError::AvatarUpdate(AvatarUpdateError::Base64(e)))).await; + }, + } + }, + _ => { + logic.handle_error(Error::MessageRecv(MessageRecvError::AvatarUpdate(AvatarUpdateError::MissingData))).await; + } + } + } + Ok(true) => { + // just send the update + logic + .update_sender() + .send( + UpdateMessage::AvatarChanged { + jid: from.as_bare(), + id: Some( + metadata.id.clone(), + ), + }, + ) + .await; + } + Err(e) => { + logic.handle_error(Error::MessageRecv(MessageRecvError::AvatarUpdate(e))).await; + } + } + } + } + Err(e) => { + logic + .handle_error(Error::MessageRecv( + MessageRecvError::AvatarUpdate( + AvatarUpdateError::Database( + e, + ), + ), + )) + .await; + } + } + } else { + // delete avatar + match logic + .db() + .delete_user_avatar(from.as_bare()) + .await + { + Ok((changed, old_avatar)) => { + if changed { + if let Some(old_avatar) = old_avatar + { + if let Err(e) = logic + .file_store() + .delete(&old_avatar) + .await.map_err(|err| AvatarUpdateError::FileStore(err)) { + logic.handle_error(MessageRecvError::AvatarUpdate(e).into()).await; + } + } + logic + .update_sender() + .send( + UpdateMessage::AvatarChanged { + jid: from.as_bare(), + id: None, + }, + ) + .await; + } + } + Err(e) => { + logic + .handle_error(Error::MessageRecv( + MessageRecvError::AvatarUpdate( + AvatarUpdateError::Database( + e, + ), + ), + )) + .await; + } + } + } + // check if the new file is in the file store + // if not, retrieve from server and save in the file store (remember to check if the hash matches) + // send the avatar update + } + _ => {} } - Content::Unknown(element) => {} - }, - None => {} + } } + _ => {} } } - ItemsType::Retract(retracts) => {} - }, - _ => {} - }, + _ => {} + } + } // Event::Collection(collection) => todo!(), // Event::Configuration(configuration) => todo!(), // Event::Delete(delete) => todo!(), // Event::Purge(purge) => todo!(), // Event::Subscription(subscription) => todo!(), - _ => {} + _ => {} // TODO: catch these catch-alls in some way } } @@ -240,8 +526,8 @@ pub async fn recv_presence( } } -pub async fn recv_iq( - logic: ClientLogic, +pub async fn recv_iq<Fs: FileStore + Clone>( + logic: ClientLogic<Fs>, connection: Connected, iq: Iq, ) -> Result<Option<UpdateMessage>, IqError> { @@ -469,11 +755,11 @@ pub async fn recv_iq( } } -pub async fn process_stanza( - logic: ClientLogic, +pub async fn process_stanza<Fs: FileStore + Clone>( + logic: ClientLogic<Fs>, stanza: Stanza, connection: Connected, -) -> Result<Option<UpdateMessage>, Error> { +) -> Result<Option<UpdateMessage>, Error<Fs>> { let update = match stanza { Stanza::Message(stanza_message) => Ok(recv_message(logic, stanza_message).await?), Stanza::Presence(presence) => Ok(recv_presence(presence).await?), |