diff options
Diffstat (limited to 'filamento/src/logic/online.rs')
-rw-r--r-- | filamento/src/logic/online.rs | 565 |
1 files changed, 464 insertions, 101 deletions
diff --git a/filamento/src/logic/online.rs b/filamento/src/logic/online.rs index b069f59..b36f9a9 100644 --- a/filamento/src/logic/online.rs +++ b/filamento/src/logic/online.rs @@ -1,35 +1,35 @@ +use std::{io::Cursor, time::Duration}; + +use base64::{prelude::BASE64_STANDARD, Engine}; use chrono::Utc; -use jid::JID; +use image::ImageReader; +use jid::{BareJID, JID}; use lampada::{Connected, WriteMessage, error::WriteError}; +use sha1::{Digest, Sha1}; use stanza::{ client::{ iq::{self, Iq, IqType, Query}, Stanza - }, - xep_0030::{info, items}, - xep_0060::pubsub::{self, Pubsub}, - xep_0172::{self, Nick}, - xep_0203::Delay, + }, xep_0030::{info, items}, xep_0060::{self, owner, pubsub::{self, Pubsub}}, xep_0084, xep_0172::{self, Nick}, xep_0203::Delay }; -use tokio::sync::oneshot; +use tokio::{sync::oneshot, task::spawn_blocking}; +#[cfg(target_arch = "wasm32")] +use tokio_with_wasm::alias as tokio; use tracing::{debug, error, info}; use uuid::Uuid; use crate::{ - chat::{Body, Chat, Delivery, Message}, disco::{Info, Items}, error::{ - DatabaseError, DiscoError, Error, IqRequestError, MessageSendError, NickError, PublishError, RosterError, StatusError, SubscribeError - }, pep, presence::{Online, Presence, PresenceType}, roster::{Contact, ContactUpdate}, Command, UpdateMessage + avatar, chat::{Body, Chat, Delivery, Message}, disco::{Info, Items}, error::{ + AvatarPublishError, DatabaseError, DiscoError, Error, IqRequestError, MessageSendError, NickError, PEPError, RosterError, StatusError, SubscribeError + }, files::FileStore, pep, presence::{Online, Presence, PresenceType}, roster::{Contact, ContactUpdate}, user::User, Command, UpdateMessage }; use super::{ - ClientLogic, local_only::{ - handle_delete_chat, handle_delete_messaage, handle_get_chat, handle_get_chats, - handle_get_chats_ordered, handle_get_chats_ordered_with_latest_messages, - handle_get_messages, handle_get_user, - }, + handle_delete_chat, handle_delete_messaage, handle_get_chat, handle_get_chat_and_user, handle_get_chats, handle_get_chats_ordered, handle_get_chats_ordered_with_latest_messages, handle_get_chats_ordered_with_latest_messages_and_users, handle_get_message, handle_get_messages, handle_get_messages_with_users, handle_get_user + }, ClientLogic }; -pub async fn handle_online(logic: ClientLogic, command: Command, connection: Connected) { +pub async fn handle_online<Fs: FileStore + Clone + 'static>(logic: ClientLogic<Fs>, command: Command<Fs>, connection: Connected) { let result = handle_online_result(&logic, command, connection).await; match result { Ok(_) => {} @@ -37,13 +37,13 @@ pub async fn handle_online(logic: ClientLogic, command: Command, connection: Con } } -pub async fn handle_get_roster( - logic: &ClientLogic, +pub async fn handle_get_roster<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, connection: Connected, ) -> Result<Vec<Contact>, RosterError> { let iq_id = Uuid::new_v4().to_string(); let stanza = Stanza::Iq(Iq { - from: Some(connection.jid().clone()), + from: Some(connection.jid().clone().into()), id: iq_id.to_string(), to: None, r#type: IqType::Get, @@ -96,14 +96,79 @@ pub async fn handle_get_roster( } } -pub async fn handle_add_contact( - logic: &ClientLogic, +// this can't query the client... otherwise there is a hold-up and the connection can't complete +pub async fn handle_get_roster_with_users<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, + connection: Connected, +) -> Result<Vec<(Contact, User)>, RosterError> { + let iq_id = Uuid::new_v4().to_string(); + let stanza = Stanza::Iq(Iq { + from: Some(connection.jid().clone().into()), + id: iq_id.to_string(), + to: None, + r#type: IqType::Get, + lang: None, + query: Some(iq::Query::Roster(stanza::roster::Query { + ver: None, + items: Vec::new(), + })), + errors: Vec::new(), + }); + let response = logic + .pending() + .request(&connection, stanza, iq_id.clone()) + .await?; + // TODO: timeout + match response { + Stanza::Iq(Iq { + from: _, + id, + to: _, + r#type, + lang: _, + query: Some(iq::Query::Roster(stanza::roster::Query { ver: _, items })), + errors: _, + }) if id == iq_id && r#type == IqType::Result => { + let contacts: Vec<Contact> = items.into_iter().map(|item| item.into()).collect(); + if let Err(e) = logic.db().replace_cached_roster(contacts.clone()).await { + logic + .handle_error(Error::Roster(RosterError::Cache(e.into()))) + .await; + }; + let mut users = Vec::new(); + for contact in &contacts { + let user = logic.db().read_user(contact.user_jid.clone()).await?; + users.push(user); + } + Ok(contacts.into_iter().zip(users).collect()) + } + ref s @ Stanza::Iq(Iq { + from: _, + ref id, + to: _, + r#type, + lang: _, + query: _, + ref errors, + }) if *id == iq_id && r#type == IqType::Error => { + if let Some(error) = errors.first() { + Err(RosterError::StanzaError(error.clone())) + } else { + Err(RosterError::UnexpectedStanza(s.clone())) + } + } + s => Err(RosterError::UnexpectedStanza(s)), + } +} + +pub async fn handle_add_contact<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, connection: Connected, - jid: JID, + jid: BareJID, ) -> Result<(), RosterError> { let iq_id = Uuid::new_v4().to_string(); let set_stanza = Stanza::Iq(Iq { - from: Some(connection.jid().clone()), + from: Some(connection.jid().clone().into()), id: iq_id.clone(), to: None, r#type: IqType::Set, @@ -154,12 +219,13 @@ pub async fn handle_add_contact( } } -pub async fn handle_buddy_request( - logic: &ClientLogic, +pub async fn handle_buddy_request<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, connection: Connected, - jid: JID, + jid: BareJID, ) -> Result<(), SubscribeError> { - let client_user = logic.db.read_user(logic.bare_jid.clone()).await?; + let jid: JID = jid.into(); + let client_user = logic.db.read_user(logic.jid.clone()).await?; let nick = client_user.nick.map(|nick| Nick(nick)); let presence = Stanza::Presence(stanza::client::presence::Presence { to: Some(jid.clone()), @@ -177,16 +243,16 @@ pub async fn handle_buddy_request( Ok(()) } -pub async fn handle_subscription_request( - logic: &ClientLogic, +pub async fn handle_subscription_request<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, connection: Connected, - jid: JID, + jid: BareJID, ) -> Result<(), SubscribeError> { // TODO: i should probably have builders - let client_user = logic.db.read_user(logic.bare_jid.clone()).await?; + let client_user = logic.db.read_user(logic.jid.clone()).await?; let nick = client_user.nick.map(|nick| Nick(nick)); let presence = Stanza::Presence(stanza::client::presence::Presence { - to: Some(jid), + to: Some(jid.into()), r#type: Some(stanza::client::presence::PresenceType::Subscribe), nick, ..Default::default() @@ -195,18 +261,19 @@ pub async fn handle_subscription_request( Ok(()) } -pub async fn handle_accept_buddy_request( - logic: &ClientLogic, +pub async fn handle_accept_buddy_request<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, connection: Connected, - jid: JID, + jid: BareJID, ) -> Result<(), SubscribeError> { + let jid: JID = jid.into(); let presence = Stanza::Presence(stanza::client::presence::Presence { to: Some(jid.clone()), r#type: Some(stanza::client::presence::PresenceType::Subscribed), ..Default::default() }); connection.write_handle().write(presence).await?; - let client_user = logic.db.read_user(logic.bare_jid.clone()).await?; + let client_user = logic.db.read_user(logic.jid.clone()).await?; let nick = client_user.nick.map(|nick| Nick(nick)); let presence = Stanza::Presence(stanza::client::presence::Presence { to: Some(jid), @@ -218,17 +285,14 @@ pub async fn handle_accept_buddy_request( Ok(()) } -pub async fn handle_accept_subscription_request( - logic: &ClientLogic, +pub async fn handle_accept_subscription_request<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, connection: Connected, - jid: JID, + jid: BareJID, ) -> Result<(), SubscribeError> { - let client_user = logic.db.read_user(logic.bare_jid.clone()).await?; - let nick = client_user.nick.map(|nick| Nick(nick)); let presence = Stanza::Presence(stanza::client::presence::Presence { - to: Some(jid), - r#type: Some(stanza::client::presence::PresenceType::Subscribe), - nick, + to: Some(jid.into()), + r#type: Some(stanza::client::presence::PresenceType::Subscribed), ..Default::default() }); connection.write_handle().write(presence).await?; @@ -237,10 +301,10 @@ pub async fn handle_accept_subscription_request( pub async fn handle_unsubscribe_from_contact( connection: Connected, - jid: JID, + jid: BareJID, ) -> Result<(), WriteError> { let presence = Stanza::Presence(stanza::client::presence::Presence { - to: Some(jid), + to: Some(jid.into()), r#type: Some(stanza::client::presence::PresenceType::Unsubscribe), ..Default::default() }); @@ -248,9 +312,9 @@ pub async fn handle_unsubscribe_from_contact( Ok(()) } -pub async fn handle_unsubscribe_contact(connection: Connected, jid: JID) -> Result<(), WriteError> { +pub async fn handle_unsubscribe_contact(connection: Connected, jid: BareJID) -> Result<(), WriteError> { let presence = Stanza::Presence(stanza::client::presence::Presence { - to: Some(jid), + to: Some(jid.into()), r#type: Some(stanza::client::presence::PresenceType::Unsubscribed), ..Default::default() }); @@ -258,7 +322,8 @@ pub async fn handle_unsubscribe_contact(connection: Connected, jid: JID) -> Resu Ok(()) } -pub async fn handle_unfriend_contact(connection: Connected, jid: JID) -> Result<(), WriteError> { +pub async fn handle_unfriend_contact(connection: Connected, jid: BareJID) -> Result<(), WriteError> { + let jid: JID = jid.into(); let presence = Stanza::Presence(stanza::client::presence::Presence { to: Some(jid.clone()), r#type: Some(stanza::client::presence::PresenceType::Unsubscribe), @@ -274,14 +339,14 @@ pub async fn handle_unfriend_contact(connection: Connected, jid: JID) -> Result< Ok(()) } -pub async fn handle_delete_contact( - logic: &ClientLogic, +pub async fn handle_delete_contact<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, connection: Connected, - jid: JID, + jid: BareJID, ) -> Result<(), RosterError> { let iq_id = Uuid::new_v4().to_string(); let set_stanza = Stanza::Iq(Iq { - from: Some(connection.jid().clone()), + from: Some(connection.jid().clone().into()), id: iq_id.clone(), to: None, r#type: IqType::Set, @@ -333,10 +398,10 @@ pub async fn handle_delete_contact( } } -pub async fn handle_update_contact( - logic: &ClientLogic, +pub async fn handle_update_contact<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, connection: Connected, - jid: JID, + jid: BareJID, contact_update: ContactUpdate, ) -> Result<(), RosterError> { let iq_id = Uuid::new_v4().to_string(); @@ -347,7 +412,8 @@ pub async fn handle_update_contact( .map(|group| stanza::roster::Group(Some(group))), ); let set_stanza = Stanza::Iq(Iq { - from: Some(connection.jid().clone()), + // TODO: these clones could technically be avoided? + from: Some(connection.jid().clone().into()), id: iq_id.clone(), to: None, r#type: IqType::Set, @@ -398,8 +464,8 @@ pub async fn handle_update_contact( } } -pub async fn handle_set_status( - logic: &ClientLogic, +pub async fn handle_set_status<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, connection: Connected, online: Online, ) -> Result<(), StatusError> { @@ -411,14 +477,23 @@ pub async fn handle_set_status( Ok(()) } -pub async fn handle_send_message(logic: &ClientLogic, connection: Connected, jid: JID, body: Body) { +pub async fn handle_send_message<Fs: FileStore + Clone>(logic: &ClientLogic<Fs>, connection: Connected, jid: BareJID, body: Body) { // upsert the chat and user the message will be delivered to. if there is a conflict, it will return whatever was there, otherwise it will return false by default. - let have_chatted = logic.db().upsert_chat_and_user(&jid).await.unwrap_or(false); + // let have_chatted = logic.db().upsert_chat_and_user(&jid).await.unwrap_or(false); + let have_chatted = match logic.db().upsert_chat_and_user(jid.clone()).await { + Ok(have_chatted) => { + have_chatted + }, + Err(e) => { + error!("{}", e); + false + }, + }; let nick; let mark_chat_as_chatted; if have_chatted == false { - match logic.db.read_user(logic.bare_jid.clone()).await { + match logic.db.read_user(logic.jid.clone()).await { Ok(u) => { nick = u.nick.map(|nick| Nick(nick)); mark_chat_as_chatted = true; @@ -441,7 +516,7 @@ pub async fn handle_send_message(logic: &ClientLogic, connection: Connected, jid let timestamp = Utc::now(); let message = Message { id, - from: connection.jid().as_bare(), + from: connection.jid().to_bare(), body: body.clone(), timestamp, delivery: Some(Delivery::Sending), @@ -451,7 +526,7 @@ pub async fn handle_send_message(logic: &ClientLogic, connection: Connected, jid // TODO: mark these as potentially failed upon client launch if let Err(e) = logic .db() - .create_message_with_self_resource(message.clone(), jid.clone(), connection.jid().clone()) + .create_message_with_user_resource(message.clone(), jid.clone(), connection.jid().clone()) .await { // TODO: should these really be handle_error or just the error macro? @@ -460,20 +535,33 @@ pub async fn handle_send_message(logic: &ClientLogic, connection: Connected, jid .await; } + let from = match logic.db().read_user(logic.jid.clone()).await { + Ok(u) => u, + Err(e) => { + error!("{}", e); + User { + jid: logic.jid.clone(), + nick: None, + avatar: None, + } + }, + }; + // tell the client a message is being sent logic .update_sender() .send(UpdateMessage::Message { - to: jid.as_bare(), + to: jid.clone(), message, + from, }) .await; // prepare the message stanza let message_stanza = Stanza::Message(stanza::client::message::Message { - from: Some(connection.jid().clone()), + from: Some(connection.jid().clone().into()), id: Some(id.to_string()), - to: Some(jid.clone()), + to: Some(jid.clone().into()), // TODO: specify message type r#type: stanza::client::message::MessageType::Chat, // TODO: lang ? @@ -498,14 +586,19 @@ pub async fn handle_send_message(logic: &ClientLogic, connection: Connected, jid match result { Ok(_) => { info!("sent message: {:?}", message_stanza); + if let Err(e) = logic.db().update_message_delivery(id, Delivery::Written).await { + error!("updating message delivery: {}", e); + } logic .update_sender() .send(UpdateMessage::MessageDelivery { id, delivery: Delivery::Written, + chat: jid.clone(), }) .await; if mark_chat_as_chatted { + debug!("marking chat as chatted"); if let Err(e) = logic.db.mark_chat_as_chatted(jid).await { logic .handle_error(MessageSendError::MarkChatAsChatted(e.into()).into()) @@ -519,6 +612,7 @@ pub async fn handle_send_message(logic: &ClientLogic, connection: Connected, jid .send(UpdateMessage::MessageDelivery { id, delivery: Delivery::Failed, + chat: jid, }) .await; logic.handle_error(MessageSendError::Write(e).into()).await; @@ -540,15 +634,15 @@ pub async fn handle_send_presence( Ok(()) } -pub async fn handle_disco_info( - logic: &ClientLogic, +pub async fn handle_disco_info<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, connection: Connected, jid: Option<JID>, node: Option<String>, ) -> Result<Info, DiscoError> { let id = Uuid::new_v4().to_string(); let request = Iq { - from: Some(connection.jid().clone()), + from: Some(connection.jid().clone().into()), id: id.clone(), to: jid.clone(), r#type: IqType::Get, @@ -576,7 +670,7 @@ pub async fn handle_disco_info( }) if r#type == IqType::Result || r#type == IqType::Error => { if from == jid || { if jid == None { - from == Some(connection.jid().as_bare()) + from == Some(connection.jid().to_bare().into()) } else { false } @@ -603,7 +697,7 @@ pub async fn handle_disco_info( } } else { Err(DiscoError::IncorrectEntity( - from.unwrap_or_else(|| connection.jid().as_bare()), + from.unwrap_or_else(|| connection.jid().to_bare().into()), )) } } @@ -611,15 +705,15 @@ pub async fn handle_disco_info( } } -pub async fn handle_disco_items( - logic: &ClientLogic, +pub async fn handle_disco_items<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, connection: Connected, jid: Option<JID>, node: Option<String>, ) -> Result<Items, DiscoError> { let id = Uuid::new_v4().to_string(); let request = Iq { - from: Some(connection.jid().clone()), + from: Some(connection.jid().clone().into()), id: id.clone(), to: jid.clone(), r#type: IqType::Get, @@ -645,7 +739,7 @@ pub async fn handle_disco_items( }) if r#type == IqType::Result || r#type == IqType::Error => { if from == jid || { if jid == None { - from == Some(connection.jid().as_bare()) + from == Some(connection.jid().to_bare().into()) } else { false } @@ -672,7 +766,7 @@ pub async fn handle_disco_items( } } else { Err(DiscoError::IncorrectEntity( - from.unwrap_or_else(|| connection.jid().as_bare()), + from.unwrap_or_else(|| connection.jid().to_bare().into()), )) } } @@ -680,24 +774,64 @@ pub async fn handle_disco_items( } } -pub async fn handle_publish( - logic: &ClientLogic, +pub async fn handle_publish_pep_item<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, connection: Connected, item: pep::Item, node: String, -) -> Result<(), PublishError> { +) -> Result<(), PEPError> { let id = Uuid::new_v4().to_string(); let publish = match item { - pep::Item::Nick(n) => pubsub::Publish { - node, - items: vec![pubsub::Item { - item: Some(pubsub::Content::Nick(Nick(n))), - ..Default::default() - }], + pep::Item::Nick(n) => { + if let Some(n) = n { + pubsub::Publish { + node, + items: vec![pubsub::Item { + item: Some(pubsub::Content::Nick(Nick(n))), + ..Default::default() + }], + } + } else { + pubsub::Publish { + node, + items: vec![pubsub::Item { + item: Some(pubsub::Content::Nick(Nick("".to_string()))), + ..Default::default() + }] + } + } + }, + pep::Item::AvatarMetadata(metadata) => { + if let Some(metadata) = metadata { + pubsub::Publish { node, items: vec![pubsub::Item { + item: Some(pubsub::Content::AvatarMetadata(xep_0084::Metadata { info: vec![xep_0084::Info { bytes: metadata.bytes, height: None, id: metadata.hash.clone(), r#type: metadata.r#type, url: None, width: None }], pointers: Vec::new() })), + id: Some(metadata.hash), + ..Default::default() + }]} + } else { + pubsub::Publish { node, items: vec![pubsub::Item { + item: Some(pubsub::Content::AvatarMetadata(xep_0084::Metadata { info: Vec::new(), pointers: Vec::new() })), + ..Default::default() + }]} + } + }, + pep::Item::AvatarData(data) => { + if let Some(data) = data { + pubsub::Publish { node, items: vec![pubsub::Item { + item: Some(pubsub::Content::AvatarData(xep_0084::Data(data.data_b64))), + id: Some(data.hash), + ..Default::default() + }] } + } else { + pubsub::Publish { node, items: vec![pubsub::Item { + item: Some(pubsub::Content::AvatarData(xep_0084::Data("".to_string()))), + ..Default::default() + }]} + } }, }; let request = Iq { - from: Some(connection.jid().clone()), + from: Some(connection.jid().clone().into()), id: id.clone(), to: None, r#type: IqType::Set, @@ -719,50 +853,251 @@ pub async fn handle_publish( // TODO: maybe abstract a bunch of these different errors related to iqs into an iq error thing? as in like call iq.result(), get the query from inside, error otherwise. }) if r#type == IqType::Result || r#type == IqType::Error => { if from == None || - from == Some(connection.jid().as_bare()) + from == Some(connection.jid().to_bare().into()) { match r#type { IqType::Result => { if let Some(query) = query { match query { Query::Pubsub(_) => Ok(()), - q => Err(PublishError::MismatchedQuery(q)), + q => Err(PEPError::MismatchedQuery(q)), + } + } else { + Err(PEPError::MissingQuery) + } + } + IqType::Error => { + Err(PEPError::StanzaErrors(errors)) + } + _ => unreachable!(), + } + } else { + Err(PEPError::IncorrectEntity( + from.unwrap_or_else(|| connection.jid().to_bare().into()), + )) + } + } + s => Err(PEPError::UnexpectedStanza(s)), + } +} + +pub async fn handle_get_pep_item<Fs: FileStore + Clone>(logic: &ClientLogic<Fs>, connection: Connected, jid: Option<BareJID>, node: String, id: String) -> Result<pep::Item, PEPError> { + let jid = jid.map(|jid| Into::<JID>::into(jid)); + let stanza_id = Uuid::new_v4().to_string(); + let request = Iq { + from: Some(connection.jid().clone().into()), + id: stanza_id.clone(), + to: jid.clone(), + r#type: IqType::Get, + lang: None, + query: Some(Query::Pubsub(Pubsub::Items(pubsub::Items { + max_items: None, + node, + subid: None, + items: vec![pubsub::Item { id: Some(id.clone()), publisher: None, item: None }], + }))), + errors: Vec::new(), + }; + match logic + .pending() + .request(&connection, Stanza::Iq(request), stanza_id) + .await? { + + Stanza::Iq(Iq { + from, + r#type, + query, + errors, + .. + // TODO: maybe abstract a bunch of these different errors related to iqs into an iq error thing? as in like call iq.result(), get the query from inside, error otherwise. + }) if r#type == IqType::Result || r#type == IqType::Error => { + if from == jid || { + if jid == None { + from == Some(connection.jid().to_bare().into()) + } else { + false + } + } { + match r#type { + IqType::Result => { + if let Some(query) = query { + match query { + Query::Pubsub(Pubsub::Items(mut items)) => { + if let Some(item) = items.items.pop() { + if item.id == Some(id.clone()) { + match item.item.ok_or(PEPError::MissingItem)? { + pubsub::Content::Nick(nick) => { + if nick.0.is_empty() { + Ok(pep::Item::Nick(None)) + } else { + Ok(pep::Item::Nick(Some(nick.0))) + + } + }, + pubsub::Content::AvatarData(data) => Ok(pep::Item::AvatarData(Some(avatar::Data { hash: id, data_b64: data.0 }))), + pubsub::Content::AvatarMetadata(metadata) => Ok(pep::Item::AvatarMetadata(metadata.info.into_iter().find(|info| info.url.is_none()).map(|info| info.into()))), + pubsub::Content::Unknown(_element) => Err(PEPError::UnsupportedItem), + } + } else { + Err(PEPError::IncorrectItemID(id, item.id.unwrap_or_else(|| "missing id".to_string()))) + } + } else { + Err(PEPError::MissingItem) + } + }, + q => Err(PEPError::MismatchedQuery(q)), } } else { - Err(PublishError::MissingQuery) + Err(PEPError::MissingQuery) } } IqType::Error => { - Err(PublishError::StanzaErrors(errors)) + Err(PEPError::StanzaErrors(errors)) } _ => unreachable!(), } } else { - Err(PublishError::IncorrectEntity( - from.unwrap_or_else(|| connection.jid().as_bare()), + // TODO: include expected entity + Err(PEPError::IncorrectEntity( + from.unwrap_or_else(|| connection.jid().to_bare().into()), )) } } - s => Err(PublishError::UnexpectedStanza(s)), + s => Err(PEPError::UnexpectedStanza(s)), } } -pub async fn handle_change_nick(logic: &ClientLogic, nick: String) -> Result<(), NickError> { +pub async fn handle_change_nick<Fs: FileStore + Clone>(logic: &ClientLogic<Fs>, nick: Option<String>) -> Result<(), NickError> { logic.client().publish(pep::Item::Nick(nick), xep_0172::XMLNS.to_string()).await?; Ok(()) } +pub async fn handle_change_avatar<Fs: FileStore + Clone + 'static>(logic: &ClientLogic<Fs>, img_data: Option<Vec<u8>>) -> Result<(), AvatarPublishError<Fs>> { + match img_data { + // set avatar + Some(data) => { + let (bytes, hash, data_png, data_b64) = spawn_blocking(move || -> Result<_, _> { + // load the image data and guess the format + let image = ImageReader::new(Cursor::new(data)).with_guessed_format()?.decode()?; + + // convert the image to png; + let mut data_png = Vec::new(); + let image = image.resize(192, 192, image::imageops::FilterType::Nearest); + image.write_to(&mut Cursor::new(&mut data_png), image::ImageFormat::Jpeg)?; + + // calculate the length of the data in bytes. + let bytes = data_png.len().try_into()?; + + // calculate sha1 hash of the data + let mut sha1 = Sha1::new(); + sha1.update(&data_png); + let sha1_result = sha1.finalize(); + let hash = hex::encode(sha1_result); + + // encode the image data as base64 + let data_b64 = BASE64_STANDARD.encode(data_png.clone()); + + Ok::<(u32, String, Vec<u8>, String), AvatarPublishError<Fs>>((bytes, hash, data_png, data_b64)) + }).await.unwrap()?; + + // publish the data to the data node + logic.client().publish(pep::Item::AvatarData(Some(avatar::Data { hash: hash.clone(), data_b64 })), "urn:xmpp:avatar:data".to_string()).await?; + + // publish the metadata to the metadata node + logic.client().publish(pep::Item::AvatarMetadata(Some(avatar::Metadata { bytes, hash: hash.clone(), r#type: "image/jpeg".to_string() })), "urn:xmpp:avatar:metadata".to_string()).await?; + + // if everything went well, save the data to the disk. + + if !logic.file_store().is_stored(&hash).await.map_err(|err| AvatarPublishError::FileStore(err))? { + logic.file_store().store(&hash, &data_png).await.map_err(|err| AvatarPublishError::FileStore(err))? + } + // when the client receives the updated metadata notification from the pep node, it will already have it saved on the disk so will not require a retrieval. + // TODO: should the node be purged? + + Ok(()) + }, + // remove avatar + None => { + logic.client().delete_pep_node("urn:xmpp:avatar:data".to_string()).await?; + logic.client().publish(pep::Item::AvatarMetadata(None), "urn:xmpp:avatar:metadata".to_string(), ).await?; + Ok(()) + }, + } +} + +pub async fn handle_delete_pep_node<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, + connection: Connected, + node: String, +) -> Result<(), PEPError> { + let id = Uuid::new_v4().to_string(); + let request = Iq { + from: Some(connection.jid().clone().into()), + id: id.clone(), + to: None, + r#type: IqType::Set, + lang: None, + query: Some(Query::PubsubOwner(xep_0060::owner::Pubsub::Delete(owner::Delete{ node, redirect: None }))), + errors: Vec::new(), + }; + match logic + .pending() + .request(&connection, Stanza::Iq(request), id) + .await? { + + Stanza::Iq(Iq { + from, + r#type, + query, + errors, + .. + // TODO: maybe abstract a bunch of these different errors related to iqs into an iq error thing? as in like call iq.result(), get the query from inside, error otherwise. + }) if r#type == IqType::Result || r#type == IqType::Error => { + if from == None || + from == Some(connection.jid().to_bare().into()) + { + match r#type { + IqType::Result => { + if let Some(query) = query { + match query { + Query::PubsubOwner(_) => Ok(()), + q => Err(PEPError::MismatchedQuery(q)), + } + } else { + // Err(PEPError::MissingQuery) + Ok(()) + } + } + IqType::Error => { + Err(PEPError::StanzaErrors(errors)) + } + _ => unreachable!(), + } + } else { + Err(PEPError::IncorrectEntity( + from.unwrap_or_else(|| connection.jid().to_bare().into()), + )) + } + } + s => Err(PEPError::UnexpectedStanza(s)), + } +} + // TODO: could probably macro-ise? -pub async fn handle_online_result( - logic: &ClientLogic, - command: Command, +pub async fn handle_online_result<Fs: FileStore + Clone + 'static>( + logic: &ClientLogic<Fs>, + command: Command<Fs>, connection: Connected, -) -> Result<(), Error> { +) -> Result<(), Error<Fs>> { match command { Command::GetRoster(result_sender) => { let roster = handle_get_roster(logic, connection).await; let _ = result_sender.send(roster); } + Command::GetRosterWithUsers(result_sender) => { + let roster = handle_get_roster_with_users(logic, connection).await; + let _ = result_sender.send(roster); + } Command::GetChats(sender) => { let chats = handle_get_chats(logic).await; let _ = sender.send(chats); @@ -775,14 +1110,30 @@ pub async fn handle_online_result( let chats = handle_get_chats_ordered_with_latest_messages(logic).await; let _ = sender.send(chats); } + Command::GetChatsOrderedWithLatestMessagesAndUsers(sender) => { + let chats = handle_get_chats_ordered_with_latest_messages_and_users(logic).await; + sender.send(chats); + } Command::GetChat(jid, sender) => { let chat = handle_get_chat(logic, jid).await; let _ = sender.send(chat); } + Command::GetChatAndUser(jid, sender) => { + let chat = handle_get_chat_and_user(logic, jid).await; + let _ = sender.send(chat); + } + Command::GetMessage(id, sender) => { + let message = handle_get_message(logic, id).await; + let _ = sender.send(message); + } Command::GetMessages(jid, sender) => { let messages = handle_get_messages(logic, jid).await; let _ = sender.send(messages); } + Command::GetMessagesWithUsers(jid, sender) => { + let messages = handle_get_messages_with_users(logic, jid).await; + sender.send(messages); + } Command::DeleteChat(jid, sender) => { let result = handle_delete_chat(logic, jid).await; let _ = sender.send(result); @@ -854,14 +1205,26 @@ pub async fn handle_online_result( let result = handle_disco_items(logic, connection, jid, node).await; let _ = sender.send(result); } - Command::Publish { item, node, sender } => { - let result = handle_publish(logic, connection, item, node).await; + Command::PublishPEPItem { item, node, sender } => { + let result = handle_publish_pep_item(logic, connection, item, node).await; let _ = sender.send(result); } Command::ChangeNick(nick, sender) => { let result = handle_change_nick(logic, nick).await; let _ = sender.send(result); } + Command::ChangeAvatar(img_data, sender) => { + let result = handle_change_avatar(logic, img_data).await; + let _ = sender.send(result); + }, + Command::DeletePEPNode { node, sender } => { + let result = handle_delete_pep_node(logic, connection, node).await; + let _ = sender.send(result); + }, + Command::GetPEPItem { node, sender, jid, id } => { + let result = handle_get_pep_item(logic, connection, jid, node, id).await; + let _ = sender.send(result); + }, } Ok(()) } |