diff options
Diffstat (limited to '')
-rw-r--r-- | README.md | 3 | ||||
-rw-r--r-- | filamento/.gitignore | 1 | ||||
-rw-r--r-- | filamento/Cargo.toml | 3 | ||||
-rw-r--r-- | filamento/examples/example.rs | 54 | ||||
-rw-r--r-- | filamento/migrations/20240113011930_luz.sql | 1 | ||||
-rw-r--r-- | filamento/src/avatar.rs | 34 | ||||
-rw-r--r-- | filamento/src/caps.rs | 1 | ||||
-rw-r--r-- | filamento/src/db.rs | 105 | ||||
-rw-r--r-- | filamento/src/error.rs | 71 | ||||
-rw-r--r-- | filamento/src/files.rs | 19 | ||||
-rw-r--r-- | filamento/src/lib.rs | 154 | ||||
-rw-r--r-- | filamento/src/logic/abort.rs | 4 | ||||
-rw-r--r-- | filamento/src/logic/connect.rs | 6 | ||||
-rw-r--r-- | filamento/src/logic/connection_error.rs | 7 | ||||
-rw-r--r-- | filamento/src/logic/disconnect.rs | 7 | ||||
-rw-r--r-- | filamento/src/logic/local_only.rs | 37 | ||||
-rw-r--r-- | filamento/src/logic/mod.rs | 27 | ||||
-rw-r--r-- | filamento/src/logic/offline.rs | 45 | ||||
-rw-r--r-- | filamento/src/logic/online.rs | 362 | ||||
-rw-r--r-- | filamento/src/logic/process_stanza.rs | 422 | ||||
-rw-r--r-- | filamento/src/pep.rs | 17 | ||||
-rw-r--r-- | stanza/src/client/iq.rs | 13 | ||||
-rw-r--r-- | stanza/src/xep_0060/owner.rs | 4 |
23 files changed, 1169 insertions, 228 deletions
@@ -151,7 +151,8 @@ #### to write: - [ ] advanced message body (for custom emoji, styling, etc.). can opt in to mixed content in message body. base as bold, italic, underline, strikethrough, then extensible with new tags for e.g. emoji, color, size, font, chat effects. -- [ ] omemo cross-signing verification or something along those lines, with a main device and linked devices +- [ ] omemo cross-signing verification or something along those lines, with a main device and linked devices. safety numbers. +- [ ] better stable ids....? - [ ] better stickers/emoji xep - [ ] mix voice channels (w/ sfu) - [ ] mix guilds diff --git a/filamento/.gitignore b/filamento/.gitignore index ec8a40b..1ba9f2a 100644 --- a/filamento/.gitignore +++ b/filamento/.gitignore @@ -1,2 +1,3 @@ filamento.db +files/ .sqlx/ diff --git a/filamento/Cargo.toml b/filamento/Cargo.toml index 1c28c39..92c8370 100644 --- a/filamento/Cargo.toml +++ b/filamento/Cargo.toml @@ -8,7 +8,7 @@ futures = "0.3.31" lampada = { version = "0.1.0", path = "../lampada" } tokio = "1.42.0" thiserror = "2.0.11" -stanza = { version = "0.1.0", path = "../stanza", features = ["rfc_6121", "xep_0203", "xep_0030", "xep_0060", "xep_0172", "xep_0390", "xep_0128", "xep_0115"] } +stanza = { version = "0.1.0", path = "../stanza", features = ["rfc_6121", "xep_0203", "xep_0030", "xep_0060", "xep_0172", "xep_0390", "xep_0128", "xep_0115", "xep_0084"] } sqlx = { version = "0.8.3", features = ["sqlite", "runtime-tokio", "uuid", "chrono"] } # TODO: re-export jid? jid = { version = "0.1.0", path = "../jid", features = ["sqlx"] } @@ -19,6 +19,7 @@ sha2 = "0.10.8" sha3 = "0.10.8" base64 = "0.22.1" sha1 = "0.10.6" +image = "0.25.6" [dev-dependencies] tracing-subscriber = "0.3.19" diff --git a/filamento/examples/example.rs b/filamento/examples/example.rs index 74a9aa1..12ede0d 100644 --- a/filamento/examples/example.rs +++ b/filamento/examples/example.rs @@ -1,17 +1,47 @@ -use std::{path::Path, str::FromStr, time::Duration}; +use std::{path::Path, str::FromStr, sync::Arc, time::Duration}; -use filamento::{Client, db::Db}; +use filamento::{Client, db::Db, files::FileStore}; use jid::JID; +use tokio::io; use tracing::info; +#[derive(Clone)] +pub struct Files; + +impl FileStore for Files { + type Err = Arc<io::Error>; + + async fn is_stored(&self, name: &str) -> Result<bool, Self::Err> { + Ok(tokio::fs::try_exists(format!("files/{}", name)) + .await + .map_err(|err| Arc::new(err))?) + } + + async fn store(&self, name: &str, data: &[u8]) -> Result<(), Self::Err> { + Ok(tokio::fs::write(format!("files/{}", name), data) + .await + .map_err(|err| Arc::new(err))?) + } + + async fn delete(&self, name: &str) -> Result<(), Self::Err> { + Ok(tokio::fs::remove_file(format!("files/{}", name)) + .await + .map_err(|err| Arc::new(err))?) + } +} + #[tokio::main] async fn main() { tracing_subscriber::fmt::init(); let db = Db::create_connect_and_migrate(Path::new("./filamento.db")) .await .unwrap(); - let (client, mut recv) = - Client::new("test@blos.sm".try_into().unwrap(), "slayed".to_string(), db); + let (client, mut recv) = Client::new( + "test@blos.sm".try_into().unwrap(), + "slayed".to_string(), + db, + Files, + ); tokio::spawn(async move { while let Some(msg) = recv.recv().await { @@ -22,7 +52,10 @@ async fn main() { client.connect().await.unwrap(); tokio::time::sleep(Duration::from_secs(5)).await; info!("changing nick"); - client.change_nick("britney".to_string()).await.unwrap(); + // client + // .change_nick(Some("britney".to_string())) + // .await + // .unwrap(); info!("sending message"); client .send_message( @@ -34,7 +67,16 @@ async fn main() { .await .unwrap(); info!("sent message"); - tokio::time::sleep(Duration::from_secs(5)).await; + client + .send_message( + JID::from_str("cel@blos.sm").unwrap(), + filamento::chat::Body { + body: "hallo 2".to_string(), + }, + ) + .await + .unwrap(); + tokio::time::sleep(Duration::from_secs(15)).await; // info!("sending disco query"); // let info = client.disco_info(None, None).await.unwrap(); // info!("got disco result: {:#?}", info); diff --git a/filamento/migrations/20240113011930_luz.sql b/filamento/migrations/20240113011930_luz.sql index 8c1b01c..c2f35dd 100644 --- a/filamento/migrations/20240113011930_luz.sql +++ b/filamento/migrations/20240113011930_luz.sql @@ -6,6 +6,7 @@ create table users( -- TODO: enforce bare jid jid text primary key not null, nick text, + avatar text, -- can receive presence status from non-contacts cached_status_message text -- TODO: last_seen diff --git a/filamento/src/avatar.rs b/filamento/src/avatar.rs new file mode 100644 index 0000000..a6937df --- /dev/null +++ b/filamento/src/avatar.rs @@ -0,0 +1,34 @@ +#[derive(Clone, Debug)] +pub struct Metadata { + pub bytes: u32, + pub hash: String, + pub r#type: String, +} + +#[derive(Clone, Debug)] +pub struct Data { + pub hash: String, + pub data_b64: String, +} + +#[derive(Clone, Debug)] +pub struct Avatar(Vec<u8>); + +impl From<stanza::xep_0084::Info> for Metadata { + fn from(value: stanza::xep_0084::Info) -> Self { + Self { + bytes: value.bytes, + hash: value.id, + r#type: value.r#type, + } + } +} + +impl From<stanza::xep_0084::Data> for Data { + fn from(value: stanza::xep_0084::Data) -> Self { + Self { + hash: todo!(), + data_b64: todo!(), + } + } +} diff --git a/filamento/src/caps.rs b/filamento/src/caps.rs index 49d05ba..b3464d1 100644 --- a/filamento/src/caps.rs +++ b/filamento/src/caps.rs @@ -40,6 +40,7 @@ pub fn client_info() -> Info { "http://jabber.org/protocol/caps".to_string(), "http://jabber.org/protocol/nick".to_string(), "http://jabber.org/protocol/nick+notify".to_string(), + "urn:xmpp:avatar:metadata+notify".to_string(), ], identities: vec![Identity { name: Some("filamento 0.1.0".to_string()), diff --git a/filamento/src/db.rs b/filamento/src/db.rs index c19f16c..cdc7520 100644 --- a/filamento/src/db.rs +++ b/filamento/src/db.rs @@ -75,16 +75,107 @@ impl Db { Ok(user) } - pub(crate) async fn upsert_user_nick(&self, jid: JID, nick: String) -> Result<(), Error> { - sqlx::query!( + /// returns whether or not the nickname was updated + pub(crate) async fn delete_user_nick(&self, jid: JID) -> Result<bool, Error> { + if sqlx::query!( + "insert into users (jid, nick) values (?, ?) on conflict do update set nick = ?", + jid, + None::<String>, + None::<String>, + ) + .execute(&self.db) + .await? + .rows_affected() + > 0 + { + Ok(true) + } else { + Ok(false) + } + } + + /// returns whether or not the nickname was updated + pub(crate) async fn upsert_user_nick(&self, jid: JID, nick: String) -> Result<bool, Error> { + if sqlx::query!( "insert into users (jid, nick) values (?, ?) on conflict do update set nick = ?", jid, nick, nick ) .execute(&self.db) - .await?; - Ok(()) + .await? + .rows_affected() + > 0 + { + Ok(true) + } else { + Ok(false) + } + } + + /// returns whether or not the avatar was updated, and the file to delete if there existed an old avatar + pub(crate) async fn delete_user_avatar( + &self, + jid: JID, + ) -> Result<(bool, Option<String>), Error> { + #[derive(sqlx::FromRow)] + struct AvatarRow { + avatar: Option<String>, + } + let old_avatar: Option<String> = sqlx::query_as("select avatar from users where jid = ?") + .bind(jid.clone()) + .fetch_optional(&self.db) + .await? + .map(|row: AvatarRow| row.avatar) + .unwrap_or(None); + if sqlx::query!( + "insert into users (jid, avatar) values (?, ?) on conflict do update set avatar = ?", + jid, + None::<String>, + None::<String>, + ) + .execute(&self.db) + .await? + .rows_affected() + > 0 + { + Ok((true, old_avatar)) + } else { + Ok((false, old_avatar)) + } + } + + /// returns whether or not the avatar was updated, and the file to delete if there existed an old avatar + pub(crate) async fn upsert_user_avatar( + &self, + jid: JID, + avatar: String, + ) -> Result<(bool, Option<String>), Error> { + #[derive(sqlx::FromRow)] + struct AvatarRow { + avatar: Option<String>, + } + let old_avatar: Option<String> = sqlx::query_as("select avatar from users where jid = ?") + .bind(jid.clone()) + .fetch_optional(&self.db) + .await? + .map(|row: AvatarRow| row.avatar) + .unwrap_or(None); + if sqlx::query!( + "insert into users (jid, avatar) values (?, ?) on conflict do update set avatar = ?", + jid, + avatar, + avatar + ) + .execute(&self.db) + .await? + .rows_affected() + > 0 + { + Ok((true, old_avatar)) + } else { + Ok((false, old_avatar)) + } } pub(crate) async fn update_user(&self, user: User) -> Result<(), Error> { @@ -441,12 +532,14 @@ impl Db { .execute(&self.db) .await?; let id = Uuid::new_v4(); - let chat: Chat = sqlx::query_as("insert into chats (id, correspondent, have_chatted) values (?, ?, ?) on conflict do nothing returning *") + let chat: Chat = sqlx::query_as("insert into chats (id, correspondent, have_chatted) values (?, ?, ?) on conflict do nothing; select * from chats where correspondent = ?") .bind(id) - .bind(bare_chat) + .bind(bare_chat.clone()) .bind(false) + .bind(bare_chat) .fetch_one(&self.db) .await?; + tracing::debug!("CHECKING chat: {:?}", chat); Ok(chat.have_chatted) } diff --git a/filamento/src/error.rs b/filamento/src/error.rs index 5111413..23272b1 100644 --- a/filamento/src/error.rs +++ b/filamento/src/error.rs @@ -1,5 +1,7 @@ -use std::{string::FromUtf8Error, sync::Arc}; +use std::{num::TryFromIntError, string::FromUtf8Error, sync::Arc}; +use base64::DecodeError; +use image::ImageError; use jid::JID; use lampada::error::{ConnectionError, ReadError, WriteError}; use stanza::client::{Stanza, iq::Query}; @@ -7,9 +9,11 @@ use thiserror::Error; pub use lampada::error::CommandError; +use crate::files::FileStore; + // for the client logic impl #[derive(Debug, Error, Clone)] -pub enum Error { +pub enum Error<Fs: FileStore> { #[error("core error: {0}")] Connection(#[from] ConnectionError), #[error("received unrecognized/unsupported content")] @@ -33,11 +37,11 @@ pub enum Error { #[error("message send error: {0}")] MessageSend(#[from] MessageSendError), #[error("message receive error: {0}")] - MessageRecv(#[from] MessageRecvError), + MessageRecv(#[from] MessageRecvError<Fs>), #[error("subscripbe error: {0}")] Subscribe(#[from] SubscribeError), #[error("publish error: {0}")] - Publish(#[from] PublishError), + Publish(#[from] PEPError), } #[derive(Debug, Error, Clone)] @@ -53,13 +57,29 @@ pub enum MessageSendError { } #[derive(Debug, Error, Clone)] -pub enum MessageRecvError { +pub enum MessageRecvError<Fs: FileStore> { #[error("could not add to message history: {0}")] MessageHistory(#[from] DatabaseError), #[error("missing from")] MissingFrom, #[error("could not update user nick: {0}")] NickUpdate(DatabaseError), + #[error("could not update user avatar: {0}")] + AvatarUpdate(#[from] AvatarUpdateError<Fs>), +} + +#[derive(Debug, Error, Clone)] +pub enum AvatarUpdateError<Fs: FileStore> { + #[error("could not save to disk: {0}")] + FileStore(Fs::Err), + #[error("could not fetch avatar data: {0}")] + PEPError(#[from] CommandError<PEPError>), + #[error("base64 decode: {0}")] + Base64(#[from] DecodeError), + #[error("pep node missing avatar data")] + MissingData, + #[error("database: {0}")] + Database(#[from] DatabaseError), } #[derive(Debug, Error, Clone)] @@ -203,7 +223,7 @@ pub enum PresenceError { } #[derive(Debug, Error, Clone)] -pub enum PublishError { +pub enum PEPError { #[error("received mismatched query")] MismatchedQuery(Query), #[error("missing query")] @@ -216,12 +236,19 @@ pub enum PublishError { UnexpectedStanza(Stanza), #[error("iq response: {0}")] IqResponse(#[from] IqRequestError), + #[error("missing pep item")] + MissingItem, + #[error("incorrect item id: expected {0}, got {1}")] + IncorrectItemID(String, String), + #[error("unsupported pep item")] + UnsupportedItem, + // TODO: should the item be in the error? } #[derive(Debug, Error, Clone)] pub enum NickError { #[error("publishing nick: {0}")] - Publish(#[from] CommandError<PublishError>), + Publish(#[from] CommandError<PEPError>), #[error("updating database: {0}")] Database(#[from] DatabaseError), #[error("disconnected")] @@ -267,5 +294,31 @@ pub enum CapsNodeConversionError { #[error("missing hashtag")] MissingHashtag, } -// #[derive(Debug, Error, Clone)] -// pub enum CapsError {} + +#[derive(Debug, Error, Clone)] +pub enum AvatarPublishError<Fs: FileStore> { + #[error("disconnected")] + Disconnected, + #[error("image read: {0}")] + Read(Arc<std::io::Error>), + #[error("image: {0}")] + Image(Arc<ImageError>), + #[error("pep publish: {0}")] + Publish(#[from] CommandError<PEPError>), + #[error("bytes number conversion: {0}")] + FromInt(#[from] TryFromIntError), + #[error("could not save to disk")] + FileStore(Fs::Err), +} + +impl<Fs: FileStore> From<std::io::Error> for AvatarPublishError<Fs> { + fn from(value: std::io::Error) -> Self { + Self::Read(Arc::new(value)) + } +} + +impl<Fs: FileStore> From<ImageError> for AvatarPublishError<Fs> { + fn from(value: ImageError) -> Self { + Self::Image(Arc::new(value)) + } +} diff --git a/filamento/src/files.rs b/filamento/src/files.rs new file mode 100644 index 0000000..0dfe347 --- /dev/null +++ b/filamento/src/files.rs @@ -0,0 +1,19 @@ +use std::error::Error; + +pub trait FileStore { + type Err: Clone + Send + Error; + + fn is_stored( + &self, + name: &str, + ) -> impl std::future::Future<Output = Result<bool, Self::Err>> + std::marker::Send; + fn store( + &self, + name: &str, + data: &[u8], + ) -> impl std::future::Future<Output = Result<(), Self::Err>> + std::marker::Send; + fn delete( + &self, + name: &str, + ) -> impl std::future::Future<Output = Result<(), Self::Err>> + std::marker::Send; +} diff --git a/filamento/src/lib.rs b/filamento/src/lib.rs index c44edca..42646be 100644 --- a/filamento/src/lib.rs +++ b/filamento/src/lib.rs @@ -11,9 +11,10 @@ use chrono::Utc; use db::Db; use disco::{Info, Items}; use error::{ - ConnectionJobError, DatabaseError, DiscoError, Error, IqError, MessageRecvError, NickError, - PresenceError, PublishError, RosterError, StatusError, SubscribeError, + AvatarPublishError, ConnectionJobError, DatabaseError, DiscoError, Error, IqError, + MessageRecvError, NickError, PEPError, PresenceError, RosterError, StatusError, SubscribeError, }; +use files::FileStore; use futures::FutureExt; use jid::JID; use lampada::{ @@ -35,18 +36,20 @@ use tracing::{debug, info}; use user::User; use uuid::Uuid; +pub mod avatar; pub mod caps; pub mod chat; pub mod db; pub mod disco; pub mod error; +pub mod files; mod logic; pub mod pep; pub mod presence; pub mod roster; pub mod user; -pub enum Command { +pub enum Command<Fs: FileStore> { /// get the roster. if offline, retreive cached version from database. should be stored in application memory GetRoster(oneshot::Sender<Result<Vec<Contact>, RosterError>>), /// get all chats. chat will include 10 messages in their message Vec (enough for chat previews) @@ -115,16 +118,34 @@ pub enum Command { Option<String>, oneshot::Sender<Result<disco::Items, DiscoError>>, ), - /// publish item to a pep node, specified or default according to item. - Publish { + /// publish item to a pep node specified. + PublishPEPItem { item: pep::Item, node: String, - sender: oneshot::Sender<Result<(), PublishError>>, + sender: oneshot::Sender<Result<(), PEPError>>, }, - /// change user nickname - ChangeNick(String, oneshot::Sender<Result<(), NickError>>), + DeletePEPNode { + node: String, + sender: oneshot::Sender<Result<(), PEPError>>, + }, + GetPEPItem { + jid: Option<JID>, + node: String, + id: String, + sender: oneshot::Sender<Result<pep::Item, PEPError>>, + }, + /// change client user nickname + ChangeNick(Option<String>, oneshot::Sender<Result<(), NickError>>), + // // TODO + // GetNick(...), + // GetAvatar(...) // /// get capability node // GetCaps(String, oneshot::Sender<Result<Info, CapsError>>), + /// change client user avatar + ChangeAvatar( + Option<Vec<u8>>, + oneshot::Sender<Result<(), AvatarPublishError<Fs>>>, + ), } #[derive(Debug, Clone)] @@ -155,18 +176,22 @@ pub enum UpdateMessage { SubscriptionRequest(jid::JID), NickChanged { jid: JID, - nick: String, + nick: Option<String>, + }, + AvatarChanged { + jid: JID, + id: Option<String>, }, } /// an xmpp client that is suited for a chat client use case #[derive(Debug)] -pub struct Client { - sender: mpsc::Sender<CoreClientCommand<Command>>, +pub struct Client<Fs: FileStore> { + sender: mpsc::Sender<CoreClientCommand<Command<Fs>>>, timeout: Duration, } -impl Clone for Client { +impl<Fs: FileStore> Clone for Client<Fs> { fn clone(&self) -> Self { Self { sender: self.sender.clone(), @@ -175,32 +200,27 @@ impl Clone for Client { } } -impl Deref for Client { - type Target = mpsc::Sender<CoreClientCommand<Command>>; +impl<Fs: FileStore> Deref for Client<Fs> { + type Target = mpsc::Sender<CoreClientCommand<Command<Fs>>>; fn deref(&self) -> &Self::Target { &self.sender } } -impl DerefMut for Client { +impl<Fs: FileStore> DerefMut for Client<Fs> { fn deref_mut(&mut self) -> &mut Self::Target { &mut self.sender } } -impl Client { - pub async fn connect(&self) -> Result<(), ActorError> { - self.send(CoreClientCommand::Connect).await?; - Ok(()) - } - - pub async fn disconnect(&self, offline: Offline) -> Result<(), ActorError> { - self.send(CoreClientCommand::Disconnect).await?; - Ok(()) - } - - pub fn new(jid: JID, password: String, db: Db) -> (Self, mpsc::Receiver<UpdateMessage>) { +impl<Fs: FileStore + Clone + Send + Sync + 'static> Client<Fs> { + pub fn new( + jid: JID, + password: String, + db: Db, + file_store: Fs, + ) -> (Self, mpsc::Receiver<UpdateMessage>) { let (command_sender, command_receiver) = mpsc::channel(20); let (update_send, update_recv) = mpsc::channel(20); @@ -214,14 +234,26 @@ impl Client { timeout: Duration::from_secs(10), }; - let logic = ClientLogic::new(client.clone(), jid.as_bare(), db, update_send); + let logic = ClientLogic::new(client.clone(), jid.as_bare(), db, update_send, file_store); - let actor: CoreClient<ClientLogic> = + let actor: CoreClient<ClientLogic<Fs>> = CoreClient::new(jid, password, command_receiver, None, sup_recv, logic); tokio::spawn(async move { actor.run().await }); (client, update_recv) } +} + +impl<Fs: FileStore> Client<Fs> { + pub async fn connect(&self) -> Result<(), ActorError> { + self.send(CoreClientCommand::Connect).await?; + Ok(()) + } + + pub async fn disconnect(&self, offline: Offline) -> Result<(), ActorError> { + self.send(CoreClientCommand::Disconnect).await?; + Ok(()) + } pub async fn get_roster(&self) -> Result<Vec<Contact>, CommandError<RosterError>> { let (send, recv) = oneshot::channel(); @@ -539,9 +571,9 @@ impl Client { &self, item: pep::Item, node: String, - ) -> Result<(), CommandError<PublishError>> { + ) -> Result<(), CommandError<PEPError>> { let (send, recv) = oneshot::channel(); - self.send(CoreClientCommand::Command(Command::Publish { + self.send(CoreClientCommand::Command(Command::PublishPEPItem { item, node, sender: send, @@ -555,7 +587,44 @@ impl Client { Ok(result) } - pub async fn change_nick(&self, nick: String) -> Result<(), CommandError<NickError>> { + pub async fn delete_pep_node(&self, node: String) -> Result<(), CommandError<PEPError>> { + let (send, recv) = oneshot::channel(); + self.send(CoreClientCommand::Command(Command::DeletePEPNode { + node, + sender: send, + })) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; + let result = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; + Ok(result) + } + + pub async fn get_pep_item( + &self, + jid: Option<JID>, + node: String, + id: String, + ) -> Result<pep::Item, CommandError<PEPError>> { + let (send, recv) = oneshot::channel(); + self.send(CoreClientCommand::Command(Command::GetPEPItem { + jid, + node, + id, + sender: send, + })) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; + let result = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; + Ok(result) + } + + pub async fn change_nick(&self, nick: Option<String>) -> Result<(), CommandError<NickError>> { let (send, recv) = oneshot::channel(); self.send(CoreClientCommand::Command(Command::ChangeNick(nick, send))) .await @@ -566,10 +635,27 @@ impl Client { .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; Ok(result) } + + pub async fn change_avatar( + &self, + avatar: Option<Vec<u8>>, + ) -> Result<(), CommandError<AvatarPublishError<Fs>>> { + let (send, recv) = oneshot::channel(); + self.send(CoreClientCommand::Command(Command::ChangeAvatar( + avatar, send, + ))) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; + let result = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; + Ok(result) + } } -impl From<Command> for CoreClientCommand<Command> { - fn from(value: Command) -> Self { +impl<Fs: FileStore> From<Command<Fs>> for CoreClientCommand<Command<Fs>> { + fn from(value: Command<Fs>) -> Self { CoreClientCommand::Command(value) } } diff --git a/filamento/src/logic/abort.rs b/filamento/src/logic/abort.rs index df82655..3588b13 100644 --- a/filamento/src/logic/abort.rs +++ b/filamento/src/logic/abort.rs @@ -1,7 +1,9 @@ use lampada::error::ReadError; +use crate::files::FileStore; + use super::ClientLogic; -pub async fn on_abort(logic: ClientLogic) { +pub async fn on_abort<Fs: FileStore + Clone>(logic: ClientLogic<Fs>) { logic.pending().drain().await; } diff --git a/filamento/src/logic/connect.rs b/filamento/src/logic/connect.rs index dc05448..37cdad5 100644 --- a/filamento/src/logic/connect.rs +++ b/filamento/src/logic/connect.rs @@ -5,12 +5,16 @@ use tracing::debug; use crate::{ Command, UpdateMessage, error::{ConnectionJobError, Error, RosterError}, + files::FileStore, presence::{Online, PresenceType}, }; use super::ClientLogic; -pub async fn handle_connect(logic: ClientLogic, connection: Connected) { +pub async fn handle_connect<Fs: FileStore + Clone + Send + Sync>( + logic: ClientLogic<Fs>, + connection: Connected, +) { let (send, recv) = oneshot::channel(); debug!("getting roster"); logic diff --git a/filamento/src/logic/connection_error.rs b/filamento/src/logic/connection_error.rs index 081900b..36c1cef 100644 --- a/filamento/src/logic/connection_error.rs +++ b/filamento/src/logic/connection_error.rs @@ -1,7 +1,12 @@ use lampada::error::ConnectionError; +use crate::files::FileStore; + use super::ClientLogic; -pub async fn handle_connection_error(logic: ClientLogic, error: ConnectionError) { +pub async fn handle_connection_error<Fs: FileStore + Clone>( + logic: ClientLogic<Fs>, + error: ConnectionError, +) { logic.handle_error(error.into()).await; } diff --git a/filamento/src/logic/disconnect.rs b/filamento/src/logic/disconnect.rs index 241c3e6..ebcfd4f 100644 --- a/filamento/src/logic/disconnect.rs +++ b/filamento/src/logic/disconnect.rs @@ -1,11 +1,14 @@ use lampada::Connected; use stanza::client::Stanza; -use crate::{UpdateMessage, presence::Offline}; +use crate::{UpdateMessage, files::FileStore, presence::Offline}; use super::ClientLogic; -pub async fn handle_disconnect(logic: ClientLogic, connection: Connected) { +pub async fn handle_disconnect<Fs: FileStore + Clone>( + logic: ClientLogic<Fs>, + connection: Connected, +) { // TODO: be able to set offline status message let offline_presence: stanza::client::presence::Presence = Offline::default().into_stanza(None); let stanza = Stanza::Presence(offline_presence); diff --git a/filamento/src/logic/local_only.rs b/filamento/src/logic/local_only.rs index 3f6fe8d..cabbef4 100644 --- a/filamento/src/logic/local_only.rs +++ b/filamento/src/logic/local_only.rs @@ -4,44 +4,61 @@ use uuid::Uuid; use crate::{ chat::{Chat, Message}, error::DatabaseError, + files::FileStore, user::User, }; use super::ClientLogic; -pub async fn handle_get_chats(logic: &ClientLogic) -> Result<Vec<Chat>, DatabaseError> { +pub async fn handle_get_chats<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, +) -> Result<Vec<Chat>, DatabaseError> { Ok(logic.db().read_chats().await?) } -pub async fn handle_get_chats_ordered(logic: &ClientLogic) -> Result<Vec<Chat>, DatabaseError> { +pub async fn handle_get_chats_ordered<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, +) -> Result<Vec<Chat>, DatabaseError> { Ok(logic.db().read_chats_ordered().await?) } -pub async fn handle_get_chats_ordered_with_latest_messages( - logic: &ClientLogic, +pub async fn handle_get_chats_ordered_with_latest_messages<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, ) -> Result<Vec<(Chat, Message)>, DatabaseError> { Ok(logic.db().read_chats_ordered_with_latest_messages().await?) } -pub async fn handle_get_chat(logic: &ClientLogic, jid: JID) -> Result<Chat, DatabaseError> { +pub async fn handle_get_chat<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, + jid: JID, +) -> Result<Chat, DatabaseError> { Ok(logic.db().read_chat(jid).await?) } -pub async fn handle_get_messages( - logic: &ClientLogic, +pub async fn handle_get_messages<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, jid: JID, ) -> Result<Vec<Message>, DatabaseError> { Ok(logic.db().read_message_history(jid).await?) } -pub async fn handle_delete_chat(logic: &ClientLogic, jid: JID) -> Result<(), DatabaseError> { +pub async fn handle_delete_chat<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, + jid: JID, +) -> Result<(), DatabaseError> { Ok(logic.db().delete_chat(jid).await?) } -pub async fn handle_delete_messaage(logic: &ClientLogic, uuid: Uuid) -> Result<(), DatabaseError> { +pub async fn handle_delete_messaage<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, + uuid: Uuid, +) -> Result<(), DatabaseError> { Ok(logic.db().delete_message(uuid).await?) } -pub async fn handle_get_user(logic: &ClientLogic, jid: JID) -> Result<User, DatabaseError> { +pub async fn handle_get_user<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, + jid: JID, +) -> Result<User, DatabaseError> { Ok(logic.db().read_user(jid).await?) } diff --git a/filamento/src/logic/mod.rs b/filamento/src/logic/mod.rs index 1ddd7d3..5e05dac 100644 --- a/filamento/src/logic/mod.rs +++ b/filamento/src/logic/mod.rs @@ -4,12 +4,13 @@ use jid::JID; use lampada::{Connected, Logic, error::ReadError}; use stanza::client::Stanza; use tokio::sync::{Mutex, mpsc, oneshot}; -use tracing::{error, info, warn}; +use tracing::{error, info}; use crate::{ Client, Command, UpdateMessage, db::Db, error::{Error, IqRequestError, ResponseError}, + files::FileStore, }; mod abort; @@ -22,12 +23,13 @@ mod online; mod process_stanza; #[derive(Clone)] -pub struct ClientLogic { - client: Client, +pub struct ClientLogic<Fs: FileStore> { + client: Client<Fs>, bare_jid: JID, db: Db, pending: Pending, update_sender: mpsc::Sender<UpdateMessage>, + file_store: Fs, } #[derive(Clone)] @@ -75,12 +77,13 @@ impl Pending { } } -impl ClientLogic { +impl<Fs: FileStore> ClientLogic<Fs> { pub fn new( - client: Client, + client: Client<Fs>, bare_jid: JID, db: Db, update_sender: mpsc::Sender<UpdateMessage>, + file_store: Fs, ) -> Self { Self { db, @@ -88,10 +91,11 @@ impl ClientLogic { update_sender, client, bare_jid, + file_store, } } - pub fn client(&self) -> &Client { + pub fn client(&self) -> &Client<Fs> { &self.client } @@ -103,6 +107,10 @@ impl ClientLogic { &self.pending } + pub fn file_store(&self) -> &Fs { + &self.file_store + } + pub fn update_sender(&self) -> &mpsc::Sender<UpdateMessage> { &self.update_sender } @@ -113,13 +121,14 @@ impl ClientLogic { self.update_sender().send(update).await; } - pub async fn handle_error(&self, e: Error) { + // TODO: delete this + pub async fn handle_error(&self, e: Error<Fs>) { error!("{}", e); } } -impl Logic for ClientLogic { - type Cmd = Command; +impl<Fs: FileStore + Clone + Send + Sync> Logic for ClientLogic<Fs> { + type Cmd = Command<Fs>; // pub async fn handle_stream_error(self, error) {} // stanza errors (recoverable) diff --git a/filamento/src/logic/offline.rs b/filamento/src/logic/offline.rs index 6399cf7..566972c 100644 --- a/filamento/src/logic/offline.rs +++ b/filamento/src/logic/offline.rs @@ -1,3 +1,5 @@ +use std::process::id; + use chrono::Utc; use lampada::error::WriteError; use uuid::Uuid; @@ -6,9 +8,10 @@ use crate::{ Command, chat::{Delivery, Message}, error::{ - DatabaseError, DiscoError, Error, IqRequestError, MessageSendError, NickError, RosterError, - StatusError, + AvatarPublishError, DatabaseError, DiscoError, Error, IqRequestError, MessageSendError, + NickError, PEPError, RosterError, StatusError, }, + files::FileStore, presence::Online, roster::Contact, }; @@ -22,7 +25,7 @@ use super::{ }, }; -pub async fn handle_offline(logic: ClientLogic, command: Command) { +pub async fn handle_offline<Fs: FileStore + Clone>(logic: ClientLogic<Fs>, command: Command<Fs>) { let result = handle_offline_result(&logic, command).await; match result { Ok(_) => {} @@ -30,16 +33,24 @@ pub async fn handle_offline(logic: ClientLogic, command: Command) { } } -pub async fn handle_set_status(logic: &ClientLogic, online: Online) -> Result<(), StatusError> { +pub async fn handle_set_status<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, + online: Online, +) -> Result<(), StatusError> { logic.db().upsert_cached_status(online).await?; Ok(()) } -pub async fn handle_get_roster(logic: &ClientLogic) -> Result<Vec<Contact>, RosterError> { +pub async fn handle_get_roster<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, +) -> Result<Vec<Contact>, RosterError> { Ok(logic.db().read_cached_roster().await?) } -pub async fn handle_offline_result(logic: &ClientLogic, command: Command) -> Result<(), Error> { +pub async fn handle_offline_result<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, + command: Command<Fs>, +) -> Result<(), Error<Fs>> { match command { Command::GetRoster(sender) => { let roster = handle_get_roster(logic).await; @@ -77,7 +88,6 @@ pub async fn handle_offline_result(logic: &ClientLogic, command: Command) -> Res let user = handle_get_user(logic, jid).await; sender.send(user); } - // TODO: offline queue to modify roster Command::AddContact(_jid, sender) => { sender.send(Err(RosterError::Write(WriteError::Disconnected))); } @@ -112,7 +122,6 @@ pub async fn handle_offline_result(logic: &ClientLogic, command: Command) -> Res let result = handle_set_status(logic, online).await; sender.send(result); } - // TODO: offline message queue Command::SendMessage(jid, body) => { let id = Uuid::new_v4(); let timestamp = Utc::now(); @@ -159,7 +168,7 @@ pub async fn handle_offline_result(logic: &ClientLogic, command: Command) -> Res Command::DiscoItems(_jid, _node, sender) => { sender.send(Err(DiscoError::Write(WriteError::Disconnected))); } - Command::Publish { + Command::PublishPEPItem { item: _, node: _, sender, @@ -169,6 +178,24 @@ pub async fn handle_offline_result(logic: &ClientLogic, command: Command) -> Res Command::ChangeNick(_, sender) => { sender.send(Err(NickError::Disconnected)); } + Command::ChangeAvatar(_items, sender) => { + sender.send(Err(AvatarPublishError::Disconnected)); + } + Command::DeletePEPNode { node: _, sender } => { + sender.send(Err(PEPError::IqResponse(IqRequestError::Write( + WriteError::Disconnected, + )))); + } + Command::GetPEPItem { + node: _, + sender, + jid: _, + id: _, + } => { + sender.send(Err(PEPError::IqResponse(IqRequestError::Write( + WriteError::Disconnected, + )))); + } } Ok(()) } diff --git a/filamento/src/logic/online.rs b/filamento/src/logic/online.rs index b069f59..745adc1 100644 --- a/filamento/src/logic/online.rs +++ b/filamento/src/logic/online.rs @@ -1,23 +1,24 @@ +use std::io::Cursor; + +use base64::{prelude::BASE64_STANDARD, Engine}; use chrono::Utc; +use image::ImageReader; use jid::JID; use lampada::{Connected, WriteMessage, error::WriteError}; +use sha1::{Digest, Sha1}; use stanza::{ client::{ iq::{self, Iq, IqType, Query}, Stanza - }, - xep_0030::{info, items}, - xep_0060::pubsub::{self, Pubsub}, - xep_0172::{self, Nick}, - xep_0203::Delay, + }, xep_0030::{info, items}, xep_0060::{self, owner, pubsub::{self, Pubsub}}, xep_0084, xep_0172::{self, Nick}, xep_0203::Delay }; use tokio::sync::oneshot; use tracing::{debug, error, info}; use uuid::Uuid; use crate::{ - chat::{Body, Chat, Delivery, Message}, disco::{Info, Items}, error::{ - DatabaseError, DiscoError, Error, IqRequestError, MessageSendError, NickError, PublishError, RosterError, StatusError, SubscribeError - }, pep, presence::{Online, Presence, PresenceType}, roster::{Contact, ContactUpdate}, Command, UpdateMessage + avatar, chat::{Body, Chat, Delivery, Message}, disco::{Info, Items}, error::{ + AvatarPublishError, DatabaseError, DiscoError, Error, IqRequestError, MessageSendError, NickError, PEPError, RosterError, StatusError, SubscribeError + }, files::FileStore, pep, presence::{Online, Presence, PresenceType}, roster::{Contact, ContactUpdate}, Command, UpdateMessage }; use super::{ @@ -29,7 +30,7 @@ use super::{ }, }; -pub async fn handle_online(logic: ClientLogic, command: Command, connection: Connected) { +pub async fn handle_online<Fs: FileStore + Clone>(logic: ClientLogic<Fs>, command: Command<Fs>, connection: Connected) { let result = handle_online_result(&logic, command, connection).await; match result { Ok(_) => {} @@ -37,8 +38,8 @@ pub async fn handle_online(logic: ClientLogic, command: Command, connection: Con } } -pub async fn handle_get_roster( - logic: &ClientLogic, +pub async fn handle_get_roster<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, connection: Connected, ) -> Result<Vec<Contact>, RosterError> { let iq_id = Uuid::new_v4().to_string(); @@ -96,8 +97,8 @@ pub async fn handle_get_roster( } } -pub async fn handle_add_contact( - logic: &ClientLogic, +pub async fn handle_add_contact<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, connection: Connected, jid: JID, ) -> Result<(), RosterError> { @@ -154,8 +155,8 @@ pub async fn handle_add_contact( } } -pub async fn handle_buddy_request( - logic: &ClientLogic, +pub async fn handle_buddy_request<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, connection: Connected, jid: JID, ) -> Result<(), SubscribeError> { @@ -177,8 +178,8 @@ pub async fn handle_buddy_request( Ok(()) } -pub async fn handle_subscription_request( - logic: &ClientLogic, +pub async fn handle_subscription_request<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, connection: Connected, jid: JID, ) -> Result<(), SubscribeError> { @@ -195,8 +196,8 @@ pub async fn handle_subscription_request( Ok(()) } -pub async fn handle_accept_buddy_request( - logic: &ClientLogic, +pub async fn handle_accept_buddy_request<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, connection: Connected, jid: JID, ) -> Result<(), SubscribeError> { @@ -218,8 +219,8 @@ pub async fn handle_accept_buddy_request( Ok(()) } -pub async fn handle_accept_subscription_request( - logic: &ClientLogic, +pub async fn handle_accept_subscription_request<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, connection: Connected, jid: JID, ) -> Result<(), SubscribeError> { @@ -274,8 +275,8 @@ pub async fn handle_unfriend_contact(connection: Connected, jid: JID) -> Result< Ok(()) } -pub async fn handle_delete_contact( - logic: &ClientLogic, +pub async fn handle_delete_contact<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, connection: Connected, jid: JID, ) -> Result<(), RosterError> { @@ -333,8 +334,8 @@ pub async fn handle_delete_contact( } } -pub async fn handle_update_contact( - logic: &ClientLogic, +pub async fn handle_update_contact<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, connection: Connected, jid: JID, contact_update: ContactUpdate, @@ -398,8 +399,8 @@ pub async fn handle_update_contact( } } -pub async fn handle_set_status( - logic: &ClientLogic, +pub async fn handle_set_status<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, connection: Connected, online: Online, ) -> Result<(), StatusError> { @@ -411,9 +412,18 @@ pub async fn handle_set_status( Ok(()) } -pub async fn handle_send_message(logic: &ClientLogic, connection: Connected, jid: JID, body: Body) { +pub async fn handle_send_message<Fs: FileStore + Clone>(logic: &ClientLogic<Fs>, connection: Connected, jid: JID, body: Body) { // upsert the chat and user the message will be delivered to. if there is a conflict, it will return whatever was there, otherwise it will return false by default. - let have_chatted = logic.db().upsert_chat_and_user(&jid).await.unwrap_or(false); + // let have_chatted = logic.db().upsert_chat_and_user(&jid).await.unwrap_or(false); + let have_chatted = match logic.db().upsert_chat_and_user(&jid).await { + Ok(have_chatted) => { + have_chatted + }, + Err(e) => { + error!("{}", e); + false + }, + }; let nick; let mark_chat_as_chatted; @@ -506,6 +516,7 @@ pub async fn handle_send_message(logic: &ClientLogic, connection: Connected, jid }) .await; if mark_chat_as_chatted { + debug!("marking chat as chatted"); if let Err(e) = logic.db.mark_chat_as_chatted(jid).await { logic .handle_error(MessageSendError::MarkChatAsChatted(e.into()).into()) @@ -540,8 +551,8 @@ pub async fn handle_send_presence( Ok(()) } -pub async fn handle_disco_info( - logic: &ClientLogic, +pub async fn handle_disco_info<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, connection: Connected, jid: Option<JID>, node: Option<String>, @@ -611,8 +622,8 @@ pub async fn handle_disco_info( } } -pub async fn handle_disco_items( - logic: &ClientLogic, +pub async fn handle_disco_items<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, connection: Connected, jid: Option<JID>, node: Option<String>, @@ -680,20 +691,60 @@ pub async fn handle_disco_items( } } -pub async fn handle_publish( - logic: &ClientLogic, +pub async fn handle_publish_pep_item<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, connection: Connected, item: pep::Item, node: String, -) -> Result<(), PublishError> { +) -> Result<(), PEPError> { let id = Uuid::new_v4().to_string(); let publish = match item { - pep::Item::Nick(n) => pubsub::Publish { - node, - items: vec![pubsub::Item { - item: Some(pubsub::Content::Nick(Nick(n))), - ..Default::default() - }], + pep::Item::Nick(n) => { + if let Some(n) = n { + pubsub::Publish { + node, + items: vec![pubsub::Item { + item: Some(pubsub::Content::Nick(Nick(n))), + ..Default::default() + }], + } + } else { + pubsub::Publish { + node, + items: vec![pubsub::Item { + item: Some(pubsub::Content::Nick(Nick("".to_string()))), + ..Default::default() + }] + } + } + }, + pep::Item::AvatarMetadata(metadata) => { + if let Some(metadata) = metadata { + pubsub::Publish { node, items: vec![pubsub::Item { + item: Some(pubsub::Content::AvatarMetadata(xep_0084::Metadata { info: vec![xep_0084::Info { bytes: metadata.bytes, height: None, id: metadata.hash.clone(), r#type: metadata.r#type, url: None, width: None }], pointers: Vec::new() })), + id: Some(metadata.hash), + ..Default::default() + }]} + } else { + pubsub::Publish { node, items: vec![pubsub::Item { + item: Some(pubsub::Content::AvatarMetadata(xep_0084::Metadata { info: Vec::new(), pointers: Vec::new() })), + ..Default::default() + }]} + } + }, + pep::Item::AvatarData(data) => { + if let Some(data) = data { + pubsub::Publish { node, items: vec![pubsub::Item { + item: Some(pubsub::Content::AvatarData(xep_0084::Data(data.data_b64))), + id: Some(data.hash), + ..Default::default() + }] } + } else { + pubsub::Publish { node, items: vec![pubsub::Item { + item: Some(pubsub::Content::AvatarData(xep_0084::Data("".to_string()))), + ..Default::default() + }]} + } }, }; let request = Iq { @@ -726,38 +777,229 @@ pub async fn handle_publish( if let Some(query) = query { match query { Query::Pubsub(_) => Ok(()), - q => Err(PublishError::MismatchedQuery(q)), + q => Err(PEPError::MismatchedQuery(q)), } } else { - Err(PublishError::MissingQuery) + Err(PEPError::MissingQuery) } } IqType::Error => { - Err(PublishError::StanzaErrors(errors)) + Err(PEPError::StanzaErrors(errors)) } _ => unreachable!(), } } else { - Err(PublishError::IncorrectEntity( + Err(PEPError::IncorrectEntity( from.unwrap_or_else(|| connection.jid().as_bare()), )) } } - s => Err(PublishError::UnexpectedStanza(s)), + s => Err(PEPError::UnexpectedStanza(s)), } } -pub async fn handle_change_nick(logic: &ClientLogic, nick: String) -> Result<(), NickError> { +pub async fn handle_get_pep_item<Fs: FileStore + Clone>(logic: &ClientLogic<Fs>, connection: Connected, jid: Option<JID>, node: String, id: String) -> Result<pep::Item, PEPError> { + let stanza_id = Uuid::new_v4().to_string(); + let request = Iq { + from: Some(connection.jid().clone()), + id: stanza_id.clone(), + to: jid.clone(), + r#type: IqType::Get, + lang: None, + query: Some(Query::Pubsub(Pubsub::Items(pubsub::Items { + max_items: None, + node, + subid: None, + items: vec![pubsub::Item { id: Some(id.clone()), publisher: None, item: None }], + }))), + errors: Vec::new(), + }; + match logic + .pending() + .request(&connection, Stanza::Iq(request), stanza_id) + .await? { + + Stanza::Iq(Iq { + from, + r#type, + query, + errors, + .. + // TODO: maybe abstract a bunch of these different errors related to iqs into an iq error thing? as in like call iq.result(), get the query from inside, error otherwise. + }) if r#type == IqType::Result || r#type == IqType::Error => { + if from == jid || { + if jid == None { + from == Some(connection.jid().as_bare()) + } else { + false + } + } { + match r#type { + IqType::Result => { + if let Some(query) = query { + match query { + Query::Pubsub(Pubsub::Items(mut items)) => { + if let Some(item) = items.items.pop() { + if item.id == Some(id.clone()) { + match item.item.ok_or(PEPError::MissingItem)? { + pubsub::Content::Nick(nick) => { + if nick.0.is_empty() { + Ok(pep::Item::Nick(None)) + } else { + Ok(pep::Item::Nick(Some(nick.0))) + + } + }, + pubsub::Content::AvatarData(data) => Ok(pep::Item::AvatarData(Some(avatar::Data { hash: id, data_b64: data.0 }))), + pubsub::Content::AvatarMetadata(metadata) => Ok(pep::Item::AvatarMetadata(metadata.info.into_iter().find(|info| info.url.is_none()).map(|info| info.into()))), + pubsub::Content::Unknown(_element) => Err(PEPError::UnsupportedItem), + } + } else { + Err(PEPError::IncorrectItemID(id, item.id.unwrap_or_else(|| "missing id".to_string()))) + } + } else { + Err(PEPError::MissingItem) + } + }, + q => Err(PEPError::MismatchedQuery(q)), + } + } else { + Err(PEPError::MissingQuery) + } + } + IqType::Error => { + Err(PEPError::StanzaErrors(errors)) + } + _ => unreachable!(), + } + } else { + // TODO: include expected entity + Err(PEPError::IncorrectEntity( + from.unwrap_or_else(|| connection.jid().as_bare()), + )) + } + } + s => Err(PEPError::UnexpectedStanza(s)), + } +} + +pub async fn handle_change_nick<Fs: FileStore + Clone>(logic: &ClientLogic<Fs>, nick: Option<String>) -> Result<(), NickError> { logic.client().publish(pep::Item::Nick(nick), xep_0172::XMLNS.to_string()).await?; Ok(()) } +pub async fn handle_change_avatar<Fs: FileStore + Clone>(logic: &ClientLogic<Fs>, img_data: Option<Vec<u8>>) -> Result<(), AvatarPublishError<Fs>> { + match img_data { + // set avatar + Some(data) => { + // load the image data and guess the format + let image = ImageReader::new(Cursor::new(data)).with_guessed_format()?.decode()?; + + // convert the image to png; + let mut data_png = Vec::new(); + image.write_to(&mut Cursor::new(&mut data_png), image::ImageFormat::Png)?; + + // calculate the length of the data in bytes. + let bytes = data_png.len().try_into()?; + + // calculate sha1 hash of the data + let mut sha1 = Sha1::new(); + sha1.update(&data_png); + let sha1_result = sha1.finalize(); + let hash = BASE64_STANDARD.encode(sha1_result); + + // encode the image data as base64 + let data_b64 = BASE64_STANDARD.encode(data_png.clone()); + + // publish the data to the data node + logic.client().publish(pep::Item::AvatarData(Some(avatar::Data { hash: hash.clone(), data_b64 })), "urn:xmpp:avatar:data".to_string()).await?; + + // publish the metadata to the metadata node + logic.client().publish(pep::Item::AvatarMetadata(Some(avatar::Metadata { bytes, hash: hash.clone(), r#type: "image/png".to_string() })), "urn:xmpp:avatar:metadata".to_string()).await?; + + // if everything went well, save the data to the disk. + + if !logic.file_store().is_stored(&hash).await.map_err(|err| AvatarPublishError::FileStore(err))? { + logic.file_store().store(&hash, &data_png).await.map_err(|err| AvatarPublishError::FileStore(err))? + } + // when the client receives the updated metadata notification from the pep node, it will already have it saved on the disk so will not require a retrieval. + // TODO: should the node be purged? + + Ok(()) + }, + // remove avatar + None => { + logic.client().delete_pep_node("urn:xmpp:avatar:data".to_string()).await?; + logic.client().publish(pep::Item::AvatarMetadata(None), "urn:xmpp:avatar:metadata".to_string(), ).await?; + Ok(()) + }, + } +} + +pub async fn handle_delete_pep_node<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, + connection: Connected, + node: String, +) -> Result<(), PEPError> { + let id = Uuid::new_v4().to_string(); + let request = Iq { + from: Some(connection.jid().clone()), + id: id.clone(), + to: None, + r#type: IqType::Set, + lang: None, + query: Some(Query::PubsubOwner(xep_0060::owner::Pubsub::Delete(owner::Delete{ node, redirect: None }))), + errors: Vec::new(), + }; + match logic + .pending() + .request(&connection, Stanza::Iq(request), id) + .await? { + + Stanza::Iq(Iq { + from, + r#type, + query, + errors, + .. + // TODO: maybe abstract a bunch of these different errors related to iqs into an iq error thing? as in like call iq.result(), get the query from inside, error otherwise. + }) if r#type == IqType::Result || r#type == IqType::Error => { + if from == None || + from == Some(connection.jid().as_bare()) + { + match r#type { + IqType::Result => { + if let Some(query) = query { + match query { + Query::PubsubOwner(_) => Ok(()), + q => Err(PEPError::MismatchedQuery(q)), + } + } else { + // Err(PEPError::MissingQuery) + Ok(()) + } + } + IqType::Error => { + Err(PEPError::StanzaErrors(errors)) + } + _ => unreachable!(), + } + } else { + Err(PEPError::IncorrectEntity( + from.unwrap_or_else(|| connection.jid().as_bare()), + )) + } + } + s => Err(PEPError::UnexpectedStanza(s)), + } +} + // TODO: could probably macro-ise? -pub async fn handle_online_result( - logic: &ClientLogic, - command: Command, +pub async fn handle_online_result<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, + command: Command<Fs>, connection: Connected, -) -> Result<(), Error> { +) -> Result<(), Error<Fs>> { match command { Command::GetRoster(result_sender) => { let roster = handle_get_roster(logic, connection).await; @@ -854,14 +1096,26 @@ pub async fn handle_online_result( let result = handle_disco_items(logic, connection, jid, node).await; let _ = sender.send(result); } - Command::Publish { item, node, sender } => { - let result = handle_publish(logic, connection, item, node).await; + Command::PublishPEPItem { item, node, sender } => { + let result = handle_publish_pep_item(logic, connection, item, node).await; let _ = sender.send(result); } Command::ChangeNick(nick, sender) => { let result = handle_change_nick(logic, nick).await; let _ = sender.send(result); } + Command::ChangeAvatar(img_data, sender) => { + let result = handle_change_avatar(logic, img_data).await; + let _ = sender.send(result); + }, + Command::DeletePEPNode { node, sender } => { + let result = handle_delete_pep_node(logic, connection, node).await; + let _ = sender.send(result); + }, + Command::GetPEPItem { node, sender, jid, id } => { + let result = handle_get_pep_item(logic, connection, jid, node, id).await; + let _ = sender.send(result); + }, } Ok(()) } diff --git a/filamento/src/logic/process_stanza.rs b/filamento/src/logic/process_stanza.rs index 11d7588..c383d70 100644 --- a/filamento/src/logic/process_stanza.rs +++ b/filamento/src/logic/process_stanza.rs @@ -1,7 +1,9 @@ use std::str::FromStr; +use base64::{Engine, prelude::BASE64_STANDARD}; use chrono::Utc; use lampada::{Connected, SupervisorSender}; +use sha1::{Digest, Sha1}; use stanza::{ client::{ Stanza, @@ -17,14 +19,22 @@ use uuid::Uuid; use crate::{ UpdateMessage, caps, chat::{Body, Message}, - error::{DatabaseError, Error, IqError, MessageRecvError, PresenceError, RosterError}, + error::{ + AvatarUpdateError, DatabaseError, Error, IqError, MessageRecvError, PresenceError, + RosterError, + }, + files::FileStore, presence::{Offline, Online, Presence, PresenceType, Show}, roster::Contact, }; use super::ClientLogic; -pub async fn handle_stanza(logic: ClientLogic, stanza: Stanza, connection: Connected) { +pub async fn handle_stanza<Fs: FileStore + Clone>( + logic: ClientLogic<Fs>, + stanza: Stanza, + connection: Connected, +) { let result = process_stanza(logic.clone(), stanza, connection).await; match result { Ok(u) => match u { @@ -38,10 +48,10 @@ pub async fn handle_stanza(logic: ClientLogic, stanza: Stanza, connection: Conne } } -pub async fn recv_message( - logic: ClientLogic, +pub async fn recv_message<Fs: FileStore + Clone>( + logic: ClientLogic<Fs>, stanza_message: stanza::client::message::Message, -) -> Result<Option<UpdateMessage>, MessageRecvError> { +) -> Result<Option<UpdateMessage>, MessageRecvError<Fs>> { if let Some(from) = stanza_message.from { // TODO: don't ignore delay from. xep says SHOULD send error if incorrect. let timestamp = stanza_message @@ -50,6 +60,7 @@ pub async fn recv_message( .unwrap_or_else(|| Utc::now()); // TODO: group chat messages + // body MUST be before user changes in order to avoid race condition where you e.g. get a nick update before the user is in the client state. // if there is a body, should create chat message if let Some(body) = stanza_message.body { let message = Message { @@ -67,16 +78,28 @@ pub async fn recv_message( }; // save the message to the database - logic.db().upsert_chat_and_user(&from).await?; - if let Err(e) = logic - .db() - .create_message_with_user_resource(message.clone(), from.clone(), from.clone()) - .await - { - logic - .handle_error(Error::MessageRecv(MessageRecvError::MessageHistory(e))) - .await; - } + match logic.db().upsert_chat_and_user(&from).await { + Ok(_) => { + if let Err(e) = logic + .db() + .create_message_with_user_resource( + message.clone(), + from.clone(), + from.clone(), + ) + .await + { + logic + .handle_error(Error::MessageRecv(MessageRecvError::MessageHistory(e))) + .await; + } + } + Err(e) => { + logic + .handle_error(Error::MessageRecv(MessageRecvError::MessageHistory(e))) + .await; + } + }; // update the client with the new message logic @@ -89,70 +112,333 @@ pub async fn recv_message( } if let Some(nick) = stanza_message.nick { - if let Err(e) = logic - .db() - .upsert_user_nick(from.as_bare(), nick.0.clone()) - .await - { - logic - .handle_error(Error::MessageRecv(MessageRecvError::NickUpdate(e))) - .await; + let nick = nick.0; + if nick.is_empty() { + match logic.db().delete_user_nick(from.as_bare()).await { + Ok(changed) => { + if changed { + logic + .update_sender() + .send(UpdateMessage::NickChanged { + jid: from.as_bare(), + nick: None, + }) + .await; + } + } + Err(e) => { + logic + .handle_error(Error::MessageRecv(MessageRecvError::NickUpdate(e))) + .await; + // if failed, send user update anyway + logic + .update_sender() + .send(UpdateMessage::NickChanged { + jid: from.as_bare(), + nick: None, + }) + .await; + } + } + } else { + match logic + .db() + .upsert_user_nick(from.as_bare(), nick.clone()) + .await + { + Ok(changed) => { + if changed { + logic + .update_sender() + .send(UpdateMessage::NickChanged { + jid: from.as_bare(), + nick: Some(nick), + }) + .await; + } + } + Err(e) => { + logic + .handle_error(Error::MessageRecv(MessageRecvError::NickUpdate(e))) + .await; + // if failed, send user update anyway + logic + .update_sender() + .send(UpdateMessage::NickChanged { + jid: from.as_bare(), + nick: Some(nick), + }) + .await; + } + } } - - logic - .update_sender() - .send(UpdateMessage::NickChanged { - jid: from.as_bare(), - nick: nick.0, - }) - .await; } if let Some(event) = stanza_message.event { match event { - Event::Items(items) => match items.node.as_str() { - "http://jabber.org/protocol/nick" => match items.items { - ItemsType::Item(items) => { - if let Some(item) = items.first() { - match &item.item { - Some(c) => match c { - Content::Nick(nick) => { - if let Err(e) = logic - .db() - .upsert_user_nick(from.as_bare(), nick.0.clone()) - .await - { - logic - .handle_error(Error::MessageRecv( - MessageRecvError::NickUpdate(e), - )) - .await; + Event::Items(items) => { + match items.node.as_str() { + "http://jabber.org/protocol/nick" => match items.items { + ItemsType::Item(items) => { + if let Some(item) = items.first() { + match &item.item { + Some(c) => match c { + Content::Nick(nick) => { + let nick = nick.0.clone(); + if nick.is_empty() { + match logic + .db() + .delete_user_nick(from.as_bare()) + .await + { + Ok(changed) => { + if changed { + logic + .update_sender() + .send(UpdateMessage::NickChanged { + jid: from.as_bare(), + nick: None, + }) + .await; + } + } + Err(e) => { + logic + .handle_error(Error::MessageRecv( + MessageRecvError::NickUpdate(e), + )) + .await; + // if failed, send user update anyway + logic + .update_sender() + .send(UpdateMessage::NickChanged { + jid: from.as_bare(), + nick: None, + }) + .await; + } + } + } else { + match logic + .db() + .upsert_user_nick( + from.as_bare(), + nick.clone(), + ) + .await + { + Ok(changed) => { + if changed { + logic + .update_sender() + .send(UpdateMessage::NickChanged { + jid: from.as_bare(), + nick: Some(nick), + }) + .await; + } + } + Err(e) => { + logic + .handle_error(Error::MessageRecv( + MessageRecvError::NickUpdate(e), + )) + .await; + // if failed, send user update anyway + logic + .update_sender() + .send(UpdateMessage::NickChanged { + jid: from.as_bare(), + nick: Some(nick), + }) + .await; + } + } + } } + _ => {} + }, + None => {} + } + } + } + _ => {} + }, + "urn:xmpp:avatar:metadata" => { + match items.items { + ItemsType::Item(items) => { + if let Some(item) = items.first() { + match &item.item { + Some(Content::AvatarMetadata(metadata)) => { + // check if user avatar has been deleted + if let Some(metadata) = metadata + .info + .iter() + .find(|info| info.url.is_none()) + { + // check if user avatar has changed + match logic + .db() + .upsert_user_avatar( + from.as_bare(), + metadata.id.clone(), + ) + .await + { + Ok((changed, old_avatar)) => { + if changed { + if let Some(old_avatar) = old_avatar + { + if let Err(e) = logic + .file_store() + .delete(&old_avatar) + .await.map_err(|err| AvatarUpdateError::FileStore(err)) { + logic.handle_error(MessageRecvError::AvatarUpdate(e).into()).await; + } + } - logic - .update_sender() - .send(UpdateMessage::NickChanged { - jid: from.as_bare(), - nick: nick.0.clone(), - }) - .await; + match logic + .file_store() + .is_stored(&metadata.id) + .await + .map_err(|err| { + AvatarUpdateError::<Fs>::FileStore( + err, + ) + }) { + Ok(false) => { + // get data + let pep_item = logic.client().get_pep_item(Some(from.as_bare()), "urn:xmpp:avatar:data".to_string(), metadata.id.clone()).await.map_err(|err| Into::<AvatarUpdateError<Fs>>::into(err))?; + match pep_item { + crate::pep::Item::AvatarData(data) => { + let data = data.map(|data| data.data_b64).unwrap_or_default(); + // TODO: these should all be in a separate avatarupdate function + match BASE64_STANDARD.decode(data) { + Ok(data) => { + let mut hasher = Sha1::new(); + hasher.update(&data); + let received_data_hash = BASE64_STANDARD.encode(hasher.finalize()); + if received_data_hash == metadata.id { + if let Err(e) = logic.file_store().store(&received_data_hash, &data).await { + logic.handle_error(Error::MessageRecv(MessageRecvError::AvatarUpdate(AvatarUpdateError::FileStore(e)))).await; + } + logic + .update_sender() + .send( + UpdateMessage::AvatarChanged { + jid: from.as_bare(), + id: Some( + metadata.id.clone(), + ), + }, + ) + .await; + } + }, + Err(e) => { + logic.handle_error(Error::MessageRecv(MessageRecvError::AvatarUpdate(AvatarUpdateError::Base64(e)))).await; + }, + } + }, + _ => { + logic.handle_error(Error::MessageRecv(MessageRecvError::AvatarUpdate(AvatarUpdateError::MissingData))).await; + } + } + } + Ok(true) => { + // just send the update + logic + .update_sender() + .send( + UpdateMessage::AvatarChanged { + jid: from.as_bare(), + id: Some( + metadata.id.clone(), + ), + }, + ) + .await; + } + Err(e) => { + logic.handle_error(Error::MessageRecv(MessageRecvError::AvatarUpdate(e))).await; + } + } + } + } + Err(e) => { + logic + .handle_error(Error::MessageRecv( + MessageRecvError::AvatarUpdate( + AvatarUpdateError::Database( + e, + ), + ), + )) + .await; + } + } + } else { + // delete avatar + match logic + .db() + .delete_user_avatar(from.as_bare()) + .await + { + Ok((changed, old_avatar)) => { + if changed { + if let Some(old_avatar) = old_avatar + { + if let Err(e) = logic + .file_store() + .delete(&old_avatar) + .await.map_err(|err| AvatarUpdateError::FileStore(err)) { + logic.handle_error(MessageRecvError::AvatarUpdate(e).into()).await; + } + } + logic + .update_sender() + .send( + UpdateMessage::AvatarChanged { + jid: from.as_bare(), + id: None, + }, + ) + .await; + } + } + Err(e) => { + logic + .handle_error(Error::MessageRecv( + MessageRecvError::AvatarUpdate( + AvatarUpdateError::Database( + e, + ), + ), + )) + .await; + } + } + } + // check if the new file is in the file store + // if not, retrieve from server and save in the file store (remember to check if the hash matches) + // send the avatar update + } + _ => {} } - Content::Unknown(element) => {} - }, - None => {} + } } + _ => {} } } - ItemsType::Retract(retracts) => {} - }, - _ => {} - }, + _ => {} + } + } // Event::Collection(collection) => todo!(), // Event::Configuration(configuration) => todo!(), // Event::Delete(delete) => todo!(), // Event::Purge(purge) => todo!(), // Event::Subscription(subscription) => todo!(), - _ => {} + _ => {} // TODO: catch these catch-alls in some way } } @@ -240,8 +526,8 @@ pub async fn recv_presence( } } -pub async fn recv_iq( - logic: ClientLogic, +pub async fn recv_iq<Fs: FileStore + Clone>( + logic: ClientLogic<Fs>, connection: Connected, iq: Iq, ) -> Result<Option<UpdateMessage>, IqError> { @@ -469,11 +755,11 @@ pub async fn recv_iq( } } -pub async fn process_stanza( - logic: ClientLogic, +pub async fn process_stanza<Fs: FileStore + Clone>( + logic: ClientLogic<Fs>, stanza: Stanza, connection: Connected, -) -> Result<Option<UpdateMessage>, Error> { +) -> Result<Option<UpdateMessage>, Error<Fs>> { let update = match stanza { Stanza::Message(stanza_message) => Ok(recv_message(logic, stanza_message).await?), Stanza::Presence(presence) => Ok(recv_presence(presence).await?), diff --git a/filamento/src/pep.rs b/filamento/src/pep.rs index c71d843..3cd243f 100644 --- a/filamento/src/pep.rs +++ b/filamento/src/pep.rs @@ -1,17 +1,8 @@ -// in commandmessage -// pub struct Publish { -// item: Item, -// node: Option<String>, -// // no need for node, as item has the node -// } -// -// in updatemessage -// pub struct Event { -// from: JID, -// item: Item, -// } +use crate::avatar::{Data as AvatarData, Metadata as AvatarMetadata}; #[derive(Clone, Debug)] pub enum Item { - Nick(String), + Nick(Option<String>), + AvatarMetadata(Option<AvatarMetadata>), + AvatarData(Option<AvatarData>), } diff --git a/stanza/src/client/iq.rs b/stanza/src/client/iq.rs index 50884aa..a1d58f6 100644 --- a/stanza/src/client/iq.rs +++ b/stanza/src/client/iq.rs @@ -18,7 +18,10 @@ use crate::roster; use crate::xep_0030::{self, info, items}; #[cfg(feature = "xep_0060")] -use crate::xep_0060::pubsub::{self, Pubsub}; +use crate::xep_0060::{ + self, + pubsub::{self, Pubsub}, +}; #[cfg(feature = "xep_0199")] use crate::xep_0199::{self, Ping}; @@ -47,6 +50,8 @@ pub enum Query { DiscoItems(items::Query), #[cfg(feature = "xep_0060")] Pubsub(Pubsub), + #[cfg(feature = "xep_0060")] + PubsubOwner(xep_0060::owner::Pubsub), #[cfg(feature = "xep_0199")] Ping(Ping), #[cfg(feature = "rfc_6121")] @@ -74,6 +79,10 @@ impl FromElement for Query { } #[cfg(feature = "xep_0060")] (Some(pubsub::XMLNS), "pubsub") => Ok(Query::Pubsub(Pubsub::from_element(element)?)), + #[cfg(feature = "xep_0060")] + (Some(xep_0060::owner::XMLNS), "pubsub") => Ok(Query::PubsubOwner( + xep_0060::owner::Pubsub::from_element(element)?, + )), _ => Ok(Query::Unsupported), } } @@ -95,6 +104,8 @@ impl IntoElement for Query { Query::DiscoItems(query) => query.builder(), #[cfg(feature = "xep_0060")] Query::Pubsub(pubsub) => pubsub.builder(), + #[cfg(feature = "xep_0060")] + Query::PubsubOwner(pubsub) => pubsub.builder(), } } } diff --git a/stanza/src/xep_0060/owner.rs b/stanza/src/xep_0060/owner.rs index 1fedc60..7cf4355 100644 --- a/stanza/src/xep_0060/owner.rs +++ b/stanza/src/xep_0060/owner.rs @@ -198,8 +198,8 @@ impl IntoElement for Default { #[derive(Clone, Debug)] pub struct Delete { - node: String, - redirect: Option<Redirect>, + pub node: String, + pub redirect: Option<Redirect>, } impl FromElement for Delete { |