aboutsummaryrefslogtreecommitdiffstats
path: root/filamento/src/logic
diff options
context:
space:
mode:
authorLibravatar cel 🌸 <cel@bunny.garden>2025-04-08 10:38:18 +0100
committerLibravatar cel 🌸 <cel@bunny.garden>2025-04-08 10:38:18 +0100
commit5b644e2dc8712d56931b410b9c46dae1ef36e691 (patch)
tree2290b3ab2a5829c3b8406ce073a95474e146a680 /filamento/src/logic
parentc24541b129a14a598afe00ed4244d49d27513310 (diff)
downloadluz-5b644e2dc8712d56931b410b9c46dae1ef36e691.tar.gz
luz-5b644e2dc8712d56931b410b9c46dae1ef36e691.tar.bz2
luz-5b644e2dc8712d56931b410b9c46dae1ef36e691.zip
feat(filamento): user avatar publishing and processing
Diffstat (limited to '')
-rw-r--r--filamento/src/logic/abort.rs4
-rw-r--r--filamento/src/logic/connect.rs6
-rw-r--r--filamento/src/logic/connection_error.rs7
-rw-r--r--filamento/src/logic/disconnect.rs7
-rw-r--r--filamento/src/logic/local_only.rs37
-rw-r--r--filamento/src/logic/mod.rs27
-rw-r--r--filamento/src/logic/offline.rs45
-rw-r--r--filamento/src/logic/online.rs362
-rw-r--r--filamento/src/logic/process_stanza.rs422
9 files changed, 762 insertions, 155 deletions
diff --git a/filamento/src/logic/abort.rs b/filamento/src/logic/abort.rs
index df82655..3588b13 100644
--- a/filamento/src/logic/abort.rs
+++ b/filamento/src/logic/abort.rs
@@ -1,7 +1,9 @@
use lampada::error::ReadError;
+use crate::files::FileStore;
+
use super::ClientLogic;
-pub async fn on_abort(logic: ClientLogic) {
+pub async fn on_abort<Fs: FileStore + Clone>(logic: ClientLogic<Fs>) {
logic.pending().drain().await;
}
diff --git a/filamento/src/logic/connect.rs b/filamento/src/logic/connect.rs
index dc05448..37cdad5 100644
--- a/filamento/src/logic/connect.rs
+++ b/filamento/src/logic/connect.rs
@@ -5,12 +5,16 @@ use tracing::debug;
use crate::{
Command, UpdateMessage,
error::{ConnectionJobError, Error, RosterError},
+ files::FileStore,
presence::{Online, PresenceType},
};
use super::ClientLogic;
-pub async fn handle_connect(logic: ClientLogic, connection: Connected) {
+pub async fn handle_connect<Fs: FileStore + Clone + Send + Sync>(
+ logic: ClientLogic<Fs>,
+ connection: Connected,
+) {
let (send, recv) = oneshot::channel();
debug!("getting roster");
logic
diff --git a/filamento/src/logic/connection_error.rs b/filamento/src/logic/connection_error.rs
index 081900b..36c1cef 100644
--- a/filamento/src/logic/connection_error.rs
+++ b/filamento/src/logic/connection_error.rs
@@ -1,7 +1,12 @@
use lampada::error::ConnectionError;
+use crate::files::FileStore;
+
use super::ClientLogic;
-pub async fn handle_connection_error(logic: ClientLogic, error: ConnectionError) {
+pub async fn handle_connection_error<Fs: FileStore + Clone>(
+ logic: ClientLogic<Fs>,
+ error: ConnectionError,
+) {
logic.handle_error(error.into()).await;
}
diff --git a/filamento/src/logic/disconnect.rs b/filamento/src/logic/disconnect.rs
index 241c3e6..ebcfd4f 100644
--- a/filamento/src/logic/disconnect.rs
+++ b/filamento/src/logic/disconnect.rs
@@ -1,11 +1,14 @@
use lampada::Connected;
use stanza::client::Stanza;
-use crate::{UpdateMessage, presence::Offline};
+use crate::{UpdateMessage, files::FileStore, presence::Offline};
use super::ClientLogic;
-pub async fn handle_disconnect(logic: ClientLogic, connection: Connected) {
+pub async fn handle_disconnect<Fs: FileStore + Clone>(
+ logic: ClientLogic<Fs>,
+ connection: Connected,
+) {
// TODO: be able to set offline status message
let offline_presence: stanza::client::presence::Presence = Offline::default().into_stanza(None);
let stanza = Stanza::Presence(offline_presence);
diff --git a/filamento/src/logic/local_only.rs b/filamento/src/logic/local_only.rs
index 3f6fe8d..cabbef4 100644
--- a/filamento/src/logic/local_only.rs
+++ b/filamento/src/logic/local_only.rs
@@ -4,44 +4,61 @@ use uuid::Uuid;
use crate::{
chat::{Chat, Message},
error::DatabaseError,
+ files::FileStore,
user::User,
};
use super::ClientLogic;
-pub async fn handle_get_chats(logic: &ClientLogic) -> Result<Vec<Chat>, DatabaseError> {
+pub async fn handle_get_chats<Fs: FileStore + Clone>(
+ logic: &ClientLogic<Fs>,
+) -> Result<Vec<Chat>, DatabaseError> {
Ok(logic.db().read_chats().await?)
}
-pub async fn handle_get_chats_ordered(logic: &ClientLogic) -> Result<Vec<Chat>, DatabaseError> {
+pub async fn handle_get_chats_ordered<Fs: FileStore + Clone>(
+ logic: &ClientLogic<Fs>,
+) -> Result<Vec<Chat>, DatabaseError> {
Ok(logic.db().read_chats_ordered().await?)
}
-pub async fn handle_get_chats_ordered_with_latest_messages(
- logic: &ClientLogic,
+pub async fn handle_get_chats_ordered_with_latest_messages<Fs: FileStore + Clone>(
+ logic: &ClientLogic<Fs>,
) -> Result<Vec<(Chat, Message)>, DatabaseError> {
Ok(logic.db().read_chats_ordered_with_latest_messages().await?)
}
-pub async fn handle_get_chat(logic: &ClientLogic, jid: JID) -> Result<Chat, DatabaseError> {
+pub async fn handle_get_chat<Fs: FileStore + Clone>(
+ logic: &ClientLogic<Fs>,
+ jid: JID,
+) -> Result<Chat, DatabaseError> {
Ok(logic.db().read_chat(jid).await?)
}
-pub async fn handle_get_messages(
- logic: &ClientLogic,
+pub async fn handle_get_messages<Fs: FileStore + Clone>(
+ logic: &ClientLogic<Fs>,
jid: JID,
) -> Result<Vec<Message>, DatabaseError> {
Ok(logic.db().read_message_history(jid).await?)
}
-pub async fn handle_delete_chat(logic: &ClientLogic, jid: JID) -> Result<(), DatabaseError> {
+pub async fn handle_delete_chat<Fs: FileStore + Clone>(
+ logic: &ClientLogic<Fs>,
+ jid: JID,
+) -> Result<(), DatabaseError> {
Ok(logic.db().delete_chat(jid).await?)
}
-pub async fn handle_delete_messaage(logic: &ClientLogic, uuid: Uuid) -> Result<(), DatabaseError> {
+pub async fn handle_delete_messaage<Fs: FileStore + Clone>(
+ logic: &ClientLogic<Fs>,
+ uuid: Uuid,
+) -> Result<(), DatabaseError> {
Ok(logic.db().delete_message(uuid).await?)
}
-pub async fn handle_get_user(logic: &ClientLogic, jid: JID) -> Result<User, DatabaseError> {
+pub async fn handle_get_user<Fs: FileStore + Clone>(
+ logic: &ClientLogic<Fs>,
+ jid: JID,
+) -> Result<User, DatabaseError> {
Ok(logic.db().read_user(jid).await?)
}
diff --git a/filamento/src/logic/mod.rs b/filamento/src/logic/mod.rs
index 1ddd7d3..5e05dac 100644
--- a/filamento/src/logic/mod.rs
+++ b/filamento/src/logic/mod.rs
@@ -4,12 +4,13 @@ use jid::JID;
use lampada::{Connected, Logic, error::ReadError};
use stanza::client::Stanza;
use tokio::sync::{Mutex, mpsc, oneshot};
-use tracing::{error, info, warn};
+use tracing::{error, info};
use crate::{
Client, Command, UpdateMessage,
db::Db,
error::{Error, IqRequestError, ResponseError},
+ files::FileStore,
};
mod abort;
@@ -22,12 +23,13 @@ mod online;
mod process_stanza;
#[derive(Clone)]
-pub struct ClientLogic {
- client: Client,
+pub struct ClientLogic<Fs: FileStore> {
+ client: Client<Fs>,
bare_jid: JID,
db: Db,
pending: Pending,
update_sender: mpsc::Sender<UpdateMessage>,
+ file_store: Fs,
}
#[derive(Clone)]
@@ -75,12 +77,13 @@ impl Pending {
}
}
-impl ClientLogic {
+impl<Fs: FileStore> ClientLogic<Fs> {
pub fn new(
- client: Client,
+ client: Client<Fs>,
bare_jid: JID,
db: Db,
update_sender: mpsc::Sender<UpdateMessage>,
+ file_store: Fs,
) -> Self {
Self {
db,
@@ -88,10 +91,11 @@ impl ClientLogic {
update_sender,
client,
bare_jid,
+ file_store,
}
}
- pub fn client(&self) -> &Client {
+ pub fn client(&self) -> &Client<Fs> {
&self.client
}
@@ -103,6 +107,10 @@ impl ClientLogic {
&self.pending
}
+ pub fn file_store(&self) -> &Fs {
+ &self.file_store
+ }
+
pub fn update_sender(&self) -> &mpsc::Sender<UpdateMessage> {
&self.update_sender
}
@@ -113,13 +121,14 @@ impl ClientLogic {
self.update_sender().send(update).await;
}
- pub async fn handle_error(&self, e: Error) {
+ // TODO: delete this
+ pub async fn handle_error(&self, e: Error<Fs>) {
error!("{}", e);
}
}
-impl Logic for ClientLogic {
- type Cmd = Command;
+impl<Fs: FileStore + Clone + Send + Sync> Logic for ClientLogic<Fs> {
+ type Cmd = Command<Fs>;
// pub async fn handle_stream_error(self, error) {}
// stanza errors (recoverable)
diff --git a/filamento/src/logic/offline.rs b/filamento/src/logic/offline.rs
index 6399cf7..566972c 100644
--- a/filamento/src/logic/offline.rs
+++ b/filamento/src/logic/offline.rs
@@ -1,3 +1,5 @@
+use std::process::id;
+
use chrono::Utc;
use lampada::error::WriteError;
use uuid::Uuid;
@@ -6,9 +8,10 @@ use crate::{
Command,
chat::{Delivery, Message},
error::{
- DatabaseError, DiscoError, Error, IqRequestError, MessageSendError, NickError, RosterError,
- StatusError,
+ AvatarPublishError, DatabaseError, DiscoError, Error, IqRequestError, MessageSendError,
+ NickError, PEPError, RosterError, StatusError,
},
+ files::FileStore,
presence::Online,
roster::Contact,
};
@@ -22,7 +25,7 @@ use super::{
},
};
-pub async fn handle_offline(logic: ClientLogic, command: Command) {
+pub async fn handle_offline<Fs: FileStore + Clone>(logic: ClientLogic<Fs>, command: Command<Fs>) {
let result = handle_offline_result(&logic, command).await;
match result {
Ok(_) => {}
@@ -30,16 +33,24 @@ pub async fn handle_offline(logic: ClientLogic, command: Command) {
}
}
-pub async fn handle_set_status(logic: &ClientLogic, online: Online) -> Result<(), StatusError> {
+pub async fn handle_set_status<Fs: FileStore + Clone>(
+ logic: &ClientLogic<Fs>,
+ online: Online,
+) -> Result<(), StatusError> {
logic.db().upsert_cached_status(online).await?;
Ok(())
}
-pub async fn handle_get_roster(logic: &ClientLogic) -> Result<Vec<Contact>, RosterError> {
+pub async fn handle_get_roster<Fs: FileStore + Clone>(
+ logic: &ClientLogic<Fs>,
+) -> Result<Vec<Contact>, RosterError> {
Ok(logic.db().read_cached_roster().await?)
}
-pub async fn handle_offline_result(logic: &ClientLogic, command: Command) -> Result<(), Error> {
+pub async fn handle_offline_result<Fs: FileStore + Clone>(
+ logic: &ClientLogic<Fs>,
+ command: Command<Fs>,
+) -> Result<(), Error<Fs>> {
match command {
Command::GetRoster(sender) => {
let roster = handle_get_roster(logic).await;
@@ -77,7 +88,6 @@ pub async fn handle_offline_result(logic: &ClientLogic, command: Command) -> Res
let user = handle_get_user(logic, jid).await;
sender.send(user);
}
- // TODO: offline queue to modify roster
Command::AddContact(_jid, sender) => {
sender.send(Err(RosterError::Write(WriteError::Disconnected)));
}
@@ -112,7 +122,6 @@ pub async fn handle_offline_result(logic: &ClientLogic, command: Command) -> Res
let result = handle_set_status(logic, online).await;
sender.send(result);
}
- // TODO: offline message queue
Command::SendMessage(jid, body) => {
let id = Uuid::new_v4();
let timestamp = Utc::now();
@@ -159,7 +168,7 @@ pub async fn handle_offline_result(logic: &ClientLogic, command: Command) -> Res
Command::DiscoItems(_jid, _node, sender) => {
sender.send(Err(DiscoError::Write(WriteError::Disconnected)));
}
- Command::Publish {
+ Command::PublishPEPItem {
item: _,
node: _,
sender,
@@ -169,6 +178,24 @@ pub async fn handle_offline_result(logic: &ClientLogic, command: Command) -> Res
Command::ChangeNick(_, sender) => {
sender.send(Err(NickError::Disconnected));
}
+ Command::ChangeAvatar(_items, sender) => {
+ sender.send(Err(AvatarPublishError::Disconnected));
+ }
+ Command::DeletePEPNode { node: _, sender } => {
+ sender.send(Err(PEPError::IqResponse(IqRequestError::Write(
+ WriteError::Disconnected,
+ ))));
+ }
+ Command::GetPEPItem {
+ node: _,
+ sender,
+ jid: _,
+ id: _,
+ } => {
+ sender.send(Err(PEPError::IqResponse(IqRequestError::Write(
+ WriteError::Disconnected,
+ ))));
+ }
}
Ok(())
}
diff --git a/filamento/src/logic/online.rs b/filamento/src/logic/online.rs
index b069f59..745adc1 100644
--- a/filamento/src/logic/online.rs
+++ b/filamento/src/logic/online.rs
@@ -1,23 +1,24 @@
+use std::io::Cursor;
+
+use base64::{prelude::BASE64_STANDARD, Engine};
use chrono::Utc;
+use image::ImageReader;
use jid::JID;
use lampada::{Connected, WriteMessage, error::WriteError};
+use sha1::{Digest, Sha1};
use stanza::{
client::{
iq::{self, Iq, IqType, Query}, Stanza
- },
- xep_0030::{info, items},
- xep_0060::pubsub::{self, Pubsub},
- xep_0172::{self, Nick},
- xep_0203::Delay,
+ }, xep_0030::{info, items}, xep_0060::{self, owner, pubsub::{self, Pubsub}}, xep_0084, xep_0172::{self, Nick}, xep_0203::Delay
};
use tokio::sync::oneshot;
use tracing::{debug, error, info};
use uuid::Uuid;
use crate::{
- chat::{Body, Chat, Delivery, Message}, disco::{Info, Items}, error::{
- DatabaseError, DiscoError, Error, IqRequestError, MessageSendError, NickError, PublishError, RosterError, StatusError, SubscribeError
- }, pep, presence::{Online, Presence, PresenceType}, roster::{Contact, ContactUpdate}, Command, UpdateMessage
+ avatar, chat::{Body, Chat, Delivery, Message}, disco::{Info, Items}, error::{
+ AvatarPublishError, DatabaseError, DiscoError, Error, IqRequestError, MessageSendError, NickError, PEPError, RosterError, StatusError, SubscribeError
+ }, files::FileStore, pep, presence::{Online, Presence, PresenceType}, roster::{Contact, ContactUpdate}, Command, UpdateMessage
};
use super::{
@@ -29,7 +30,7 @@ use super::{
},
};
-pub async fn handle_online(logic: ClientLogic, command: Command, connection: Connected) {
+pub async fn handle_online<Fs: FileStore + Clone>(logic: ClientLogic<Fs>, command: Command<Fs>, connection: Connected) {
let result = handle_online_result(&logic, command, connection).await;
match result {
Ok(_) => {}
@@ -37,8 +38,8 @@ pub async fn handle_online(logic: ClientLogic, command: Command, connection: Con
}
}
-pub async fn handle_get_roster(
- logic: &ClientLogic,
+pub async fn handle_get_roster<Fs: FileStore + Clone>(
+ logic: &ClientLogic<Fs>,
connection: Connected,
) -> Result<Vec<Contact>, RosterError> {
let iq_id = Uuid::new_v4().to_string();
@@ -96,8 +97,8 @@ pub async fn handle_get_roster(
}
}
-pub async fn handle_add_contact(
- logic: &ClientLogic,
+pub async fn handle_add_contact<Fs: FileStore + Clone>(
+ logic: &ClientLogic<Fs>,
connection: Connected,
jid: JID,
) -> Result<(), RosterError> {
@@ -154,8 +155,8 @@ pub async fn handle_add_contact(
}
}
-pub async fn handle_buddy_request(
- logic: &ClientLogic,
+pub async fn handle_buddy_request<Fs: FileStore + Clone>(
+ logic: &ClientLogic<Fs>,
connection: Connected,
jid: JID,
) -> Result<(), SubscribeError> {
@@ -177,8 +178,8 @@ pub async fn handle_buddy_request(
Ok(())
}
-pub async fn handle_subscription_request(
- logic: &ClientLogic,
+pub async fn handle_subscription_request<Fs: FileStore + Clone>(
+ logic: &ClientLogic<Fs>,
connection: Connected,
jid: JID,
) -> Result<(), SubscribeError> {
@@ -195,8 +196,8 @@ pub async fn handle_subscription_request(
Ok(())
}
-pub async fn handle_accept_buddy_request(
- logic: &ClientLogic,
+pub async fn handle_accept_buddy_request<Fs: FileStore + Clone>(
+ logic: &ClientLogic<Fs>,
connection: Connected,
jid: JID,
) -> Result<(), SubscribeError> {
@@ -218,8 +219,8 @@ pub async fn handle_accept_buddy_request(
Ok(())
}
-pub async fn handle_accept_subscription_request(
- logic: &ClientLogic,
+pub async fn handle_accept_subscription_request<Fs: FileStore + Clone>(
+ logic: &ClientLogic<Fs>,
connection: Connected,
jid: JID,
) -> Result<(), SubscribeError> {
@@ -274,8 +275,8 @@ pub async fn handle_unfriend_contact(connection: Connected, jid: JID) -> Result<
Ok(())
}
-pub async fn handle_delete_contact(
- logic: &ClientLogic,
+pub async fn handle_delete_contact<Fs: FileStore + Clone>(
+ logic: &ClientLogic<Fs>,
connection: Connected,
jid: JID,
) -> Result<(), RosterError> {
@@ -333,8 +334,8 @@ pub async fn handle_delete_contact(
}
}
-pub async fn handle_update_contact(
- logic: &ClientLogic,
+pub async fn handle_update_contact<Fs: FileStore + Clone>(
+ logic: &ClientLogic<Fs>,
connection: Connected,
jid: JID,
contact_update: ContactUpdate,
@@ -398,8 +399,8 @@ pub async fn handle_update_contact(
}
}
-pub async fn handle_set_status(
- logic: &ClientLogic,
+pub async fn handle_set_status<Fs: FileStore + Clone>(
+ logic: &ClientLogic<Fs>,
connection: Connected,
online: Online,
) -> Result<(), StatusError> {
@@ -411,9 +412,18 @@ pub async fn handle_set_status(
Ok(())
}
-pub async fn handle_send_message(logic: &ClientLogic, connection: Connected, jid: JID, body: Body) {
+pub async fn handle_send_message<Fs: FileStore + Clone>(logic: &ClientLogic<Fs>, connection: Connected, jid: JID, body: Body) {
// upsert the chat and user the message will be delivered to. if there is a conflict, it will return whatever was there, otherwise it will return false by default.
- let have_chatted = logic.db().upsert_chat_and_user(&jid).await.unwrap_or(false);
+ // let have_chatted = logic.db().upsert_chat_and_user(&jid).await.unwrap_or(false);
+ let have_chatted = match logic.db().upsert_chat_and_user(&jid).await {
+ Ok(have_chatted) => {
+ have_chatted
+ },
+ Err(e) => {
+ error!("{}", e);
+ false
+ },
+ };
let nick;
let mark_chat_as_chatted;
@@ -506,6 +516,7 @@ pub async fn handle_send_message(logic: &ClientLogic, connection: Connected, jid
})
.await;
if mark_chat_as_chatted {
+ debug!("marking chat as chatted");
if let Err(e) = logic.db.mark_chat_as_chatted(jid).await {
logic
.handle_error(MessageSendError::MarkChatAsChatted(e.into()).into())
@@ -540,8 +551,8 @@ pub async fn handle_send_presence(
Ok(())
}
-pub async fn handle_disco_info(
- logic: &ClientLogic,
+pub async fn handle_disco_info<Fs: FileStore + Clone>(
+ logic: &ClientLogic<Fs>,
connection: Connected,
jid: Option<JID>,
node: Option<String>,
@@ -611,8 +622,8 @@ pub async fn handle_disco_info(
}
}
-pub async fn handle_disco_items(
- logic: &ClientLogic,
+pub async fn handle_disco_items<Fs: FileStore + Clone>(
+ logic: &ClientLogic<Fs>,
connection: Connected,
jid: Option<JID>,
node: Option<String>,
@@ -680,20 +691,60 @@ pub async fn handle_disco_items(
}
}
-pub async fn handle_publish(
- logic: &ClientLogic,
+pub async fn handle_publish_pep_item<Fs: FileStore + Clone>(
+ logic: &ClientLogic<Fs>,
connection: Connected,
item: pep::Item,
node: String,
-) -> Result<(), PublishError> {
+) -> Result<(), PEPError> {
let id = Uuid::new_v4().to_string();
let publish = match item {
- pep::Item::Nick(n) => pubsub::Publish {
- node,
- items: vec![pubsub::Item {
- item: Some(pubsub::Content::Nick(Nick(n))),
- ..Default::default()
- }],
+ pep::Item::Nick(n) => {
+ if let Some(n) = n {
+ pubsub::Publish {
+ node,
+ items: vec![pubsub::Item {
+ item: Some(pubsub::Content::Nick(Nick(n))),
+ ..Default::default()
+ }],
+ }
+ } else {
+ pubsub::Publish {
+ node,
+ items: vec![pubsub::Item {
+ item: Some(pubsub::Content::Nick(Nick("".to_string()))),
+ ..Default::default()
+ }]
+ }
+ }
+ },
+ pep::Item::AvatarMetadata(metadata) => {
+ if let Some(metadata) = metadata {
+ pubsub::Publish { node, items: vec![pubsub::Item {
+ item: Some(pubsub::Content::AvatarMetadata(xep_0084::Metadata { info: vec![xep_0084::Info { bytes: metadata.bytes, height: None, id: metadata.hash.clone(), r#type: metadata.r#type, url: None, width: None }], pointers: Vec::new() })),
+ id: Some(metadata.hash),
+ ..Default::default()
+ }]}
+ } else {
+ pubsub::Publish { node, items: vec![pubsub::Item {
+ item: Some(pubsub::Content::AvatarMetadata(xep_0084::Metadata { info: Vec::new(), pointers: Vec::new() })),
+ ..Default::default()
+ }]}
+ }
+ },
+ pep::Item::AvatarData(data) => {
+ if let Some(data) = data {
+ pubsub::Publish { node, items: vec![pubsub::Item {
+ item: Some(pubsub::Content::AvatarData(xep_0084::Data(data.data_b64))),
+ id: Some(data.hash),
+ ..Default::default()
+ }] }
+ } else {
+ pubsub::Publish { node, items: vec![pubsub::Item {
+ item: Some(pubsub::Content::AvatarData(xep_0084::Data("".to_string()))),
+ ..Default::default()
+ }]}
+ }
},
};
let request = Iq {
@@ -726,38 +777,229 @@ pub async fn handle_publish(
if let Some(query) = query {
match query {
Query::Pubsub(_) => Ok(()),
- q => Err(PublishError::MismatchedQuery(q)),
+ q => Err(PEPError::MismatchedQuery(q)),
}
} else {
- Err(PublishError::MissingQuery)
+ Err(PEPError::MissingQuery)
}
}
IqType::Error => {
- Err(PublishError::StanzaErrors(errors))
+ Err(PEPError::StanzaErrors(errors))
}
_ => unreachable!(),
}
} else {
- Err(PublishError::IncorrectEntity(
+ Err(PEPError::IncorrectEntity(
from.unwrap_or_else(|| connection.jid().as_bare()),
))
}
}
- s => Err(PublishError::UnexpectedStanza(s)),
+ s => Err(PEPError::UnexpectedStanza(s)),
}
}
-pub async fn handle_change_nick(logic: &ClientLogic, nick: String) -> Result<(), NickError> {
+pub async fn handle_get_pep_item<Fs: FileStore + Clone>(logic: &ClientLogic<Fs>, connection: Connected, jid: Option<JID>, node: String, id: String) -> Result<pep::Item, PEPError> {
+ let stanza_id = Uuid::new_v4().to_string();
+ let request = Iq {
+ from: Some(connection.jid().clone()),
+ id: stanza_id.clone(),
+ to: jid.clone(),
+ r#type: IqType::Get,
+ lang: None,
+ query: Some(Query::Pubsub(Pubsub::Items(pubsub::Items {
+ max_items: None,
+ node,
+ subid: None,
+ items: vec![pubsub::Item { id: Some(id.clone()), publisher: None, item: None }],
+ }))),
+ errors: Vec::new(),
+ };
+ match logic
+ .pending()
+ .request(&connection, Stanza::Iq(request), stanza_id)
+ .await? {
+
+ Stanza::Iq(Iq {
+ from,
+ r#type,
+ query,
+ errors,
+ ..
+ // TODO: maybe abstract a bunch of these different errors related to iqs into an iq error thing? as in like call iq.result(), get the query from inside, error otherwise.
+ }) if r#type == IqType::Result || r#type == IqType::Error => {
+ if from == jid || {
+ if jid == None {
+ from == Some(connection.jid().as_bare())
+ } else {
+ false
+ }
+ } {
+ match r#type {
+ IqType::Result => {
+ if let Some(query) = query {
+ match query {
+ Query::Pubsub(Pubsub::Items(mut items)) => {
+ if let Some(item) = items.items.pop() {
+ if item.id == Some(id.clone()) {
+ match item.item.ok_or(PEPError::MissingItem)? {
+ pubsub::Content::Nick(nick) => {
+ if nick.0.is_empty() {
+ Ok(pep::Item::Nick(None))
+ } else {
+ Ok(pep::Item::Nick(Some(nick.0)))
+
+ }
+ },
+ pubsub::Content::AvatarData(data) => Ok(pep::Item::AvatarData(Some(avatar::Data { hash: id, data_b64: data.0 }))),
+ pubsub::Content::AvatarMetadata(metadata) => Ok(pep::Item::AvatarMetadata(metadata.info.into_iter().find(|info| info.url.is_none()).map(|info| info.into()))),
+ pubsub::Content::Unknown(_element) => Err(PEPError::UnsupportedItem),
+ }
+ } else {
+ Err(PEPError::IncorrectItemID(id, item.id.unwrap_or_else(|| "missing id".to_string())))
+ }
+ } else {
+ Err(PEPError::MissingItem)
+ }
+ },
+ q => Err(PEPError::MismatchedQuery(q)),
+ }
+ } else {
+ Err(PEPError::MissingQuery)
+ }
+ }
+ IqType::Error => {
+ Err(PEPError::StanzaErrors(errors))
+ }
+ _ => unreachable!(),
+ }
+ } else {
+ // TODO: include expected entity
+ Err(PEPError::IncorrectEntity(
+ from.unwrap_or_else(|| connection.jid().as_bare()),
+ ))
+ }
+ }
+ s => Err(PEPError::UnexpectedStanza(s)),
+ }
+}
+
+pub async fn handle_change_nick<Fs: FileStore + Clone>(logic: &ClientLogic<Fs>, nick: Option<String>) -> Result<(), NickError> {
logic.client().publish(pep::Item::Nick(nick), xep_0172::XMLNS.to_string()).await?;
Ok(())
}
+pub async fn handle_change_avatar<Fs: FileStore + Clone>(logic: &ClientLogic<Fs>, img_data: Option<Vec<u8>>) -> Result<(), AvatarPublishError<Fs>> {
+ match img_data {
+ // set avatar
+ Some(data) => {
+ // load the image data and guess the format
+ let image = ImageReader::new(Cursor::new(data)).with_guessed_format()?.decode()?;
+
+ // convert the image to png;
+ let mut data_png = Vec::new();
+ image.write_to(&mut Cursor::new(&mut data_png), image::ImageFormat::Png)?;
+
+ // calculate the length of the data in bytes.
+ let bytes = data_png.len().try_into()?;
+
+ // calculate sha1 hash of the data
+ let mut sha1 = Sha1::new();
+ sha1.update(&data_png);
+ let sha1_result = sha1.finalize();
+ let hash = BASE64_STANDARD.encode(sha1_result);
+
+ // encode the image data as base64
+ let data_b64 = BASE64_STANDARD.encode(data_png.clone());
+
+ // publish the data to the data node
+ logic.client().publish(pep::Item::AvatarData(Some(avatar::Data { hash: hash.clone(), data_b64 })), "urn:xmpp:avatar:data".to_string()).await?;
+
+ // publish the metadata to the metadata node
+ logic.client().publish(pep::Item::AvatarMetadata(Some(avatar::Metadata { bytes, hash: hash.clone(), r#type: "image/png".to_string() })), "urn:xmpp:avatar:metadata".to_string()).await?;
+
+ // if everything went well, save the data to the disk.
+
+ if !logic.file_store().is_stored(&hash).await.map_err(|err| AvatarPublishError::FileStore(err))? {
+ logic.file_store().store(&hash, &data_png).await.map_err(|err| AvatarPublishError::FileStore(err))?
+ }
+ // when the client receives the updated metadata notification from the pep node, it will already have it saved on the disk so will not require a retrieval.
+ // TODO: should the node be purged?
+
+ Ok(())
+ },
+ // remove avatar
+ None => {
+ logic.client().delete_pep_node("urn:xmpp:avatar:data".to_string()).await?;
+ logic.client().publish(pep::Item::AvatarMetadata(None), "urn:xmpp:avatar:metadata".to_string(), ).await?;
+ Ok(())
+ },
+ }
+}
+
+pub async fn handle_delete_pep_node<Fs: FileStore + Clone>(
+ logic: &ClientLogic<Fs>,
+ connection: Connected,
+ node: String,
+) -> Result<(), PEPError> {
+ let id = Uuid::new_v4().to_string();
+ let request = Iq {
+ from: Some(connection.jid().clone()),
+ id: id.clone(),
+ to: None,
+ r#type: IqType::Set,
+ lang: None,
+ query: Some(Query::PubsubOwner(xep_0060::owner::Pubsub::Delete(owner::Delete{ node, redirect: None }))),
+ errors: Vec::new(),
+ };
+ match logic
+ .pending()
+ .request(&connection, Stanza::Iq(request), id)
+ .await? {
+
+ Stanza::Iq(Iq {
+ from,
+ r#type,
+ query,
+ errors,
+ ..
+ // TODO: maybe abstract a bunch of these different errors related to iqs into an iq error thing? as in like call iq.result(), get the query from inside, error otherwise.
+ }) if r#type == IqType::Result || r#type == IqType::Error => {
+ if from == None ||
+ from == Some(connection.jid().as_bare())
+ {
+ match r#type {
+ IqType::Result => {
+ if let Some(query) = query {
+ match query {
+ Query::PubsubOwner(_) => Ok(()),
+ q => Err(PEPError::MismatchedQuery(q)),
+ }
+ } else {
+ // Err(PEPError::MissingQuery)
+ Ok(())
+ }
+ }
+ IqType::Error => {
+ Err(PEPError::StanzaErrors(errors))
+ }
+ _ => unreachable!(),
+ }
+ } else {
+ Err(PEPError::IncorrectEntity(
+ from.unwrap_or_else(|| connection.jid().as_bare()),
+ ))
+ }
+ }
+ s => Err(PEPError::UnexpectedStanza(s)),
+ }
+}
+
// TODO: could probably macro-ise?
-pub async fn handle_online_result(
- logic: &ClientLogic,
- command: Command,
+pub async fn handle_online_result<Fs: FileStore + Clone>(
+ logic: &ClientLogic<Fs>,
+ command: Command<Fs>,
connection: Connected,
-) -> Result<(), Error> {
+) -> Result<(), Error<Fs>> {
match command {
Command::GetRoster(result_sender) => {
let roster = handle_get_roster(logic, connection).await;
@@ -854,14 +1096,26 @@ pub async fn handle_online_result(
let result = handle_disco_items(logic, connection, jid, node).await;
let _ = sender.send(result);
}
- Command::Publish { item, node, sender } => {
- let result = handle_publish(logic, connection, item, node).await;
+ Command::PublishPEPItem { item, node, sender } => {
+ let result = handle_publish_pep_item(logic, connection, item, node).await;
let _ = sender.send(result);
}
Command::ChangeNick(nick, sender) => {
let result = handle_change_nick(logic, nick).await;
let _ = sender.send(result);
}
+ Command::ChangeAvatar(img_data, sender) => {
+ let result = handle_change_avatar(logic, img_data).await;
+ let _ = sender.send(result);
+ },
+ Command::DeletePEPNode { node, sender } => {
+ let result = handle_delete_pep_node(logic, connection, node).await;
+ let _ = sender.send(result);
+ },
+ Command::GetPEPItem { node, sender, jid, id } => {
+ let result = handle_get_pep_item(logic, connection, jid, node, id).await;
+ let _ = sender.send(result);
+ },
}
Ok(())
}
diff --git a/filamento/src/logic/process_stanza.rs b/filamento/src/logic/process_stanza.rs
index 11d7588..c383d70 100644
--- a/filamento/src/logic/process_stanza.rs
+++ b/filamento/src/logic/process_stanza.rs
@@ -1,7 +1,9 @@
use std::str::FromStr;
+use base64::{Engine, prelude::BASE64_STANDARD};
use chrono::Utc;
use lampada::{Connected, SupervisorSender};
+use sha1::{Digest, Sha1};
use stanza::{
client::{
Stanza,
@@ -17,14 +19,22 @@ use uuid::Uuid;
use crate::{
UpdateMessage, caps,
chat::{Body, Message},
- error::{DatabaseError, Error, IqError, MessageRecvError, PresenceError, RosterError},
+ error::{
+ AvatarUpdateError, DatabaseError, Error, IqError, MessageRecvError, PresenceError,
+ RosterError,
+ },
+ files::FileStore,
presence::{Offline, Online, Presence, PresenceType, Show},
roster::Contact,
};
use super::ClientLogic;
-pub async fn handle_stanza(logic: ClientLogic, stanza: Stanza, connection: Connected) {
+pub async fn handle_stanza<Fs: FileStore + Clone>(
+ logic: ClientLogic<Fs>,
+ stanza: Stanza,
+ connection: Connected,
+) {
let result = process_stanza(logic.clone(), stanza, connection).await;
match result {
Ok(u) => match u {
@@ -38,10 +48,10 @@ pub async fn handle_stanza(logic: ClientLogic, stanza: Stanza, connection: Conne
}
}
-pub async fn recv_message(
- logic: ClientLogic,
+pub async fn recv_message<Fs: FileStore + Clone>(
+ logic: ClientLogic<Fs>,
stanza_message: stanza::client::message::Message,
-) -> Result<Option<UpdateMessage>, MessageRecvError> {
+) -> Result<Option<UpdateMessage>, MessageRecvError<Fs>> {
if let Some(from) = stanza_message.from {
// TODO: don't ignore delay from. xep says SHOULD send error if incorrect.
let timestamp = stanza_message
@@ -50,6 +60,7 @@ pub async fn recv_message(
.unwrap_or_else(|| Utc::now());
// TODO: group chat messages
+ // body MUST be before user changes in order to avoid race condition where you e.g. get a nick update before the user is in the client state.
// if there is a body, should create chat message
if let Some(body) = stanza_message.body {
let message = Message {
@@ -67,16 +78,28 @@ pub async fn recv_message(
};
// save the message to the database
- logic.db().upsert_chat_and_user(&from).await?;
- if let Err(e) = logic
- .db()
- .create_message_with_user_resource(message.clone(), from.clone(), from.clone())
- .await
- {
- logic
- .handle_error(Error::MessageRecv(MessageRecvError::MessageHistory(e)))
- .await;
- }
+ match logic.db().upsert_chat_and_user(&from).await {
+ Ok(_) => {
+ if let Err(e) = logic
+ .db()
+ .create_message_with_user_resource(
+ message.clone(),
+ from.clone(),
+ from.clone(),
+ )
+ .await
+ {
+ logic
+ .handle_error(Error::MessageRecv(MessageRecvError::MessageHistory(e)))
+ .await;
+ }
+ }
+ Err(e) => {
+ logic
+ .handle_error(Error::MessageRecv(MessageRecvError::MessageHistory(e)))
+ .await;
+ }
+ };
// update the client with the new message
logic
@@ -89,70 +112,333 @@ pub async fn recv_message(
}
if let Some(nick) = stanza_message.nick {
- if let Err(e) = logic
- .db()
- .upsert_user_nick(from.as_bare(), nick.0.clone())
- .await
- {
- logic
- .handle_error(Error::MessageRecv(MessageRecvError::NickUpdate(e)))
- .await;
+ let nick = nick.0;
+ if nick.is_empty() {
+ match logic.db().delete_user_nick(from.as_bare()).await {
+ Ok(changed) => {
+ if changed {
+ logic
+ .update_sender()
+ .send(UpdateMessage::NickChanged {
+ jid: from.as_bare(),
+ nick: None,
+ })
+ .await;
+ }
+ }
+ Err(e) => {
+ logic
+ .handle_error(Error::MessageRecv(MessageRecvError::NickUpdate(e)))
+ .await;
+ // if failed, send user update anyway
+ logic
+ .update_sender()
+ .send(UpdateMessage::NickChanged {
+ jid: from.as_bare(),
+ nick: None,
+ })
+ .await;
+ }
+ }
+ } else {
+ match logic
+ .db()
+ .upsert_user_nick(from.as_bare(), nick.clone())
+ .await
+ {
+ Ok(changed) => {
+ if changed {
+ logic
+ .update_sender()
+ .send(UpdateMessage::NickChanged {
+ jid: from.as_bare(),
+ nick: Some(nick),
+ })
+ .await;
+ }
+ }
+ Err(e) => {
+ logic
+ .handle_error(Error::MessageRecv(MessageRecvError::NickUpdate(e)))
+ .await;
+ // if failed, send user update anyway
+ logic
+ .update_sender()
+ .send(UpdateMessage::NickChanged {
+ jid: from.as_bare(),
+ nick: Some(nick),
+ })
+ .await;
+ }
+ }
}
-
- logic
- .update_sender()
- .send(UpdateMessage::NickChanged {
- jid: from.as_bare(),
- nick: nick.0,
- })
- .await;
}
if let Some(event) = stanza_message.event {
match event {
- Event::Items(items) => match items.node.as_str() {
- "http://jabber.org/protocol/nick" => match items.items {
- ItemsType::Item(items) => {
- if let Some(item) = items.first() {
- match &item.item {
- Some(c) => match c {
- Content::Nick(nick) => {
- if let Err(e) = logic
- .db()
- .upsert_user_nick(from.as_bare(), nick.0.clone())
- .await
- {
- logic
- .handle_error(Error::MessageRecv(
- MessageRecvError::NickUpdate(e),
- ))
- .await;
+ Event::Items(items) => {
+ match items.node.as_str() {
+ "http://jabber.org/protocol/nick" => match items.items {
+ ItemsType::Item(items) => {
+ if let Some(item) = items.first() {
+ match &item.item {
+ Some(c) => match c {
+ Content::Nick(nick) => {
+ let nick = nick.0.clone();
+ if nick.is_empty() {
+ match logic
+ .db()
+ .delete_user_nick(from.as_bare())
+ .await
+ {
+ Ok(changed) => {
+ if changed {
+ logic
+ .update_sender()
+ .send(UpdateMessage::NickChanged {
+ jid: from.as_bare(),
+ nick: None,
+ })
+ .await;
+ }
+ }
+ Err(e) => {
+ logic
+ .handle_error(Error::MessageRecv(
+ MessageRecvError::NickUpdate(e),
+ ))
+ .await;
+ // if failed, send user update anyway
+ logic
+ .update_sender()
+ .send(UpdateMessage::NickChanged {
+ jid: from.as_bare(),
+ nick: None,
+ })
+ .await;
+ }
+ }
+ } else {
+ match logic
+ .db()
+ .upsert_user_nick(
+ from.as_bare(),
+ nick.clone(),
+ )
+ .await
+ {
+ Ok(changed) => {
+ if changed {
+ logic
+ .update_sender()
+ .send(UpdateMessage::NickChanged {
+ jid: from.as_bare(),
+ nick: Some(nick),
+ })
+ .await;
+ }
+ }
+ Err(e) => {
+ logic
+ .handle_error(Error::MessageRecv(
+ MessageRecvError::NickUpdate(e),
+ ))
+ .await;
+ // if failed, send user update anyway
+ logic
+ .update_sender()
+ .send(UpdateMessage::NickChanged {
+ jid: from.as_bare(),
+ nick: Some(nick),
+ })
+ .await;
+ }
+ }
+ }
}
+ _ => {}
+ },
+ None => {}
+ }
+ }
+ }
+ _ => {}
+ },
+ "urn:xmpp:avatar:metadata" => {
+ match items.items {
+ ItemsType::Item(items) => {
+ if let Some(item) = items.first() {
+ match &item.item {
+ Some(Content::AvatarMetadata(metadata)) => {
+ // check if user avatar has been deleted
+ if let Some(metadata) = metadata
+ .info
+ .iter()
+ .find(|info| info.url.is_none())
+ {
+ // check if user avatar has changed
+ match logic
+ .db()
+ .upsert_user_avatar(
+ from.as_bare(),
+ metadata.id.clone(),
+ )
+ .await
+ {
+ Ok((changed, old_avatar)) => {
+ if changed {
+ if let Some(old_avatar) = old_avatar
+ {
+ if let Err(e) = logic
+ .file_store()
+ .delete(&old_avatar)
+ .await.map_err(|err| AvatarUpdateError::FileStore(err)) {
+ logic.handle_error(MessageRecvError::AvatarUpdate(e).into()).await;
+ }
+ }
- logic
- .update_sender()
- .send(UpdateMessage::NickChanged {
- jid: from.as_bare(),
- nick: nick.0.clone(),
- })
- .await;
+ match logic
+ .file_store()
+ .is_stored(&metadata.id)
+ .await
+ .map_err(|err| {
+ AvatarUpdateError::<Fs>::FileStore(
+ err,
+ )
+ }) {
+ Ok(false) => {
+ // get data
+ let pep_item = logic.client().get_pep_item(Some(from.as_bare()), "urn:xmpp:avatar:data".to_string(), metadata.id.clone()).await.map_err(|err| Into::<AvatarUpdateError<Fs>>::into(err))?;
+ match pep_item {
+ crate::pep::Item::AvatarData(data) => {
+ let data = data.map(|data| data.data_b64).unwrap_or_default();
+ // TODO: these should all be in a separate avatarupdate function
+ match BASE64_STANDARD.decode(data) {
+ Ok(data) => {
+ let mut hasher = Sha1::new();
+ hasher.update(&data);
+ let received_data_hash = BASE64_STANDARD.encode(hasher.finalize());
+ if received_data_hash == metadata.id {
+ if let Err(e) = logic.file_store().store(&received_data_hash, &data).await {
+ logic.handle_error(Error::MessageRecv(MessageRecvError::AvatarUpdate(AvatarUpdateError::FileStore(e)))).await;
+ }
+ logic
+ .update_sender()
+ .send(
+ UpdateMessage::AvatarChanged {
+ jid: from.as_bare(),
+ id: Some(
+ metadata.id.clone(),
+ ),
+ },
+ )
+ .await;
+ }
+ },
+ Err(e) => {
+ logic.handle_error(Error::MessageRecv(MessageRecvError::AvatarUpdate(AvatarUpdateError::Base64(e)))).await;
+ },
+ }
+ },
+ _ => {
+ logic.handle_error(Error::MessageRecv(MessageRecvError::AvatarUpdate(AvatarUpdateError::MissingData))).await;
+ }
+ }
+ }
+ Ok(true) => {
+ // just send the update
+ logic
+ .update_sender()
+ .send(
+ UpdateMessage::AvatarChanged {
+ jid: from.as_bare(),
+ id: Some(
+ metadata.id.clone(),
+ ),
+ },
+ )
+ .await;
+ }
+ Err(e) => {
+ logic.handle_error(Error::MessageRecv(MessageRecvError::AvatarUpdate(e))).await;
+ }
+ }
+ }
+ }
+ Err(e) => {
+ logic
+ .handle_error(Error::MessageRecv(
+ MessageRecvError::AvatarUpdate(
+ AvatarUpdateError::Database(
+ e,
+ ),
+ ),
+ ))
+ .await;
+ }
+ }
+ } else {
+ // delete avatar
+ match logic
+ .db()
+ .delete_user_avatar(from.as_bare())
+ .await
+ {
+ Ok((changed, old_avatar)) => {
+ if changed {
+ if let Some(old_avatar) = old_avatar
+ {
+ if let Err(e) = logic
+ .file_store()
+ .delete(&old_avatar)
+ .await.map_err(|err| AvatarUpdateError::FileStore(err)) {
+ logic.handle_error(MessageRecvError::AvatarUpdate(e).into()).await;
+ }
+ }
+ logic
+ .update_sender()
+ .send(
+ UpdateMessage::AvatarChanged {
+ jid: from.as_bare(),
+ id: None,
+ },
+ )
+ .await;
+ }
+ }
+ Err(e) => {
+ logic
+ .handle_error(Error::MessageRecv(
+ MessageRecvError::AvatarUpdate(
+ AvatarUpdateError::Database(
+ e,
+ ),
+ ),
+ ))
+ .await;
+ }
+ }
+ }
+ // check if the new file is in the file store
+ // if not, retrieve from server and save in the file store (remember to check if the hash matches)
+ // send the avatar update
+ }
+ _ => {}
}
- Content::Unknown(element) => {}
- },
- None => {}
+ }
}
+ _ => {}
}
}
- ItemsType::Retract(retracts) => {}
- },
- _ => {}
- },
+ _ => {}
+ }
+ }
// Event::Collection(collection) => todo!(),
// Event::Configuration(configuration) => todo!(),
// Event::Delete(delete) => todo!(),
// Event::Purge(purge) => todo!(),
// Event::Subscription(subscription) => todo!(),
- _ => {}
+ _ => {} // TODO: catch these catch-alls in some way
}
}
@@ -240,8 +526,8 @@ pub async fn recv_presence(
}
}
-pub async fn recv_iq(
- logic: ClientLogic,
+pub async fn recv_iq<Fs: FileStore + Clone>(
+ logic: ClientLogic<Fs>,
connection: Connected,
iq: Iq,
) -> Result<Option<UpdateMessage>, IqError> {
@@ -469,11 +755,11 @@ pub async fn recv_iq(
}
}
-pub async fn process_stanza(
- logic: ClientLogic,
+pub async fn process_stanza<Fs: FileStore + Clone>(
+ logic: ClientLogic<Fs>,
stanza: Stanza,
connection: Connected,
-) -> Result<Option<UpdateMessage>, Error> {
+) -> Result<Option<UpdateMessage>, Error<Fs>> {
let update = match stanza {
Stanza::Message(stanza_message) => Ok(recv_message(logic, stanza_message).await?),
Stanza::Presence(presence) => Ok(recv_presence(presence).await?),