aboutsummaryrefslogtreecommitdiffstats
path: root/filamento/src/logic/online.rs
diff options
context:
space:
mode:
Diffstat (limited to 'filamento/src/logic/online.rs')
-rw-r--r--filamento/src/logic/online.rs565
1 files changed, 464 insertions, 101 deletions
diff --git a/filamento/src/logic/online.rs b/filamento/src/logic/online.rs
index b069f59..b36f9a9 100644
--- a/filamento/src/logic/online.rs
+++ b/filamento/src/logic/online.rs
@@ -1,35 +1,35 @@
+use std::{io::Cursor, time::Duration};
+
+use base64::{prelude::BASE64_STANDARD, Engine};
use chrono::Utc;
-use jid::JID;
+use image::ImageReader;
+use jid::{BareJID, JID};
use lampada::{Connected, WriteMessage, error::WriteError};
+use sha1::{Digest, Sha1};
use stanza::{
client::{
iq::{self, Iq, IqType, Query}, Stanza
- },
- xep_0030::{info, items},
- xep_0060::pubsub::{self, Pubsub},
- xep_0172::{self, Nick},
- xep_0203::Delay,
+ }, xep_0030::{info, items}, xep_0060::{self, owner, pubsub::{self, Pubsub}}, xep_0084, xep_0172::{self, Nick}, xep_0203::Delay
};
-use tokio::sync::oneshot;
+use tokio::{sync::oneshot, task::spawn_blocking};
+#[cfg(target_arch = "wasm32")]
+use tokio_with_wasm::alias as tokio;
use tracing::{debug, error, info};
use uuid::Uuid;
use crate::{
- chat::{Body, Chat, Delivery, Message}, disco::{Info, Items}, error::{
- DatabaseError, DiscoError, Error, IqRequestError, MessageSendError, NickError, PublishError, RosterError, StatusError, SubscribeError
- }, pep, presence::{Online, Presence, PresenceType}, roster::{Contact, ContactUpdate}, Command, UpdateMessage
+ avatar, chat::{Body, Chat, Delivery, Message}, disco::{Info, Items}, error::{
+ AvatarPublishError, DatabaseError, DiscoError, Error, IqRequestError, MessageSendError, NickError, PEPError, RosterError, StatusError, SubscribeError
+ }, files::FileStore, pep, presence::{Online, Presence, PresenceType}, roster::{Contact, ContactUpdate}, user::User, Command, UpdateMessage
};
use super::{
- ClientLogic,
local_only::{
- handle_delete_chat, handle_delete_messaage, handle_get_chat, handle_get_chats,
- handle_get_chats_ordered, handle_get_chats_ordered_with_latest_messages,
- handle_get_messages, handle_get_user,
- },
+ handle_delete_chat, handle_delete_messaage, handle_get_chat, handle_get_chat_and_user, handle_get_chats, handle_get_chats_ordered, handle_get_chats_ordered_with_latest_messages, handle_get_chats_ordered_with_latest_messages_and_users, handle_get_message, handle_get_messages, handle_get_messages_with_users, handle_get_user
+ }, ClientLogic
};
-pub async fn handle_online(logic: ClientLogic, command: Command, connection: Connected) {
+pub async fn handle_online<Fs: FileStore + Clone + 'static>(logic: ClientLogic<Fs>, command: Command<Fs>, connection: Connected) {
let result = handle_online_result(&logic, command, connection).await;
match result {
Ok(_) => {}
@@ -37,13 +37,13 @@ pub async fn handle_online(logic: ClientLogic, command: Command, connection: Con
}
}
-pub async fn handle_get_roster(
- logic: &ClientLogic,
+pub async fn handle_get_roster<Fs: FileStore + Clone>(
+ logic: &ClientLogic<Fs>,
connection: Connected,
) -> Result<Vec<Contact>, RosterError> {
let iq_id = Uuid::new_v4().to_string();
let stanza = Stanza::Iq(Iq {
- from: Some(connection.jid().clone()),
+ from: Some(connection.jid().clone().into()),
id: iq_id.to_string(),
to: None,
r#type: IqType::Get,
@@ -96,14 +96,79 @@ pub async fn handle_get_roster(
}
}
-pub async fn handle_add_contact(
- logic: &ClientLogic,
+// this can't query the client... otherwise there is a hold-up and the connection can't complete
+pub async fn handle_get_roster_with_users<Fs: FileStore + Clone>(
+ logic: &ClientLogic<Fs>,
+ connection: Connected,
+) -> Result<Vec<(Contact, User)>, RosterError> {
+ let iq_id = Uuid::new_v4().to_string();
+ let stanza = Stanza::Iq(Iq {
+ from: Some(connection.jid().clone().into()),
+ id: iq_id.to_string(),
+ to: None,
+ r#type: IqType::Get,
+ lang: None,
+ query: Some(iq::Query::Roster(stanza::roster::Query {
+ ver: None,
+ items: Vec::new(),
+ })),
+ errors: Vec::new(),
+ });
+ let response = logic
+ .pending()
+ .request(&connection, stanza, iq_id.clone())
+ .await?;
+ // TODO: timeout
+ match response {
+ Stanza::Iq(Iq {
+ from: _,
+ id,
+ to: _,
+ r#type,
+ lang: _,
+ query: Some(iq::Query::Roster(stanza::roster::Query { ver: _, items })),
+ errors: _,
+ }) if id == iq_id && r#type == IqType::Result => {
+ let contacts: Vec<Contact> = items.into_iter().map(|item| item.into()).collect();
+ if let Err(e) = logic.db().replace_cached_roster(contacts.clone()).await {
+ logic
+ .handle_error(Error::Roster(RosterError::Cache(e.into())))
+ .await;
+ };
+ let mut users = Vec::new();
+ for contact in &contacts {
+ let user = logic.db().read_user(contact.user_jid.clone()).await?;
+ users.push(user);
+ }
+ Ok(contacts.into_iter().zip(users).collect())
+ }
+ ref s @ Stanza::Iq(Iq {
+ from: _,
+ ref id,
+ to: _,
+ r#type,
+ lang: _,
+ query: _,
+ ref errors,
+ }) if *id == iq_id && r#type == IqType::Error => {
+ if let Some(error) = errors.first() {
+ Err(RosterError::StanzaError(error.clone()))
+ } else {
+ Err(RosterError::UnexpectedStanza(s.clone()))
+ }
+ }
+ s => Err(RosterError::UnexpectedStanza(s)),
+ }
+}
+
+pub async fn handle_add_contact<Fs: FileStore + Clone>(
+ logic: &ClientLogic<Fs>,
connection: Connected,
- jid: JID,
+ jid: BareJID,
) -> Result<(), RosterError> {
let iq_id = Uuid::new_v4().to_string();
let set_stanza = Stanza::Iq(Iq {
- from: Some(connection.jid().clone()),
+ from: Some(connection.jid().clone().into()),
id: iq_id.clone(),
to: None,
r#type: IqType::Set,
@@ -154,12 +219,13 @@ pub async fn handle_add_contact(
}
}
-pub async fn handle_buddy_request(
- logic: &ClientLogic,
+pub async fn handle_buddy_request<Fs: FileStore + Clone>(
+ logic: &ClientLogic<Fs>,
connection: Connected,
- jid: JID,
+ jid: BareJID,
) -> Result<(), SubscribeError> {
- let client_user = logic.db.read_user(logic.bare_jid.clone()).await?;
+ let jid: JID = jid.into();
+ let client_user = logic.db.read_user(logic.jid.clone()).await?;
let nick = client_user.nick.map(|nick| Nick(nick));
let presence = Stanza::Presence(stanza::client::presence::Presence {
to: Some(jid.clone()),
@@ -177,16 +243,16 @@ pub async fn handle_buddy_request(
Ok(())
}
-pub async fn handle_subscription_request(
- logic: &ClientLogic,
+pub async fn handle_subscription_request<Fs: FileStore + Clone>(
+ logic: &ClientLogic<Fs>,
connection: Connected,
- jid: JID,
+ jid: BareJID,
) -> Result<(), SubscribeError> {
// TODO: i should probably have builders
- let client_user = logic.db.read_user(logic.bare_jid.clone()).await?;
+ let client_user = logic.db.read_user(logic.jid.clone()).await?;
let nick = client_user.nick.map(|nick| Nick(nick));
let presence = Stanza::Presence(stanza::client::presence::Presence {
- to: Some(jid),
+ to: Some(jid.into()),
r#type: Some(stanza::client::presence::PresenceType::Subscribe),
nick,
..Default::default()
@@ -195,18 +261,19 @@ pub async fn handle_subscription_request(
Ok(())
}
-pub async fn handle_accept_buddy_request(
- logic: &ClientLogic,
+pub async fn handle_accept_buddy_request<Fs: FileStore + Clone>(
+ logic: &ClientLogic<Fs>,
connection: Connected,
- jid: JID,
+ jid: BareJID,
) -> Result<(), SubscribeError> {
+ let jid: JID = jid.into();
let presence = Stanza::Presence(stanza::client::presence::Presence {
to: Some(jid.clone()),
r#type: Some(stanza::client::presence::PresenceType::Subscribed),
..Default::default()
});
connection.write_handle().write(presence).await?;
- let client_user = logic.db.read_user(logic.bare_jid.clone()).await?;
+ let client_user = logic.db.read_user(logic.jid.clone()).await?;
let nick = client_user.nick.map(|nick| Nick(nick));
let presence = Stanza::Presence(stanza::client::presence::Presence {
to: Some(jid),
@@ -218,17 +285,14 @@ pub async fn handle_accept_buddy_request(
Ok(())
}
-pub async fn handle_accept_subscription_request(
- logic: &ClientLogic,
+pub async fn handle_accept_subscription_request<Fs: FileStore + Clone>(
+ logic: &ClientLogic<Fs>,
connection: Connected,
- jid: JID,
+ jid: BareJID,
) -> Result<(), SubscribeError> {
- let client_user = logic.db.read_user(logic.bare_jid.clone()).await?;
- let nick = client_user.nick.map(|nick| Nick(nick));
let presence = Stanza::Presence(stanza::client::presence::Presence {
- to: Some(jid),
- r#type: Some(stanza::client::presence::PresenceType::Subscribe),
- nick,
+ to: Some(jid.into()),
+ r#type: Some(stanza::client::presence::PresenceType::Subscribed),
..Default::default()
});
connection.write_handle().write(presence).await?;
@@ -237,10 +301,10 @@ pub async fn handle_accept_subscription_request(
pub async fn handle_unsubscribe_from_contact(
connection: Connected,
- jid: JID,
+ jid: BareJID,
) -> Result<(), WriteError> {
let presence = Stanza::Presence(stanza::client::presence::Presence {
- to: Some(jid),
+ to: Some(jid.into()),
r#type: Some(stanza::client::presence::PresenceType::Unsubscribe),
..Default::default()
});
@@ -248,9 +312,9 @@ pub async fn handle_unsubscribe_from_contact(
Ok(())
}
-pub async fn handle_unsubscribe_contact(connection: Connected, jid: JID) -> Result<(), WriteError> {
+pub async fn handle_unsubscribe_contact(connection: Connected, jid: BareJID) -> Result<(), WriteError> {
let presence = Stanza::Presence(stanza::client::presence::Presence {
- to: Some(jid),
+ to: Some(jid.into()),
r#type: Some(stanza::client::presence::PresenceType::Unsubscribed),
..Default::default()
});
@@ -258,7 +322,8 @@ pub async fn handle_unsubscribe_contact(connection: Connected, jid: JID) -> Resu
Ok(())
}
-pub async fn handle_unfriend_contact(connection: Connected, jid: JID) -> Result<(), WriteError> {
+pub async fn handle_unfriend_contact(connection: Connected, jid: BareJID) -> Result<(), WriteError> {
+ let jid: JID = jid.into();
let presence = Stanza::Presence(stanza::client::presence::Presence {
to: Some(jid.clone()),
r#type: Some(stanza::client::presence::PresenceType::Unsubscribe),
@@ -274,14 +339,14 @@ pub async fn handle_unfriend_contact(connection: Connected, jid: JID) -> Result<
Ok(())
}
-pub async fn handle_delete_contact(
- logic: &ClientLogic,
+pub async fn handle_delete_contact<Fs: FileStore + Clone>(
+ logic: &ClientLogic<Fs>,
connection: Connected,
- jid: JID,
+ jid: BareJID,
) -> Result<(), RosterError> {
let iq_id = Uuid::new_v4().to_string();
let set_stanza = Stanza::Iq(Iq {
- from: Some(connection.jid().clone()),
+ from: Some(connection.jid().clone().into()),
id: iq_id.clone(),
to: None,
r#type: IqType::Set,
@@ -333,10 +398,10 @@ pub async fn handle_delete_contact(
}
}
-pub async fn handle_update_contact(
- logic: &ClientLogic,
+pub async fn handle_update_contact<Fs: FileStore + Clone>(
+ logic: &ClientLogic<Fs>,
connection: Connected,
- jid: JID,
+ jid: BareJID,
contact_update: ContactUpdate,
) -> Result<(), RosterError> {
let iq_id = Uuid::new_v4().to_string();
@@ -347,7 +412,8 @@ pub async fn handle_update_contact(
.map(|group| stanza::roster::Group(Some(group))),
);
let set_stanza = Stanza::Iq(Iq {
- from: Some(connection.jid().clone()),
+ // TODO: these clones could technically be avoided?
+ from: Some(connection.jid().clone().into()),
id: iq_id.clone(),
to: None,
r#type: IqType::Set,
@@ -398,8 +464,8 @@ pub async fn handle_update_contact(
}
}
-pub async fn handle_set_status(
- logic: &ClientLogic,
+pub async fn handle_set_status<Fs: FileStore + Clone>(
+ logic: &ClientLogic<Fs>,
connection: Connected,
online: Online,
) -> Result<(), StatusError> {
@@ -411,14 +477,23 @@ pub async fn handle_set_status(
Ok(())
}
-pub async fn handle_send_message(logic: &ClientLogic, connection: Connected, jid: JID, body: Body) {
+pub async fn handle_send_message<Fs: FileStore + Clone>(logic: &ClientLogic<Fs>, connection: Connected, jid: BareJID, body: Body) {
// upsert the chat and user the message will be delivered to. if there is a conflict, it will return whatever was there, otherwise it will return false by default.
- let have_chatted = logic.db().upsert_chat_and_user(&jid).await.unwrap_or(false);
+ // let have_chatted = logic.db().upsert_chat_and_user(&jid).await.unwrap_or(false);
+ let have_chatted = match logic.db().upsert_chat_and_user(jid.clone()).await {
+ Ok(have_chatted) => {
+ have_chatted
+ },
+ Err(e) => {
+ error!("{}", e);
+ false
+ },
+ };
let nick;
let mark_chat_as_chatted;
if have_chatted == false {
- match logic.db.read_user(logic.bare_jid.clone()).await {
+ match logic.db.read_user(logic.jid.clone()).await {
Ok(u) => {
nick = u.nick.map(|nick| Nick(nick));
mark_chat_as_chatted = true;
@@ -441,7 +516,7 @@ pub async fn handle_send_message(logic: &ClientLogic, connection: Connected, jid
let timestamp = Utc::now();
let message = Message {
id,
- from: connection.jid().as_bare(),
+ from: connection.jid().to_bare(),
body: body.clone(),
timestamp,
delivery: Some(Delivery::Sending),
@@ -451,7 +526,7 @@ pub async fn handle_send_message(logic: &ClientLogic, connection: Connected, jid
// TODO: mark these as potentially failed upon client launch
if let Err(e) = logic
.db()
- .create_message_with_self_resource(message.clone(), jid.clone(), connection.jid().clone())
+ .create_message_with_user_resource(message.clone(), jid.clone(), connection.jid().clone())
.await
{
// TODO: should these really be handle_error or just the error macro?
@@ -460,20 +535,33 @@ pub async fn handle_send_message(logic: &ClientLogic, connection: Connected, jid
.await;
}
+ let from = match logic.db().read_user(logic.jid.clone()).await {
+ Ok(u) => u,
+ Err(e) => {
+ error!("{}", e);
+ User {
+ jid: logic.jid.clone(),
+ nick: None,
+ avatar: None,
+ }
+ },
+ };
+
// tell the client a message is being sent
logic
.update_sender()
.send(UpdateMessage::Message {
- to: jid.as_bare(),
+ to: jid.clone(),
message,
+ from,
})
.await;
// prepare the message stanza
let message_stanza = Stanza::Message(stanza::client::message::Message {
- from: Some(connection.jid().clone()),
+ from: Some(connection.jid().clone().into()),
id: Some(id.to_string()),
- to: Some(jid.clone()),
+ to: Some(jid.clone().into()),
// TODO: specify message type
r#type: stanza::client::message::MessageType::Chat,
// TODO: lang ?
@@ -498,14 +586,19 @@ pub async fn handle_send_message(logic: &ClientLogic, connection: Connected, jid
match result {
Ok(_) => {
info!("sent message: {:?}", message_stanza);
+ if let Err(e) = logic.db().update_message_delivery(id, Delivery::Written).await {
+ error!("updating message delivery: {}", e);
+ }
logic
.update_sender()
.send(UpdateMessage::MessageDelivery {
id,
delivery: Delivery::Written,
+ chat: jid.clone(),
})
.await;
if mark_chat_as_chatted {
+ debug!("marking chat as chatted");
if let Err(e) = logic.db.mark_chat_as_chatted(jid).await {
logic
.handle_error(MessageSendError::MarkChatAsChatted(e.into()).into())
@@ -519,6 +612,7 @@ pub async fn handle_send_message(logic: &ClientLogic, connection: Connected, jid
.send(UpdateMessage::MessageDelivery {
id,
delivery: Delivery::Failed,
+ chat: jid,
})
.await;
logic.handle_error(MessageSendError::Write(e).into()).await;
@@ -540,15 +634,15 @@ pub async fn handle_send_presence(
Ok(())
}
-pub async fn handle_disco_info(
- logic: &ClientLogic,
+pub async fn handle_disco_info<Fs: FileStore + Clone>(
+ logic: &ClientLogic<Fs>,
connection: Connected,
jid: Option<JID>,
node: Option<String>,
) -> Result<Info, DiscoError> {
let id = Uuid::new_v4().to_string();
let request = Iq {
- from: Some(connection.jid().clone()),
+ from: Some(connection.jid().clone().into()),
id: id.clone(),
to: jid.clone(),
r#type: IqType::Get,
@@ -576,7 +670,7 @@ pub async fn handle_disco_info(
}) if r#type == IqType::Result || r#type == IqType::Error => {
if from == jid || {
if jid == None {
- from == Some(connection.jid().as_bare())
+ from == Some(connection.jid().to_bare().into())
} else {
false
}
@@ -603,7 +697,7 @@ pub async fn handle_disco_info(
}
} else {
Err(DiscoError::IncorrectEntity(
- from.unwrap_or_else(|| connection.jid().as_bare()),
+ from.unwrap_or_else(|| connection.jid().to_bare().into()),
))
}
}
@@ -611,15 +705,15 @@ pub async fn handle_disco_info(
}
}
-pub async fn handle_disco_items(
- logic: &ClientLogic,
+pub async fn handle_disco_items<Fs: FileStore + Clone>(
+ logic: &ClientLogic<Fs>,
connection: Connected,
jid: Option<JID>,
node: Option<String>,
) -> Result<Items, DiscoError> {
let id = Uuid::new_v4().to_string();
let request = Iq {
- from: Some(connection.jid().clone()),
+ from: Some(connection.jid().clone().into()),
id: id.clone(),
to: jid.clone(),
r#type: IqType::Get,
@@ -645,7 +739,7 @@ pub async fn handle_disco_items(
}) if r#type == IqType::Result || r#type == IqType::Error => {
if from == jid || {
if jid == None {
- from == Some(connection.jid().as_bare())
+ from == Some(connection.jid().to_bare().into())
} else {
false
}
@@ -672,7 +766,7 @@ pub async fn handle_disco_items(
}
} else {
Err(DiscoError::IncorrectEntity(
- from.unwrap_or_else(|| connection.jid().as_bare()),
+ from.unwrap_or_else(|| connection.jid().to_bare().into()),
))
}
}
@@ -680,24 +774,64 @@ pub async fn handle_disco_items(
}
}
-pub async fn handle_publish(
- logic: &ClientLogic,
+pub async fn handle_publish_pep_item<Fs: FileStore + Clone>(
+ logic: &ClientLogic<Fs>,
connection: Connected,
item: pep::Item,
node: String,
-) -> Result<(), PublishError> {
+) -> Result<(), PEPError> {
let id = Uuid::new_v4().to_string();
let publish = match item {
- pep::Item::Nick(n) => pubsub::Publish {
- node,
- items: vec![pubsub::Item {
- item: Some(pubsub::Content::Nick(Nick(n))),
- ..Default::default()
- }],
+ pep::Item::Nick(n) => {
+ if let Some(n) = n {
+ pubsub::Publish {
+ node,
+ items: vec![pubsub::Item {
+ item: Some(pubsub::Content::Nick(Nick(n))),
+ ..Default::default()
+ }],
+ }
+ } else {
+ pubsub::Publish {
+ node,
+ items: vec![pubsub::Item {
+ item: Some(pubsub::Content::Nick(Nick("".to_string()))),
+ ..Default::default()
+ }]
+ }
+ }
+ },
+ pep::Item::AvatarMetadata(metadata) => {
+ if let Some(metadata) = metadata {
+ pubsub::Publish { node, items: vec![pubsub::Item {
+ item: Some(pubsub::Content::AvatarMetadata(xep_0084::Metadata { info: vec![xep_0084::Info { bytes: metadata.bytes, height: None, id: metadata.hash.clone(), r#type: metadata.r#type, url: None, width: None }], pointers: Vec::new() })),
+ id: Some(metadata.hash),
+ ..Default::default()
+ }]}
+ } else {
+ pubsub::Publish { node, items: vec![pubsub::Item {
+ item: Some(pubsub::Content::AvatarMetadata(xep_0084::Metadata { info: Vec::new(), pointers: Vec::new() })),
+ ..Default::default()
+ }]}
+ }
+ },
+ pep::Item::AvatarData(data) => {
+ if let Some(data) = data {
+ pubsub::Publish { node, items: vec![pubsub::Item {
+ item: Some(pubsub::Content::AvatarData(xep_0084::Data(data.data_b64))),
+ id: Some(data.hash),
+ ..Default::default()
+ }] }
+ } else {
+ pubsub::Publish { node, items: vec![pubsub::Item {
+ item: Some(pubsub::Content::AvatarData(xep_0084::Data("".to_string()))),
+ ..Default::default()
+ }]}
+ }
},
};
let request = Iq {
- from: Some(connection.jid().clone()),
+ from: Some(connection.jid().clone().into()),
id: id.clone(),
to: None,
r#type: IqType::Set,
@@ -719,50 +853,251 @@ pub async fn handle_publish(
// TODO: maybe abstract a bunch of these different errors related to iqs into an iq error thing? as in like call iq.result(), get the query from inside, error otherwise.
}) if r#type == IqType::Result || r#type == IqType::Error => {
if from == None ||
- from == Some(connection.jid().as_bare())
+ from == Some(connection.jid().to_bare().into())
{
match r#type {
IqType::Result => {
if let Some(query) = query {
match query {
Query::Pubsub(_) => Ok(()),
- q => Err(PublishError::MismatchedQuery(q)),
+ q => Err(PEPError::MismatchedQuery(q)),
+ }
+ } else {
+ Err(PEPError::MissingQuery)
+ }
+ }
+ IqType::Error => {
+ Err(PEPError::StanzaErrors(errors))
+ }
+ _ => unreachable!(),
+ }
+ } else {
+ Err(PEPError::IncorrectEntity(
+ from.unwrap_or_else(|| connection.jid().to_bare().into()),
+ ))
+ }
+ }
+ s => Err(PEPError::UnexpectedStanza(s)),
+ }
+}
+
+pub async fn handle_get_pep_item<Fs: FileStore + Clone>(logic: &ClientLogic<Fs>, connection: Connected, jid: Option<BareJID>, node: String, id: String) -> Result<pep::Item, PEPError> {
+ let jid = jid.map(|jid| Into::<JID>::into(jid));
+ let stanza_id = Uuid::new_v4().to_string();
+ let request = Iq {
+ from: Some(connection.jid().clone().into()),
+ id: stanza_id.clone(),
+ to: jid.clone(),
+ r#type: IqType::Get,
+ lang: None,
+ query: Some(Query::Pubsub(Pubsub::Items(pubsub::Items {
+ max_items: None,
+ node,
+ subid: None,
+ items: vec![pubsub::Item { id: Some(id.clone()), publisher: None, item: None }],
+ }))),
+ errors: Vec::new(),
+ };
+ match logic
+ .pending()
+ .request(&connection, Stanza::Iq(request), stanza_id)
+ .await? {
+
+ Stanza::Iq(Iq {
+ from,
+ r#type,
+ query,
+ errors,
+ ..
+ // TODO: maybe abstract a bunch of these different errors related to iqs into an iq error thing? as in like call iq.result(), get the query from inside, error otherwise.
+ }) if r#type == IqType::Result || r#type == IqType::Error => {
+ if from == jid || {
+ if jid == None {
+ from == Some(connection.jid().to_bare().into())
+ } else {
+ false
+ }
+ } {
+ match r#type {
+ IqType::Result => {
+ if let Some(query) = query {
+ match query {
+ Query::Pubsub(Pubsub::Items(mut items)) => {
+ if let Some(item) = items.items.pop() {
+ if item.id == Some(id.clone()) {
+ match item.item.ok_or(PEPError::MissingItem)? {
+ pubsub::Content::Nick(nick) => {
+ if nick.0.is_empty() {
+ Ok(pep::Item::Nick(None))
+ } else {
+ Ok(pep::Item::Nick(Some(nick.0)))
+
+ }
+ },
+ pubsub::Content::AvatarData(data) => Ok(pep::Item::AvatarData(Some(avatar::Data { hash: id, data_b64: data.0 }))),
+ pubsub::Content::AvatarMetadata(metadata) => Ok(pep::Item::AvatarMetadata(metadata.info.into_iter().find(|info| info.url.is_none()).map(|info| info.into()))),
+ pubsub::Content::Unknown(_element) => Err(PEPError::UnsupportedItem),
+ }
+ } else {
+ Err(PEPError::IncorrectItemID(id, item.id.unwrap_or_else(|| "missing id".to_string())))
+ }
+ } else {
+ Err(PEPError::MissingItem)
+ }
+ },
+ q => Err(PEPError::MismatchedQuery(q)),
}
} else {
- Err(PublishError::MissingQuery)
+ Err(PEPError::MissingQuery)
}
}
IqType::Error => {
- Err(PublishError::StanzaErrors(errors))
+ Err(PEPError::StanzaErrors(errors))
}
_ => unreachable!(),
}
} else {
- Err(PublishError::IncorrectEntity(
- from.unwrap_or_else(|| connection.jid().as_bare()),
+ // TODO: include expected entity
+ Err(PEPError::IncorrectEntity(
+ from.unwrap_or_else(|| connection.jid().to_bare().into()),
))
}
}
- s => Err(PublishError::UnexpectedStanza(s)),
+ s => Err(PEPError::UnexpectedStanza(s)),
}
}
-pub async fn handle_change_nick(logic: &ClientLogic, nick: String) -> Result<(), NickError> {
+pub async fn handle_change_nick<Fs: FileStore + Clone>(logic: &ClientLogic<Fs>, nick: Option<String>) -> Result<(), NickError> {
logic.client().publish(pep::Item::Nick(nick), xep_0172::XMLNS.to_string()).await?;
Ok(())
}
+pub async fn handle_change_avatar<Fs: FileStore + Clone + 'static>(logic: &ClientLogic<Fs>, img_data: Option<Vec<u8>>) -> Result<(), AvatarPublishError<Fs>> {
+ match img_data {
+ // set avatar
+ Some(data) => {
+ let (bytes, hash, data_png, data_b64) = spawn_blocking(move || -> Result<_, _> {
+ // load the image data and guess the format
+ let image = ImageReader::new(Cursor::new(data)).with_guessed_format()?.decode()?;
+
+ // convert the image to png;
+ let mut data_png = Vec::new();
+ let image = image.resize(192, 192, image::imageops::FilterType::Nearest);
+ image.write_to(&mut Cursor::new(&mut data_png), image::ImageFormat::Jpeg)?;
+
+ // calculate the length of the data in bytes.
+ let bytes = data_png.len().try_into()?;
+
+ // calculate sha1 hash of the data
+ let mut sha1 = Sha1::new();
+ sha1.update(&data_png);
+ let sha1_result = sha1.finalize();
+ let hash = hex::encode(sha1_result);
+
+ // encode the image data as base64
+ let data_b64 = BASE64_STANDARD.encode(data_png.clone());
+
+ Ok::<(u32, String, Vec<u8>, String), AvatarPublishError<Fs>>((bytes, hash, data_png, data_b64))
+ }).await.unwrap()?;
+
+ // publish the data to the data node
+ logic.client().publish(pep::Item::AvatarData(Some(avatar::Data { hash: hash.clone(), data_b64 })), "urn:xmpp:avatar:data".to_string()).await?;
+
+ // publish the metadata to the metadata node
+ logic.client().publish(pep::Item::AvatarMetadata(Some(avatar::Metadata { bytes, hash: hash.clone(), r#type: "image/jpeg".to_string() })), "urn:xmpp:avatar:metadata".to_string()).await?;
+
+ // if everything went well, save the data to the disk.
+
+ if !logic.file_store().is_stored(&hash).await.map_err(|err| AvatarPublishError::FileStore(err))? {
+ logic.file_store().store(&hash, &data_png).await.map_err(|err| AvatarPublishError::FileStore(err))?
+ }
+ // when the client receives the updated metadata notification from the pep node, it will already have it saved on the disk so will not require a retrieval.
+ // TODO: should the node be purged?
+
+ Ok(())
+ },
+ // remove avatar
+ None => {
+ logic.client().delete_pep_node("urn:xmpp:avatar:data".to_string()).await?;
+ logic.client().publish(pep::Item::AvatarMetadata(None), "urn:xmpp:avatar:metadata".to_string(), ).await?;
+ Ok(())
+ },
+ }
+}
+
+pub async fn handle_delete_pep_node<Fs: FileStore + Clone>(
+ logic: &ClientLogic<Fs>,
+ connection: Connected,
+ node: String,
+) -> Result<(), PEPError> {
+ let id = Uuid::new_v4().to_string();
+ let request = Iq {
+ from: Some(connection.jid().clone().into()),
+ id: id.clone(),
+ to: None,
+ r#type: IqType::Set,
+ lang: None,
+ query: Some(Query::PubsubOwner(xep_0060::owner::Pubsub::Delete(owner::Delete{ node, redirect: None }))),
+ errors: Vec::new(),
+ };
+ match logic
+ .pending()
+ .request(&connection, Stanza::Iq(request), id)
+ .await? {
+
+ Stanza::Iq(Iq {
+ from,
+ r#type,
+ query,
+ errors,
+ ..
+ // TODO: maybe abstract a bunch of these different errors related to iqs into an iq error thing? as in like call iq.result(), get the query from inside, error otherwise.
+ }) if r#type == IqType::Result || r#type == IqType::Error => {
+ if from == None ||
+ from == Some(connection.jid().to_bare().into())
+ {
+ match r#type {
+ IqType::Result => {
+ if let Some(query) = query {
+ match query {
+ Query::PubsubOwner(_) => Ok(()),
+ q => Err(PEPError::MismatchedQuery(q)),
+ }
+ } else {
+ // Err(PEPError::MissingQuery)
+ Ok(())
+ }
+ }
+ IqType::Error => {
+ Err(PEPError::StanzaErrors(errors))
+ }
+ _ => unreachable!(),
+ }
+ } else {
+ Err(PEPError::IncorrectEntity(
+ from.unwrap_or_else(|| connection.jid().to_bare().into()),
+ ))
+ }
+ }
+ s => Err(PEPError::UnexpectedStanza(s)),
+ }
+}
+
// TODO: could probably macro-ise?
-pub async fn handle_online_result(
- logic: &ClientLogic,
- command: Command,
+pub async fn handle_online_result<Fs: FileStore + Clone + 'static>(
+ logic: &ClientLogic<Fs>,
+ command: Command<Fs>,
connection: Connected,
-) -> Result<(), Error> {
+) -> Result<(), Error<Fs>> {
match command {
Command::GetRoster(result_sender) => {
let roster = handle_get_roster(logic, connection).await;
let _ = result_sender.send(roster);
}
+ Command::GetRosterWithUsers(result_sender) => {
+ let roster = handle_get_roster_with_users(logic, connection).await;
+ let _ = result_sender.send(roster);
+ }
Command::GetChats(sender) => {
let chats = handle_get_chats(logic).await;
let _ = sender.send(chats);
@@ -775,14 +1110,30 @@ pub async fn handle_online_result(
let chats = handle_get_chats_ordered_with_latest_messages(logic).await;
let _ = sender.send(chats);
}
+ Command::GetChatsOrderedWithLatestMessagesAndUsers(sender) => {
+ let chats = handle_get_chats_ordered_with_latest_messages_and_users(logic).await;
+ sender.send(chats);
+ }
Command::GetChat(jid, sender) => {
let chat = handle_get_chat(logic, jid).await;
let _ = sender.send(chat);
}
+ Command::GetChatAndUser(jid, sender) => {
+ let chat = handle_get_chat_and_user(logic, jid).await;
+ let _ = sender.send(chat);
+ }
+ Command::GetMessage(id, sender) => {
+ let message = handle_get_message(logic, id).await;
+ let _ = sender.send(message);
+ }
Command::GetMessages(jid, sender) => {
let messages = handle_get_messages(logic, jid).await;
let _ = sender.send(messages);
}
+ Command::GetMessagesWithUsers(jid, sender) => {
+ let messages = handle_get_messages_with_users(logic, jid).await;
+ sender.send(messages);
+ }
Command::DeleteChat(jid, sender) => {
let result = handle_delete_chat(logic, jid).await;
let _ = sender.send(result);
@@ -854,14 +1205,26 @@ pub async fn handle_online_result(
let result = handle_disco_items(logic, connection, jid, node).await;
let _ = sender.send(result);
}
- Command::Publish { item, node, sender } => {
- let result = handle_publish(logic, connection, item, node).await;
+ Command::PublishPEPItem { item, node, sender } => {
+ let result = handle_publish_pep_item(logic, connection, item, node).await;
let _ = sender.send(result);
}
Command::ChangeNick(nick, sender) => {
let result = handle_change_nick(logic, nick).await;
let _ = sender.send(result);
}
+ Command::ChangeAvatar(img_data, sender) => {
+ let result = handle_change_avatar(logic, img_data).await;
+ let _ = sender.send(result);
+ },
+ Command::DeletePEPNode { node, sender } => {
+ let result = handle_delete_pep_node(logic, connection, node).await;
+ let _ = sender.send(result);
+ },
+ Command::GetPEPItem { node, sender, jid, id } => {
+ let result = handle_get_pep_item(logic, connection, jid, node, id).await;
+ let _ = sender.send(result);
+ },
}
Ok(())
}