diff options
Diffstat (limited to '')
| -rw-r--r-- | filamento/src/avatar.rs | 34 | ||||
| -rw-r--r-- | filamento/src/caps.rs | 1 | ||||
| -rw-r--r-- | filamento/src/db.rs | 105 | ||||
| -rw-r--r-- | filamento/src/error.rs | 71 | ||||
| -rw-r--r-- | filamento/src/files.rs | 19 | ||||
| -rw-r--r-- | filamento/src/lib.rs | 154 | ||||
| -rw-r--r-- | filamento/src/logic/abort.rs | 4 | ||||
| -rw-r--r-- | filamento/src/logic/connect.rs | 6 | ||||
| -rw-r--r-- | filamento/src/logic/connection_error.rs | 7 | ||||
| -rw-r--r-- | filamento/src/logic/disconnect.rs | 7 | ||||
| -rw-r--r-- | filamento/src/logic/local_only.rs | 37 | ||||
| -rw-r--r-- | filamento/src/logic/mod.rs | 27 | ||||
| -rw-r--r-- | filamento/src/logic/offline.rs | 45 | ||||
| -rw-r--r-- | filamento/src/logic/online.rs | 362 | ||||
| -rw-r--r-- | filamento/src/logic/process_stanza.rs | 422 | ||||
| -rw-r--r-- | filamento/src/pep.rs | 17 | 
16 files changed, 1101 insertions, 217 deletions
| diff --git a/filamento/src/avatar.rs b/filamento/src/avatar.rs new file mode 100644 index 0000000..a6937df --- /dev/null +++ b/filamento/src/avatar.rs @@ -0,0 +1,34 @@ +#[derive(Clone, Debug)] +pub struct Metadata { +    pub bytes: u32, +    pub hash: String, +    pub r#type: String, +} + +#[derive(Clone, Debug)] +pub struct Data { +    pub hash: String, +    pub data_b64: String, +} + +#[derive(Clone, Debug)] +pub struct Avatar(Vec<u8>); + +impl From<stanza::xep_0084::Info> for Metadata { +    fn from(value: stanza::xep_0084::Info) -> Self { +        Self { +            bytes: value.bytes, +            hash: value.id, +            r#type: value.r#type, +        } +    } +} + +impl From<stanza::xep_0084::Data> for Data { +    fn from(value: stanza::xep_0084::Data) -> Self { +        Self { +            hash: todo!(), +            data_b64: todo!(), +        } +    } +} diff --git a/filamento/src/caps.rs b/filamento/src/caps.rs index 49d05ba..b3464d1 100644 --- a/filamento/src/caps.rs +++ b/filamento/src/caps.rs @@ -40,6 +40,7 @@ pub fn client_info() -> Info {              "http://jabber.org/protocol/caps".to_string(),              "http://jabber.org/protocol/nick".to_string(),              "http://jabber.org/protocol/nick+notify".to_string(), +            "urn:xmpp:avatar:metadata+notify".to_string(),          ],          identities: vec![Identity {              name: Some("filamento 0.1.0".to_string()), diff --git a/filamento/src/db.rs b/filamento/src/db.rs index c19f16c..cdc7520 100644 --- a/filamento/src/db.rs +++ b/filamento/src/db.rs @@ -75,16 +75,107 @@ impl Db {          Ok(user)      } -    pub(crate) async fn upsert_user_nick(&self, jid: JID, nick: String) -> Result<(), Error> { -        sqlx::query!( +    /// returns whether or not the nickname was updated +    pub(crate) async fn delete_user_nick(&self, jid: JID) -> Result<bool, Error> { +        if sqlx::query!( +            "insert into users (jid, nick) values (?, ?) on conflict do update set nick = ?", +            jid, +            None::<String>, +            None::<String>, +        ) +        .execute(&self.db) +        .await? +        .rows_affected() +            > 0 +        { +            Ok(true) +        } else { +            Ok(false) +        } +    } + +    /// returns whether or not the nickname was updated +    pub(crate) async fn upsert_user_nick(&self, jid: JID, nick: String) -> Result<bool, Error> { +        if sqlx::query!(              "insert into users (jid, nick) values (?, ?) on conflict do update set nick = ?",              jid,              nick,              nick          )          .execute(&self.db) -        .await?; -        Ok(()) +        .await? +        .rows_affected() +            > 0 +        { +            Ok(true) +        } else { +            Ok(false) +        } +    } + +    /// returns whether or not the avatar was updated, and the file to delete if there existed an old avatar +    pub(crate) async fn delete_user_avatar( +        &self, +        jid: JID, +    ) -> Result<(bool, Option<String>), Error> { +        #[derive(sqlx::FromRow)] +        struct AvatarRow { +            avatar: Option<String>, +        } +        let old_avatar: Option<String> = sqlx::query_as("select avatar from users where jid = ?") +            .bind(jid.clone()) +            .fetch_optional(&self.db) +            .await? +            .map(|row: AvatarRow| row.avatar) +            .unwrap_or(None); +        if sqlx::query!( +            "insert into users (jid, avatar) values (?, ?) on conflict do update set avatar = ?", +            jid, +            None::<String>, +            None::<String>, +        ) +        .execute(&self.db) +        .await? +        .rows_affected() +            > 0 +        { +            Ok((true, old_avatar)) +        } else { +            Ok((false, old_avatar)) +        } +    } + +    /// returns whether or not the avatar was updated, and the file to delete if there existed an old avatar +    pub(crate) async fn upsert_user_avatar( +        &self, +        jid: JID, +        avatar: String, +    ) -> Result<(bool, Option<String>), Error> { +        #[derive(sqlx::FromRow)] +        struct AvatarRow { +            avatar: Option<String>, +        } +        let old_avatar: Option<String> = sqlx::query_as("select avatar from users where jid = ?") +            .bind(jid.clone()) +            .fetch_optional(&self.db) +            .await? +            .map(|row: AvatarRow| row.avatar) +            .unwrap_or(None); +        if sqlx::query!( +            "insert into users (jid, avatar) values (?, ?) on conflict do update set avatar = ?", +            jid, +            avatar, +            avatar +        ) +        .execute(&self.db) +        .await? +        .rows_affected() +            > 0 +        { +            Ok((true, old_avatar)) +        } else { +            Ok((false, old_avatar)) +        }      }      pub(crate) async fn update_user(&self, user: User) -> Result<(), Error> { @@ -441,12 +532,14 @@ impl Db {          .execute(&self.db)          .await?;          let id = Uuid::new_v4(); -        let chat: Chat = sqlx::query_as("insert into chats (id, correspondent, have_chatted) values (?, ?, ?) on conflict do nothing returning *") +        let chat: Chat = sqlx::query_as("insert into chats (id, correspondent, have_chatted) values (?, ?, ?) on conflict do nothing; select * from chats where correspondent = ?")              .bind(id) -            .bind(bare_chat) +            .bind(bare_chat.clone())              .bind(false) +            .bind(bare_chat)              .fetch_one(&self.db)              .await?; +        tracing::debug!("CHECKING chat: {:?}", chat);          Ok(chat.have_chatted)      } diff --git a/filamento/src/error.rs b/filamento/src/error.rs index 5111413..23272b1 100644 --- a/filamento/src/error.rs +++ b/filamento/src/error.rs @@ -1,5 +1,7 @@ -use std::{string::FromUtf8Error, sync::Arc}; +use std::{num::TryFromIntError, string::FromUtf8Error, sync::Arc}; +use base64::DecodeError; +use image::ImageError;  use jid::JID;  use lampada::error::{ConnectionError, ReadError, WriteError};  use stanza::client::{Stanza, iq::Query}; @@ -7,9 +9,11 @@ use thiserror::Error;  pub use lampada::error::CommandError; +use crate::files::FileStore; +  // for the client logic impl  #[derive(Debug, Error, Clone)] -pub enum Error { +pub enum Error<Fs: FileStore> {      #[error("core error: {0}")]      Connection(#[from] ConnectionError),      #[error("received unrecognized/unsupported content")] @@ -33,11 +37,11 @@ pub enum Error {      #[error("message send error: {0}")]      MessageSend(#[from] MessageSendError),      #[error("message receive error: {0}")] -    MessageRecv(#[from] MessageRecvError), +    MessageRecv(#[from] MessageRecvError<Fs>),      #[error("subscripbe error: {0}")]      Subscribe(#[from] SubscribeError),      #[error("publish error: {0}")] -    Publish(#[from] PublishError), +    Publish(#[from] PEPError),  }  #[derive(Debug, Error, Clone)] @@ -53,13 +57,29 @@ pub enum MessageSendError {  }  #[derive(Debug, Error, Clone)] -pub enum MessageRecvError { +pub enum MessageRecvError<Fs: FileStore> {      #[error("could not add to message history: {0}")]      MessageHistory(#[from] DatabaseError),      #[error("missing from")]      MissingFrom,      #[error("could not update user nick: {0}")]      NickUpdate(DatabaseError), +    #[error("could not update user avatar: {0}")] +    AvatarUpdate(#[from] AvatarUpdateError<Fs>), +} + +#[derive(Debug, Error, Clone)] +pub enum AvatarUpdateError<Fs: FileStore> { +    #[error("could not save to disk: {0}")] +    FileStore(Fs::Err), +    #[error("could not fetch avatar data: {0}")] +    PEPError(#[from] CommandError<PEPError>), +    #[error("base64 decode: {0}")] +    Base64(#[from] DecodeError), +    #[error("pep node missing avatar data")] +    MissingData, +    #[error("database: {0}")] +    Database(#[from] DatabaseError),  }  #[derive(Debug, Error, Clone)] @@ -203,7 +223,7 @@ pub enum PresenceError {  }  #[derive(Debug, Error, Clone)] -pub enum PublishError { +pub enum PEPError {      #[error("received mismatched query")]      MismatchedQuery(Query),      #[error("missing query")] @@ -216,12 +236,19 @@ pub enum PublishError {      UnexpectedStanza(Stanza),      #[error("iq response: {0}")]      IqResponse(#[from] IqRequestError), +    #[error("missing pep item")] +    MissingItem, +    #[error("incorrect item id: expected {0}, got {1}")] +    IncorrectItemID(String, String), +    #[error("unsupported pep item")] +    UnsupportedItem, +    // TODO: should the item be in the error?  }  #[derive(Debug, Error, Clone)]  pub enum NickError {      #[error("publishing nick: {0}")] -    Publish(#[from] CommandError<PublishError>), +    Publish(#[from] CommandError<PEPError>),      #[error("updating database: {0}")]      Database(#[from] DatabaseError),      #[error("disconnected")] @@ -267,5 +294,31 @@ pub enum CapsNodeConversionError {      #[error("missing hashtag")]      MissingHashtag,  } -// #[derive(Debug, Error, Clone)] -// pub enum CapsError {} + +#[derive(Debug, Error, Clone)] +pub enum AvatarPublishError<Fs: FileStore> { +    #[error("disconnected")] +    Disconnected, +    #[error("image read: {0}")] +    Read(Arc<std::io::Error>), +    #[error("image: {0}")] +    Image(Arc<ImageError>), +    #[error("pep publish: {0}")] +    Publish(#[from] CommandError<PEPError>), +    #[error("bytes number conversion: {0}")] +    FromInt(#[from] TryFromIntError), +    #[error("could not save to disk")] +    FileStore(Fs::Err), +} + +impl<Fs: FileStore> From<std::io::Error> for AvatarPublishError<Fs> { +    fn from(value: std::io::Error) -> Self { +        Self::Read(Arc::new(value)) +    } +} + +impl<Fs: FileStore> From<ImageError> for AvatarPublishError<Fs> { +    fn from(value: ImageError) -> Self { +        Self::Image(Arc::new(value)) +    } +} diff --git a/filamento/src/files.rs b/filamento/src/files.rs new file mode 100644 index 0000000..0dfe347 --- /dev/null +++ b/filamento/src/files.rs @@ -0,0 +1,19 @@ +use std::error::Error; + +pub trait FileStore { +    type Err: Clone + Send + Error; + +    fn is_stored( +        &self, +        name: &str, +    ) -> impl std::future::Future<Output = Result<bool, Self::Err>> + std::marker::Send; +    fn store( +        &self, +        name: &str, +        data: &[u8], +    ) -> impl std::future::Future<Output = Result<(), Self::Err>> + std::marker::Send; +    fn delete( +        &self, +        name: &str, +    ) -> impl std::future::Future<Output = Result<(), Self::Err>> + std::marker::Send; +} diff --git a/filamento/src/lib.rs b/filamento/src/lib.rs index c44edca..42646be 100644 --- a/filamento/src/lib.rs +++ b/filamento/src/lib.rs @@ -11,9 +11,10 @@ use chrono::Utc;  use db::Db;  use disco::{Info, Items};  use error::{ -    ConnectionJobError, DatabaseError, DiscoError, Error, IqError, MessageRecvError, NickError, -    PresenceError, PublishError, RosterError, StatusError, SubscribeError, +    AvatarPublishError, ConnectionJobError, DatabaseError, DiscoError, Error, IqError, +    MessageRecvError, NickError, PEPError, PresenceError, RosterError, StatusError, SubscribeError,  }; +use files::FileStore;  use futures::FutureExt;  use jid::JID;  use lampada::{ @@ -35,18 +36,20 @@ use tracing::{debug, info};  use user::User;  use uuid::Uuid; +pub mod avatar;  pub mod caps;  pub mod chat;  pub mod db;  pub mod disco;  pub mod error; +pub mod files;  mod logic;  pub mod pep;  pub mod presence;  pub mod roster;  pub mod user; -pub enum Command { +pub enum Command<Fs: FileStore> {      /// get the roster. if offline, retreive cached version from database. should be stored in application memory      GetRoster(oneshot::Sender<Result<Vec<Contact>, RosterError>>),      /// get all chats. chat will include 10 messages in their message Vec (enough for chat previews) @@ -115,16 +118,34 @@ pub enum Command {          Option<String>,          oneshot::Sender<Result<disco::Items, DiscoError>>,      ), -    /// publish item to a pep node, specified or default according to item. -    Publish { +    /// publish item to a pep node specified. +    PublishPEPItem {          item: pep::Item,          node: String, -        sender: oneshot::Sender<Result<(), PublishError>>, +        sender: oneshot::Sender<Result<(), PEPError>>,      }, -    /// change user nickname -    ChangeNick(String, oneshot::Sender<Result<(), NickError>>), +    DeletePEPNode { +        node: String, +        sender: oneshot::Sender<Result<(), PEPError>>, +    }, +    GetPEPItem { +        jid: Option<JID>, +        node: String, +        id: String, +        sender: oneshot::Sender<Result<pep::Item, PEPError>>, +    }, +    /// change client user nickname +    ChangeNick(Option<String>, oneshot::Sender<Result<(), NickError>>), +    // // TODO +    // GetNick(...), +    // GetAvatar(...)      // /// get capability node      // GetCaps(String, oneshot::Sender<Result<Info, CapsError>>), +    /// change client user avatar +    ChangeAvatar( +        Option<Vec<u8>>, +        oneshot::Sender<Result<(), AvatarPublishError<Fs>>>, +    ),  }  #[derive(Debug, Clone)] @@ -155,18 +176,22 @@ pub enum UpdateMessage {      SubscriptionRequest(jid::JID),      NickChanged {          jid: JID, -        nick: String, +        nick: Option<String>, +    }, +    AvatarChanged { +        jid: JID, +        id: Option<String>,      },  }  /// an xmpp client that is suited for a chat client use case  #[derive(Debug)] -pub struct Client { -    sender: mpsc::Sender<CoreClientCommand<Command>>, +pub struct Client<Fs: FileStore> { +    sender: mpsc::Sender<CoreClientCommand<Command<Fs>>>,      timeout: Duration,  } -impl Clone for Client { +impl<Fs: FileStore> Clone for Client<Fs> {      fn clone(&self) -> Self {          Self {              sender: self.sender.clone(), @@ -175,32 +200,27 @@ impl Clone for Client {      }  } -impl Deref for Client { -    type Target = mpsc::Sender<CoreClientCommand<Command>>; +impl<Fs: FileStore> Deref for Client<Fs> { +    type Target = mpsc::Sender<CoreClientCommand<Command<Fs>>>;      fn deref(&self) -> &Self::Target {          &self.sender      }  } -impl DerefMut for Client { +impl<Fs: FileStore> DerefMut for Client<Fs> {      fn deref_mut(&mut self) -> &mut Self::Target {          &mut self.sender      }  } -impl Client { -    pub async fn connect(&self) -> Result<(), ActorError> { -        self.send(CoreClientCommand::Connect).await?; -        Ok(()) -    } - -    pub async fn disconnect(&self, offline: Offline) -> Result<(), ActorError> { -        self.send(CoreClientCommand::Disconnect).await?; -        Ok(()) -    } - -    pub fn new(jid: JID, password: String, db: Db) -> (Self, mpsc::Receiver<UpdateMessage>) { +impl<Fs: FileStore + Clone + Send + Sync + 'static> Client<Fs> { +    pub fn new( +        jid: JID, +        password: String, +        db: Db, +        file_store: Fs, +    ) -> (Self, mpsc::Receiver<UpdateMessage>) {          let (command_sender, command_receiver) = mpsc::channel(20);          let (update_send, update_recv) = mpsc::channel(20); @@ -214,14 +234,26 @@ impl Client {              timeout: Duration::from_secs(10),          }; -        let logic = ClientLogic::new(client.clone(), jid.as_bare(), db, update_send); +        let logic = ClientLogic::new(client.clone(), jid.as_bare(), db, update_send, file_store); -        let actor: CoreClient<ClientLogic> = +        let actor: CoreClient<ClientLogic<Fs>> =              CoreClient::new(jid, password, command_receiver, None, sup_recv, logic);          tokio::spawn(async move { actor.run().await });          (client, update_recv)      } +} + +impl<Fs: FileStore> Client<Fs> { +    pub async fn connect(&self) -> Result<(), ActorError> { +        self.send(CoreClientCommand::Connect).await?; +        Ok(()) +    } + +    pub async fn disconnect(&self, offline: Offline) -> Result<(), ActorError> { +        self.send(CoreClientCommand::Disconnect).await?; +        Ok(()) +    }      pub async fn get_roster(&self) -> Result<Vec<Contact>, CommandError<RosterError>> {          let (send, recv) = oneshot::channel(); @@ -539,9 +571,9 @@ impl Client {          &self,          item: pep::Item,          node: String, -    ) -> Result<(), CommandError<PublishError>> { +    ) -> Result<(), CommandError<PEPError>> {          let (send, recv) = oneshot::channel(); -        self.send(CoreClientCommand::Command(Command::Publish { +        self.send(CoreClientCommand::Command(Command::PublishPEPItem {              item,              node,              sender: send, @@ -555,7 +587,44 @@ impl Client {          Ok(result)      } -    pub async fn change_nick(&self, nick: String) -> Result<(), CommandError<NickError>> { +    pub async fn delete_pep_node(&self, node: String) -> Result<(), CommandError<PEPError>> { +        let (send, recv) = oneshot::channel(); +        self.send(CoreClientCommand::Command(Command::DeletePEPNode { +            node, +            sender: send, +        })) +        .await +        .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; +        let result = timeout(self.timeout, recv) +            .await +            .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? +            .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; +        Ok(result) +    } + +    pub async fn get_pep_item( +        &self, +        jid: Option<JID>, +        node: String, +        id: String, +    ) -> Result<pep::Item, CommandError<PEPError>> { +        let (send, recv) = oneshot::channel(); +        self.send(CoreClientCommand::Command(Command::GetPEPItem { +            jid, +            node, +            id, +            sender: send, +        })) +        .await +        .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; +        let result = timeout(self.timeout, recv) +            .await +            .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? +            .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; +        Ok(result) +    } + +    pub async fn change_nick(&self, nick: Option<String>) -> Result<(), CommandError<NickError>> {          let (send, recv) = oneshot::channel();          self.send(CoreClientCommand::Command(Command::ChangeNick(nick, send)))              .await @@ -566,10 +635,27 @@ impl Client {              .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;          Ok(result)      } + +    pub async fn change_avatar( +        &self, +        avatar: Option<Vec<u8>>, +    ) -> Result<(), CommandError<AvatarPublishError<Fs>>> { +        let (send, recv) = oneshot::channel(); +        self.send(CoreClientCommand::Command(Command::ChangeAvatar( +            avatar, send, +        ))) +        .await +        .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; +        let result = timeout(self.timeout, recv) +            .await +            .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? +            .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; +        Ok(result) +    }  } -impl From<Command> for CoreClientCommand<Command> { -    fn from(value: Command) -> Self { +impl<Fs: FileStore> From<Command<Fs>> for CoreClientCommand<Command<Fs>> { +    fn from(value: Command<Fs>) -> Self {          CoreClientCommand::Command(value)      }  } diff --git a/filamento/src/logic/abort.rs b/filamento/src/logic/abort.rs index df82655..3588b13 100644 --- a/filamento/src/logic/abort.rs +++ b/filamento/src/logic/abort.rs @@ -1,7 +1,9 @@  use lampada::error::ReadError; +use crate::files::FileStore; +  use super::ClientLogic; -pub async fn on_abort(logic: ClientLogic) { +pub async fn on_abort<Fs: FileStore + Clone>(logic: ClientLogic<Fs>) {      logic.pending().drain().await;  } diff --git a/filamento/src/logic/connect.rs b/filamento/src/logic/connect.rs index dc05448..37cdad5 100644 --- a/filamento/src/logic/connect.rs +++ b/filamento/src/logic/connect.rs @@ -5,12 +5,16 @@ use tracing::debug;  use crate::{      Command, UpdateMessage,      error::{ConnectionJobError, Error, RosterError}, +    files::FileStore,      presence::{Online, PresenceType},  };  use super::ClientLogic; -pub async fn handle_connect(logic: ClientLogic, connection: Connected) { +pub async fn handle_connect<Fs: FileStore + Clone + Send + Sync>( +    logic: ClientLogic<Fs>, +    connection: Connected, +) {      let (send, recv) = oneshot::channel();      debug!("getting roster");      logic diff --git a/filamento/src/logic/connection_error.rs b/filamento/src/logic/connection_error.rs index 081900b..36c1cef 100644 --- a/filamento/src/logic/connection_error.rs +++ b/filamento/src/logic/connection_error.rs @@ -1,7 +1,12 @@  use lampada::error::ConnectionError; +use crate::files::FileStore; +  use super::ClientLogic; -pub async fn handle_connection_error(logic: ClientLogic, error: ConnectionError) { +pub async fn handle_connection_error<Fs: FileStore + Clone>( +    logic: ClientLogic<Fs>, +    error: ConnectionError, +) {      logic.handle_error(error.into()).await;  } diff --git a/filamento/src/logic/disconnect.rs b/filamento/src/logic/disconnect.rs index 241c3e6..ebcfd4f 100644 --- a/filamento/src/logic/disconnect.rs +++ b/filamento/src/logic/disconnect.rs @@ -1,11 +1,14 @@  use lampada::Connected;  use stanza::client::Stanza; -use crate::{UpdateMessage, presence::Offline}; +use crate::{UpdateMessage, files::FileStore, presence::Offline};  use super::ClientLogic; -pub async fn handle_disconnect(logic: ClientLogic, connection: Connected) { +pub async fn handle_disconnect<Fs: FileStore + Clone>( +    logic: ClientLogic<Fs>, +    connection: Connected, +) {      // TODO: be able to set offline status message      let offline_presence: stanza::client::presence::Presence = Offline::default().into_stanza(None);      let stanza = Stanza::Presence(offline_presence); diff --git a/filamento/src/logic/local_only.rs b/filamento/src/logic/local_only.rs index 3f6fe8d..cabbef4 100644 --- a/filamento/src/logic/local_only.rs +++ b/filamento/src/logic/local_only.rs @@ -4,44 +4,61 @@ use uuid::Uuid;  use crate::{      chat::{Chat, Message},      error::DatabaseError, +    files::FileStore,      user::User,  };  use super::ClientLogic; -pub async fn handle_get_chats(logic: &ClientLogic) -> Result<Vec<Chat>, DatabaseError> { +pub async fn handle_get_chats<Fs: FileStore + Clone>( +    logic: &ClientLogic<Fs>, +) -> Result<Vec<Chat>, DatabaseError> {      Ok(logic.db().read_chats().await?)  } -pub async fn handle_get_chats_ordered(logic: &ClientLogic) -> Result<Vec<Chat>, DatabaseError> { +pub async fn handle_get_chats_ordered<Fs: FileStore + Clone>( +    logic: &ClientLogic<Fs>, +) -> Result<Vec<Chat>, DatabaseError> {      Ok(logic.db().read_chats_ordered().await?)  } -pub async fn handle_get_chats_ordered_with_latest_messages( -    logic: &ClientLogic, +pub async fn handle_get_chats_ordered_with_latest_messages<Fs: FileStore + Clone>( +    logic: &ClientLogic<Fs>,  ) -> Result<Vec<(Chat, Message)>, DatabaseError> {      Ok(logic.db().read_chats_ordered_with_latest_messages().await?)  } -pub async fn handle_get_chat(logic: &ClientLogic, jid: JID) -> Result<Chat, DatabaseError> { +pub async fn handle_get_chat<Fs: FileStore + Clone>( +    logic: &ClientLogic<Fs>, +    jid: JID, +) -> Result<Chat, DatabaseError> {      Ok(logic.db().read_chat(jid).await?)  } -pub async fn handle_get_messages( -    logic: &ClientLogic, +pub async fn handle_get_messages<Fs: FileStore + Clone>( +    logic: &ClientLogic<Fs>,      jid: JID,  ) -> Result<Vec<Message>, DatabaseError> {      Ok(logic.db().read_message_history(jid).await?)  } -pub async fn handle_delete_chat(logic: &ClientLogic, jid: JID) -> Result<(), DatabaseError> { +pub async fn handle_delete_chat<Fs: FileStore + Clone>( +    logic: &ClientLogic<Fs>, +    jid: JID, +) -> Result<(), DatabaseError> {      Ok(logic.db().delete_chat(jid).await?)  } -pub async fn handle_delete_messaage(logic: &ClientLogic, uuid: Uuid) -> Result<(), DatabaseError> { +pub async fn handle_delete_messaage<Fs: FileStore + Clone>( +    logic: &ClientLogic<Fs>, +    uuid: Uuid, +) -> Result<(), DatabaseError> {      Ok(logic.db().delete_message(uuid).await?)  } -pub async fn handle_get_user(logic: &ClientLogic, jid: JID) -> Result<User, DatabaseError> { +pub async fn handle_get_user<Fs: FileStore + Clone>( +    logic: &ClientLogic<Fs>, +    jid: JID, +) -> Result<User, DatabaseError> {      Ok(logic.db().read_user(jid).await?)  } diff --git a/filamento/src/logic/mod.rs b/filamento/src/logic/mod.rs index 1ddd7d3..5e05dac 100644 --- a/filamento/src/logic/mod.rs +++ b/filamento/src/logic/mod.rs @@ -4,12 +4,13 @@ use jid::JID;  use lampada::{Connected, Logic, error::ReadError};  use stanza::client::Stanza;  use tokio::sync::{Mutex, mpsc, oneshot}; -use tracing::{error, info, warn}; +use tracing::{error, info};  use crate::{      Client, Command, UpdateMessage,      db::Db,      error::{Error, IqRequestError, ResponseError}, +    files::FileStore,  };  mod abort; @@ -22,12 +23,13 @@ mod online;  mod process_stanza;  #[derive(Clone)] -pub struct ClientLogic { -    client: Client, +pub struct ClientLogic<Fs: FileStore> { +    client: Client<Fs>,      bare_jid: JID,      db: Db,      pending: Pending,      update_sender: mpsc::Sender<UpdateMessage>, +    file_store: Fs,  }  #[derive(Clone)] @@ -75,12 +77,13 @@ impl Pending {      }  } -impl ClientLogic { +impl<Fs: FileStore> ClientLogic<Fs> {      pub fn new( -        client: Client, +        client: Client<Fs>,          bare_jid: JID,          db: Db,          update_sender: mpsc::Sender<UpdateMessage>, +        file_store: Fs,      ) -> Self {          Self {              db, @@ -88,10 +91,11 @@ impl ClientLogic {              update_sender,              client,              bare_jid, +            file_store,          }      } -    pub fn client(&self) -> &Client { +    pub fn client(&self) -> &Client<Fs> {          &self.client      } @@ -103,6 +107,10 @@ impl ClientLogic {          &self.pending      } +    pub fn file_store(&self) -> &Fs { +        &self.file_store +    } +      pub fn update_sender(&self) -> &mpsc::Sender<UpdateMessage> {          &self.update_sender      } @@ -113,13 +121,14 @@ impl ClientLogic {          self.update_sender().send(update).await;      } -    pub async fn handle_error(&self, e: Error) { +    // TODO: delete this +    pub async fn handle_error(&self, e: Error<Fs>) {          error!("{}", e);      }  } -impl Logic for ClientLogic { -    type Cmd = Command; +impl<Fs: FileStore + Clone + Send + Sync> Logic for ClientLogic<Fs> { +    type Cmd = Command<Fs>;      // pub async fn handle_stream_error(self, error) {}      // stanza errors (recoverable) diff --git a/filamento/src/logic/offline.rs b/filamento/src/logic/offline.rs index 6399cf7..566972c 100644 --- a/filamento/src/logic/offline.rs +++ b/filamento/src/logic/offline.rs @@ -1,3 +1,5 @@ +use std::process::id; +  use chrono::Utc;  use lampada::error::WriteError;  use uuid::Uuid; @@ -6,9 +8,10 @@ use crate::{      Command,      chat::{Delivery, Message},      error::{ -        DatabaseError, DiscoError, Error, IqRequestError, MessageSendError, NickError, RosterError, -        StatusError, +        AvatarPublishError, DatabaseError, DiscoError, Error, IqRequestError, MessageSendError, +        NickError, PEPError, RosterError, StatusError,      }, +    files::FileStore,      presence::Online,      roster::Contact,  }; @@ -22,7 +25,7 @@ use super::{      },  }; -pub async fn handle_offline(logic: ClientLogic, command: Command) { +pub async fn handle_offline<Fs: FileStore + Clone>(logic: ClientLogic<Fs>, command: Command<Fs>) {      let result = handle_offline_result(&logic, command).await;      match result {          Ok(_) => {} @@ -30,16 +33,24 @@ pub async fn handle_offline(logic: ClientLogic, command: Command) {      }  } -pub async fn handle_set_status(logic: &ClientLogic, online: Online) -> Result<(), StatusError> { +pub async fn handle_set_status<Fs: FileStore + Clone>( +    logic: &ClientLogic<Fs>, +    online: Online, +) -> Result<(), StatusError> {      logic.db().upsert_cached_status(online).await?;      Ok(())  } -pub async fn handle_get_roster(logic: &ClientLogic) -> Result<Vec<Contact>, RosterError> { +pub async fn handle_get_roster<Fs: FileStore + Clone>( +    logic: &ClientLogic<Fs>, +) -> Result<Vec<Contact>, RosterError> {      Ok(logic.db().read_cached_roster().await?)  } -pub async fn handle_offline_result(logic: &ClientLogic, command: Command) -> Result<(), Error> { +pub async fn handle_offline_result<Fs: FileStore + Clone>( +    logic: &ClientLogic<Fs>, +    command: Command<Fs>, +) -> Result<(), Error<Fs>> {      match command {          Command::GetRoster(sender) => {              let roster = handle_get_roster(logic).await; @@ -77,7 +88,6 @@ pub async fn handle_offline_result(logic: &ClientLogic, command: Command) -> Res              let user = handle_get_user(logic, jid).await;              sender.send(user);          } -        // TODO: offline queue to modify roster          Command::AddContact(_jid, sender) => {              sender.send(Err(RosterError::Write(WriteError::Disconnected)));          } @@ -112,7 +122,6 @@ pub async fn handle_offline_result(logic: &ClientLogic, command: Command) -> Res              let result = handle_set_status(logic, online).await;              sender.send(result);          } -        // TODO: offline message queue          Command::SendMessage(jid, body) => {              let id = Uuid::new_v4();              let timestamp = Utc::now(); @@ -159,7 +168,7 @@ pub async fn handle_offline_result(logic: &ClientLogic, command: Command) -> Res          Command::DiscoItems(_jid, _node, sender) => {              sender.send(Err(DiscoError::Write(WriteError::Disconnected)));          } -        Command::Publish { +        Command::PublishPEPItem {              item: _,              node: _,              sender, @@ -169,6 +178,24 @@ pub async fn handle_offline_result(logic: &ClientLogic, command: Command) -> Res          Command::ChangeNick(_, sender) => {              sender.send(Err(NickError::Disconnected));          } +        Command::ChangeAvatar(_items, sender) => { +            sender.send(Err(AvatarPublishError::Disconnected)); +        } +        Command::DeletePEPNode { node: _, sender } => { +            sender.send(Err(PEPError::IqResponse(IqRequestError::Write( +                WriteError::Disconnected, +            )))); +        } +        Command::GetPEPItem { +            node: _, +            sender, +            jid: _, +            id: _, +        } => { +            sender.send(Err(PEPError::IqResponse(IqRequestError::Write( +                WriteError::Disconnected, +            )))); +        }      }      Ok(())  } diff --git a/filamento/src/logic/online.rs b/filamento/src/logic/online.rs index b069f59..745adc1 100644 --- a/filamento/src/logic/online.rs +++ b/filamento/src/logic/online.rs @@ -1,23 +1,24 @@ +use std::io::Cursor; + +use base64::{prelude::BASE64_STANDARD, Engine};  use chrono::Utc; +use image::ImageReader;  use jid::JID;  use lampada::{Connected, WriteMessage, error::WriteError}; +use sha1::{Digest, Sha1};  use stanza::{      client::{          iq::{self, Iq, IqType, Query}, Stanza -    }, -    xep_0030::{info, items}, -    xep_0060::pubsub::{self, Pubsub}, -    xep_0172::{self, Nick}, -    xep_0203::Delay, +    }, xep_0030::{info, items}, xep_0060::{self, owner, pubsub::{self, Pubsub}}, xep_0084, xep_0172::{self, Nick}, xep_0203::Delay  };  use tokio::sync::oneshot;  use tracing::{debug, error, info};  use uuid::Uuid;  use crate::{ -    chat::{Body, Chat, Delivery, Message}, disco::{Info, Items}, error::{ -        DatabaseError, DiscoError, Error, IqRequestError, MessageSendError, NickError, PublishError, RosterError, StatusError, SubscribeError -    }, pep, presence::{Online, Presence, PresenceType}, roster::{Contact, ContactUpdate}, Command, UpdateMessage +    avatar, chat::{Body, Chat, Delivery, Message}, disco::{Info, Items}, error::{ +        AvatarPublishError, DatabaseError, DiscoError, Error, IqRequestError, MessageSendError, NickError, PEPError, RosterError, StatusError, SubscribeError +    }, files::FileStore, pep, presence::{Online, Presence, PresenceType}, roster::{Contact, ContactUpdate}, Command, UpdateMessage  };  use super::{ @@ -29,7 +30,7 @@ use super::{      },  }; -pub async fn handle_online(logic: ClientLogic, command: Command, connection: Connected) { +pub async fn handle_online<Fs: FileStore + Clone>(logic: ClientLogic<Fs>, command: Command<Fs>, connection: Connected) {      let result = handle_online_result(&logic, command, connection).await;      match result {          Ok(_) => {} @@ -37,8 +38,8 @@ pub async fn handle_online(logic: ClientLogic, command: Command, connection: Con      }  } -pub async fn handle_get_roster( -    logic: &ClientLogic, +pub async fn handle_get_roster<Fs: FileStore + Clone>( +    logic: &ClientLogic<Fs>,      connection: Connected,  ) -> Result<Vec<Contact>, RosterError> {      let iq_id = Uuid::new_v4().to_string(); @@ -96,8 +97,8 @@ pub async fn handle_get_roster(      }  } -pub async fn handle_add_contact( -    logic: &ClientLogic, +pub async fn handle_add_contact<Fs: FileStore + Clone>( +    logic: &ClientLogic<Fs>,      connection: Connected,      jid: JID,  ) -> Result<(), RosterError> { @@ -154,8 +155,8 @@ pub async fn handle_add_contact(      }  } -pub async fn handle_buddy_request( -    logic: &ClientLogic, +pub async fn handle_buddy_request<Fs: FileStore + Clone>( +    logic: &ClientLogic<Fs>,      connection: Connected,      jid: JID,  ) -> Result<(), SubscribeError> { @@ -177,8 +178,8 @@ pub async fn handle_buddy_request(      Ok(())  } -pub async fn handle_subscription_request( -    logic: &ClientLogic, +pub async fn handle_subscription_request<Fs: FileStore + Clone>( +    logic: &ClientLogic<Fs>,      connection: Connected,      jid: JID,  ) -> Result<(), SubscribeError> { @@ -195,8 +196,8 @@ pub async fn handle_subscription_request(      Ok(())  } -pub async fn handle_accept_buddy_request( -    logic: &ClientLogic, +pub async fn handle_accept_buddy_request<Fs: FileStore + Clone>( +    logic: &ClientLogic<Fs>,      connection: Connected,      jid: JID,  ) -> Result<(), SubscribeError> { @@ -218,8 +219,8 @@ pub async fn handle_accept_buddy_request(      Ok(())  } -pub async fn handle_accept_subscription_request( -    logic: &ClientLogic, +pub async fn handle_accept_subscription_request<Fs: FileStore + Clone>( +    logic: &ClientLogic<Fs>,      connection: Connected,      jid: JID,  ) -> Result<(), SubscribeError> { @@ -274,8 +275,8 @@ pub async fn handle_unfriend_contact(connection: Connected, jid: JID) -> Result<      Ok(())  } -pub async fn handle_delete_contact( -    logic: &ClientLogic, +pub async fn handle_delete_contact<Fs: FileStore + Clone>( +    logic: &ClientLogic<Fs>,      connection: Connected,      jid: JID,  ) -> Result<(), RosterError> { @@ -333,8 +334,8 @@ pub async fn handle_delete_contact(      }  } -pub async fn handle_update_contact( -    logic: &ClientLogic, +pub async fn handle_update_contact<Fs: FileStore + Clone>( +    logic: &ClientLogic<Fs>,      connection: Connected,      jid: JID,      contact_update: ContactUpdate, @@ -398,8 +399,8 @@ pub async fn handle_update_contact(      }  } -pub async fn handle_set_status( -    logic: &ClientLogic, +pub async fn handle_set_status<Fs: FileStore + Clone>( +    logic: &ClientLogic<Fs>,      connection: Connected,      online: Online,  ) -> Result<(), StatusError> { @@ -411,9 +412,18 @@ pub async fn handle_set_status(      Ok(())  } -pub async fn handle_send_message(logic: &ClientLogic, connection: Connected, jid: JID, body: Body) { +pub async fn handle_send_message<Fs: FileStore + Clone>(logic: &ClientLogic<Fs>, connection: Connected, jid: JID, body: Body) {      // upsert the chat and user the message will be delivered to. if there is a conflict, it will return whatever was there, otherwise it will return false by default. -    let have_chatted = logic.db().upsert_chat_and_user(&jid).await.unwrap_or(false); +    // let have_chatted = logic.db().upsert_chat_and_user(&jid).await.unwrap_or(false); +    let have_chatted  = match logic.db().upsert_chat_and_user(&jid).await { +        Ok(have_chatted) => { +            have_chatted +        }, +        Err(e) => { +            error!("{}", e); +            false +        }, +    };      let nick;      let mark_chat_as_chatted; @@ -506,6 +516,7 @@ pub async fn handle_send_message(logic: &ClientLogic, connection: Connected, jid                  })                  .await;              if mark_chat_as_chatted { +                debug!("marking chat as chatted");                  if let Err(e) = logic.db.mark_chat_as_chatted(jid).await {                      logic                          .handle_error(MessageSendError::MarkChatAsChatted(e.into()).into()) @@ -540,8 +551,8 @@ pub async fn handle_send_presence(      Ok(())  } -pub async fn handle_disco_info( -    logic: &ClientLogic, +pub async fn handle_disco_info<Fs: FileStore + Clone>( +    logic: &ClientLogic<Fs>,      connection: Connected,      jid: Option<JID>,      node: Option<String>, @@ -611,8 +622,8 @@ pub async fn handle_disco_info(      }  } -pub async fn handle_disco_items( -    logic: &ClientLogic, +pub async fn handle_disco_items<Fs: FileStore + Clone>( +    logic: &ClientLogic<Fs>,      connection: Connected,      jid: Option<JID>,      node: Option<String>, @@ -680,20 +691,60 @@ pub async fn handle_disco_items(      }  } -pub async fn handle_publish( -    logic: &ClientLogic, +pub async fn handle_publish_pep_item<Fs: FileStore + Clone>( +    logic: &ClientLogic<Fs>,      connection: Connected,      item: pep::Item,      node: String, -) -> Result<(), PublishError> { +) -> Result<(), PEPError> {      let id = Uuid::new_v4().to_string();      let publish = match item { -        pep::Item::Nick(n) => pubsub::Publish { -            node, -            items: vec![pubsub::Item { -                item: Some(pubsub::Content::Nick(Nick(n))), -                ..Default::default() -            }], +        pep::Item::Nick(n) => { +            if let Some(n) = n { +                pubsub::Publish { +                    node, +                    items: vec![pubsub::Item { +                        item: Some(pubsub::Content::Nick(Nick(n))), +                        ..Default::default() +                    }], +                } +            } else { +                pubsub::Publish { +                    node, +                    items: vec![pubsub::Item { +                        item: Some(pubsub::Content::Nick(Nick("".to_string()))), +                        ..Default::default() +                    }] +                } +            } +        }, +        pep::Item::AvatarMetadata(metadata) => { +            if let Some(metadata) = metadata { +                pubsub::Publish { node, items: vec![pubsub::Item { +                    item: Some(pubsub::Content::AvatarMetadata(xep_0084::Metadata { info: vec![xep_0084::Info { bytes: metadata.bytes, height: None, id: metadata.hash.clone(), r#type: metadata.r#type, url: None, width: None }], pointers: Vec::new() })), +                    id: Some(metadata.hash), +                    ..Default::default() +                }]} +            } else { +                pubsub::Publish { node, items: vec![pubsub::Item { +                    item: Some(pubsub::Content::AvatarMetadata(xep_0084::Metadata { info: Vec::new(), pointers: Vec::new() })), +                    ..Default::default() +                }]} +            } +        }, +        pep::Item::AvatarData(data) => { +            if let Some(data) = data { +                pubsub::Publish { node, items: vec![pubsub::Item { +                    item: Some(pubsub::Content::AvatarData(xep_0084::Data(data.data_b64))), +                    id: Some(data.hash), +                    ..Default::default() +                }] } +            } else { +                pubsub::Publish { node, items: vec![pubsub::Item { +                    item: Some(pubsub::Content::AvatarData(xep_0084::Data("".to_string()))), +                    ..Default::default() +                }]} +            }          },      };      let request = Iq { @@ -726,38 +777,229 @@ pub async fn handle_publish(                          if let Some(query) = query {                              match query {                                  Query::Pubsub(_) => Ok(()), -                                q => Err(PublishError::MismatchedQuery(q)), +                                q => Err(PEPError::MismatchedQuery(q)),                              }                          } else { -                            Err(PublishError::MissingQuery) +                            Err(PEPError::MissingQuery)                          }                      }                      IqType::Error => { -                        Err(PublishError::StanzaErrors(errors)) +                        Err(PEPError::StanzaErrors(errors))                      }                      _ => unreachable!(),                  }              } else { -                Err(PublishError::IncorrectEntity( +                Err(PEPError::IncorrectEntity(                      from.unwrap_or_else(|| connection.jid().as_bare()),                  ))              }          } -        s => Err(PublishError::UnexpectedStanza(s)), +        s => Err(PEPError::UnexpectedStanza(s)),      }  } -pub async fn handle_change_nick(logic: &ClientLogic, nick: String) -> Result<(), NickError> { +pub async fn handle_get_pep_item<Fs: FileStore + Clone>(logic: &ClientLogic<Fs>, connection: Connected, jid: Option<JID>, node: String, id: String) -> Result<pep::Item, PEPError> { +    let stanza_id = Uuid::new_v4().to_string(); +    let request = Iq { +        from: Some(connection.jid().clone()), +        id: stanza_id.clone(), +        to: jid.clone(), +        r#type: IqType::Get, +        lang: None, +        query: Some(Query::Pubsub(Pubsub::Items(pubsub::Items { +            max_items: None, +            node, +            subid: None, +            items: vec![pubsub::Item { id: Some(id.clone()), publisher: None, item: None }], +        }))), +        errors: Vec::new(), +    }; +    match logic +        .pending() +        .request(&connection, Stanza::Iq(request), stanza_id) +        .await? { +         +        Stanza::Iq(Iq { +            from, +            r#type, +            query, +            errors, +            .. +            // TODO: maybe abstract a bunch of these different errors related to iqs into an iq error thing? as in like call iq.result(), get the query from inside, error otherwise. +        }) if r#type == IqType::Result || r#type == IqType::Error => { +            if from == jid || { +                if jid == None { +                    from == Some(connection.jid().as_bare()) +                } else { +                    false +                } +            } { +                match r#type { +                    IqType::Result => { +                        if let Some(query) = query { +                            match query { +                                Query::Pubsub(Pubsub::Items(mut items)) => { +                                    if let Some(item) = items.items.pop() { +                                        if item.id == Some(id.clone()) { +                                            match item.item.ok_or(PEPError::MissingItem)? { +                                                pubsub::Content::Nick(nick) => { +                                                    if nick.0.is_empty() { +                                                        Ok(pep::Item::Nick(None)) +                                                    } else { +                                                        Ok(pep::Item::Nick(Some(nick.0))) +                                                         +                                                    } +                                                }, +                                                pubsub::Content::AvatarData(data) => Ok(pep::Item::AvatarData(Some(avatar::Data { hash: id, data_b64: data.0 }))), +                                                pubsub::Content::AvatarMetadata(metadata) => Ok(pep::Item::AvatarMetadata(metadata.info.into_iter().find(|info| info.url.is_none()).map(|info| info.into()))), +                                                pubsub::Content::Unknown(_element) => Err(PEPError::UnsupportedItem), +                                            } +                                        } else { +                                            Err(PEPError::IncorrectItemID(id, item.id.unwrap_or_else(|| "missing id".to_string()))) +                                        } +                                    } else { +                                        Err(PEPError::MissingItem) +                                    } +                                }, +                                q => Err(PEPError::MismatchedQuery(q)), +                            } +                        } else { +                            Err(PEPError::MissingQuery) +                        } +                    } +                    IqType::Error => { +                        Err(PEPError::StanzaErrors(errors)) +                    } +                    _ => unreachable!(), +                } +            } else { +                // TODO: include expected entity +                Err(PEPError::IncorrectEntity( +                    from.unwrap_or_else(|| connection.jid().as_bare()), +                )) +            } +        } +        s => Err(PEPError::UnexpectedStanza(s)), +    } +} + +pub async fn handle_change_nick<Fs: FileStore + Clone>(logic: &ClientLogic<Fs>, nick: Option<String>) -> Result<(), NickError> {      logic.client().publish(pep::Item::Nick(nick), xep_0172::XMLNS.to_string()).await?;      Ok(())  } +pub async fn handle_change_avatar<Fs: FileStore + Clone>(logic: &ClientLogic<Fs>, img_data: Option<Vec<u8>>) -> Result<(), AvatarPublishError<Fs>> { +    match img_data { +        // set avatar +        Some(data) => { +            // load the image data and guess the format +            let image = ImageReader::new(Cursor::new(data)).with_guessed_format()?.decode()?; + +            // convert the image to png; +            let mut data_png = Vec::new(); +            image.write_to(&mut Cursor::new(&mut data_png), image::ImageFormat::Png)?; + +            // calculate the length of the data in bytes. +            let bytes = data_png.len().try_into()?; + +            // calculate sha1 hash of the data +            let mut sha1 = Sha1::new(); +            sha1.update(&data_png); +            let sha1_result = sha1.finalize(); +            let hash = BASE64_STANDARD.encode(sha1_result); + +            // encode the image data as base64 +            let data_b64 = BASE64_STANDARD.encode(data_png.clone()); + +            // publish the data to the data node +            logic.client().publish(pep::Item::AvatarData(Some(avatar::Data { hash: hash.clone(), data_b64 })), "urn:xmpp:avatar:data".to_string()).await?; + +            // publish the metadata to the metadata node +            logic.client().publish(pep::Item::AvatarMetadata(Some(avatar::Metadata { bytes, hash: hash.clone(), r#type: "image/png".to_string() })), "urn:xmpp:avatar:metadata".to_string()).await?; + +            // if everything went well, save the data to the disk. + +            if !logic.file_store().is_stored(&hash).await.map_err(|err| AvatarPublishError::FileStore(err))? { +                logic.file_store().store(&hash, &data_png).await.map_err(|err| AvatarPublishError::FileStore(err))? +            } +            // when the client receives the updated metadata notification from the pep node, it will already have it saved on the disk so will not require a retrieval. +            // TODO: should the node be purged? + +            Ok(()) +        }, +        // remove avatar +        None => { +            logic.client().delete_pep_node("urn:xmpp:avatar:data".to_string()).await?; +            logic.client().publish(pep::Item::AvatarMetadata(None), "urn:xmpp:avatar:metadata".to_string(), ).await?; +            Ok(()) +        }, +    } +} + +pub async fn handle_delete_pep_node<Fs: FileStore + Clone>( +    logic: &ClientLogic<Fs>, +    connection: Connected, +    node: String, +) -> Result<(), PEPError> { +    let id = Uuid::new_v4().to_string(); +    let request = Iq { +        from: Some(connection.jid().clone()), +        id: id.clone(), +        to: None, +        r#type: IqType::Set, +        lang: None, +        query: Some(Query::PubsubOwner(xep_0060::owner::Pubsub::Delete(owner::Delete{ node, redirect: None }))), +        errors: Vec::new(), +    }; +    match logic +        .pending() +        .request(&connection, Stanza::Iq(request), id) +        .await? { +         +        Stanza::Iq(Iq { +            from, +            r#type, +            query, +            errors, +            .. +            // TODO: maybe abstract a bunch of these different errors related to iqs into an iq error thing? as in like call iq.result(), get the query from inside, error otherwise. +        }) if r#type == IqType::Result || r#type == IqType::Error => { +            if from == None ||  +                    from == Some(connection.jid().as_bare()) +             { +                match r#type { +                    IqType::Result => { +                        if let Some(query) = query { +                            match query { +                                Query::PubsubOwner(_) => Ok(()), +                                q => Err(PEPError::MismatchedQuery(q)), +                            } +                        } else { +                            // Err(PEPError::MissingQuery) +                            Ok(()) +                        } +                    } +                    IqType::Error => { +                        Err(PEPError::StanzaErrors(errors)) +                    } +                    _ => unreachable!(), +                } +            } else { +                Err(PEPError::IncorrectEntity( +                    from.unwrap_or_else(|| connection.jid().as_bare()), +                )) +            } +        } +        s => Err(PEPError::UnexpectedStanza(s)), +    } +} +  // TODO: could probably macro-ise? -pub async fn handle_online_result( -    logic: &ClientLogic, -    command: Command, +pub async fn handle_online_result<Fs: FileStore + Clone>( +    logic: &ClientLogic<Fs>, +    command: Command<Fs>,      connection: Connected, -) -> Result<(), Error> { +) -> Result<(), Error<Fs>> {      match command {          Command::GetRoster(result_sender) => {              let roster = handle_get_roster(logic, connection).await; @@ -854,14 +1096,26 @@ pub async fn handle_online_result(              let result = handle_disco_items(logic, connection, jid, node).await;              let _ = sender.send(result);          } -        Command::Publish { item, node, sender } => { -            let result = handle_publish(logic, connection, item, node).await; +        Command::PublishPEPItem { item, node, sender } => { +            let result = handle_publish_pep_item(logic, connection, item, node).await;              let _ = sender.send(result);          }          Command::ChangeNick(nick, sender) => {              let result = handle_change_nick(logic, nick).await;              let _ = sender.send(result);          } +        Command::ChangeAvatar(img_data, sender) => { +            let result = handle_change_avatar(logic, img_data).await; +            let _ = sender.send(result); +        }, +        Command::DeletePEPNode { node, sender } => { +            let result = handle_delete_pep_node(logic, connection, node).await; +            let _ = sender.send(result); +        }, +        Command::GetPEPItem { node, sender, jid, id } => { +            let result = handle_get_pep_item(logic, connection, jid, node, id).await; +            let _ = sender.send(result); +        },      }      Ok(())  } diff --git a/filamento/src/logic/process_stanza.rs b/filamento/src/logic/process_stanza.rs index 11d7588..c383d70 100644 --- a/filamento/src/logic/process_stanza.rs +++ b/filamento/src/logic/process_stanza.rs @@ -1,7 +1,9 @@  use std::str::FromStr; +use base64::{Engine, prelude::BASE64_STANDARD};  use chrono::Utc;  use lampada::{Connected, SupervisorSender}; +use sha1::{Digest, Sha1};  use stanza::{      client::{          Stanza, @@ -17,14 +19,22 @@ use uuid::Uuid;  use crate::{      UpdateMessage, caps,      chat::{Body, Message}, -    error::{DatabaseError, Error, IqError, MessageRecvError, PresenceError, RosterError}, +    error::{ +        AvatarUpdateError, DatabaseError, Error, IqError, MessageRecvError, PresenceError, +        RosterError, +    }, +    files::FileStore,      presence::{Offline, Online, Presence, PresenceType, Show},      roster::Contact,  };  use super::ClientLogic; -pub async fn handle_stanza(logic: ClientLogic, stanza: Stanza, connection: Connected) { +pub async fn handle_stanza<Fs: FileStore + Clone>( +    logic: ClientLogic<Fs>, +    stanza: Stanza, +    connection: Connected, +) {      let result = process_stanza(logic.clone(), stanza, connection).await;      match result {          Ok(u) => match u { @@ -38,10 +48,10 @@ pub async fn handle_stanza(logic: ClientLogic, stanza: Stanza, connection: Conne      }  } -pub async fn recv_message( -    logic: ClientLogic, +pub async fn recv_message<Fs: FileStore + Clone>( +    logic: ClientLogic<Fs>,      stanza_message: stanza::client::message::Message, -) -> Result<Option<UpdateMessage>, MessageRecvError> { +) -> Result<Option<UpdateMessage>, MessageRecvError<Fs>> {      if let Some(from) = stanza_message.from {          // TODO: don't ignore delay from. xep says SHOULD send error if incorrect.          let timestamp = stanza_message @@ -50,6 +60,7 @@ pub async fn recv_message(              .unwrap_or_else(|| Utc::now());          // TODO: group chat messages +        // body MUST be before user changes in order to avoid race condition where you e.g. get a nick update before the user is in the client state.          // if there is a body, should create chat message          if let Some(body) = stanza_message.body {              let message = Message { @@ -67,16 +78,28 @@ pub async fn recv_message(              };              // save the message to the database -            logic.db().upsert_chat_and_user(&from).await?; -            if let Err(e) = logic -                .db() -                .create_message_with_user_resource(message.clone(), from.clone(), from.clone()) -                .await -            { -                logic -                    .handle_error(Error::MessageRecv(MessageRecvError::MessageHistory(e))) -                    .await; -            } +            match logic.db().upsert_chat_and_user(&from).await { +                Ok(_) => { +                    if let Err(e) = logic +                        .db() +                        .create_message_with_user_resource( +                            message.clone(), +                            from.clone(), +                            from.clone(), +                        ) +                        .await +                    { +                        logic +                            .handle_error(Error::MessageRecv(MessageRecvError::MessageHistory(e))) +                            .await; +                    } +                } +                Err(e) => { +                    logic +                        .handle_error(Error::MessageRecv(MessageRecvError::MessageHistory(e))) +                        .await; +                } +            };              // update the client with the new message              logic @@ -89,70 +112,333 @@ pub async fn recv_message(          }          if let Some(nick) = stanza_message.nick { -            if let Err(e) = logic -                .db() -                .upsert_user_nick(from.as_bare(), nick.0.clone()) -                .await -            { -                logic -                    .handle_error(Error::MessageRecv(MessageRecvError::NickUpdate(e))) -                    .await; +            let nick = nick.0; +            if nick.is_empty() { +                match logic.db().delete_user_nick(from.as_bare()).await { +                    Ok(changed) => { +                        if changed { +                            logic +                                .update_sender() +                                .send(UpdateMessage::NickChanged { +                                    jid: from.as_bare(), +                                    nick: None, +                                }) +                                .await; +                        } +                    } +                    Err(e) => { +                        logic +                            .handle_error(Error::MessageRecv(MessageRecvError::NickUpdate(e))) +                            .await; +                        // if failed, send user update anyway +                        logic +                            .update_sender() +                            .send(UpdateMessage::NickChanged { +                                jid: from.as_bare(), +                                nick: None, +                            }) +                            .await; +                    } +                } +            } else { +                match logic +                    .db() +                    .upsert_user_nick(from.as_bare(), nick.clone()) +                    .await +                { +                    Ok(changed) => { +                        if changed { +                            logic +                                .update_sender() +                                .send(UpdateMessage::NickChanged { +                                    jid: from.as_bare(), +                                    nick: Some(nick), +                                }) +                                .await; +                        } +                    } +                    Err(e) => { +                        logic +                            .handle_error(Error::MessageRecv(MessageRecvError::NickUpdate(e))) +                            .await; +                        // if failed, send user update anyway +                        logic +                            .update_sender() +                            .send(UpdateMessage::NickChanged { +                                jid: from.as_bare(), +                                nick: Some(nick), +                            }) +                            .await; +                    } +                }              } - -            logic -                .update_sender() -                .send(UpdateMessage::NickChanged { -                    jid: from.as_bare(), -                    nick: nick.0, -                }) -                .await;          }          if let Some(event) = stanza_message.event {              match event { -                Event::Items(items) => match items.node.as_str() { -                    "http://jabber.org/protocol/nick" => match items.items { -                        ItemsType::Item(items) => { -                            if let Some(item) = items.first() { -                                match &item.item { -                                    Some(c) => match c { -                                        Content::Nick(nick) => { -                                            if let Err(e) = logic -                                                .db() -                                                .upsert_user_nick(from.as_bare(), nick.0.clone()) -                                                .await -                                            { -                                                logic -                                                    .handle_error(Error::MessageRecv( -                                                        MessageRecvError::NickUpdate(e), -                                                    )) -                                                    .await; +                Event::Items(items) => { +                    match items.node.as_str() { +                        "http://jabber.org/protocol/nick" => match items.items { +                            ItemsType::Item(items) => { +                                if let Some(item) = items.first() { +                                    match &item.item { +                                        Some(c) => match c { +                                            Content::Nick(nick) => { +                                                let nick = nick.0.clone(); +                                                if nick.is_empty() { +                                                    match logic +                                                        .db() +                                                        .delete_user_nick(from.as_bare()) +                                                        .await +                                                    { +                                                        Ok(changed) => { +                                                            if changed { +                                                                logic +                                                                .update_sender() +                                                                .send(UpdateMessage::NickChanged { +                                                                    jid: from.as_bare(), +                                                                    nick: None, +                                                                }) +                                                                .await; +                                                            } +                                                        } +                                                        Err(e) => { +                                                            logic +                                                                .handle_error(Error::MessageRecv( +                                                                    MessageRecvError::NickUpdate(e), +                                                                )) +                                                                .await; +                                                            // if failed, send user update anyway +                                                            logic +                                                                .update_sender() +                                                                .send(UpdateMessage::NickChanged { +                                                                    jid: from.as_bare(), +                                                                    nick: None, +                                                                }) +                                                                .await; +                                                        } +                                                    } +                                                } else { +                                                    match logic +                                                        .db() +                                                        .upsert_user_nick( +                                                            from.as_bare(), +                                                            nick.clone(), +                                                        ) +                                                        .await +                                                    { +                                                        Ok(changed) => { +                                                            if changed { +                                                                logic +                                                                .update_sender() +                                                                .send(UpdateMessage::NickChanged { +                                                                    jid: from.as_bare(), +                                                                    nick: Some(nick), +                                                                }) +                                                                .await; +                                                            } +                                                        } +                                                        Err(e) => { +                                                            logic +                                                                .handle_error(Error::MessageRecv( +                                                                    MessageRecvError::NickUpdate(e), +                                                                )) +                                                                .await; +                                                            // if failed, send user update anyway +                                                            logic +                                                                .update_sender() +                                                                .send(UpdateMessage::NickChanged { +                                                                    jid: from.as_bare(), +                                                                    nick: Some(nick), +                                                                }) +                                                                .await; +                                                        } +                                                    } +                                                }                                              } +                                            _ => {} +                                        }, +                                        None => {} +                                    } +                                } +                            } +                            _ => {} +                        }, +                        "urn:xmpp:avatar:metadata" => { +                            match items.items { +                                ItemsType::Item(items) => { +                                    if let Some(item) = items.first() { +                                        match &item.item { +                                            Some(Content::AvatarMetadata(metadata)) => { +                                                // check if user avatar has been deleted +                                                if let Some(metadata) = metadata +                                                    .info +                                                    .iter() +                                                    .find(|info| info.url.is_none()) +                                                { +                                                    // check if user avatar has changed +                                                    match logic +                                                        .db() +                                                        .upsert_user_avatar( +                                                            from.as_bare(), +                                                            metadata.id.clone(), +                                                        ) +                                                        .await +                                                    { +                                                        Ok((changed, old_avatar)) => { +                                                            if changed { +                                                                if let Some(old_avatar) = old_avatar +                                                                { +                                                                    if let Err(e) = logic +                                                                        .file_store() +                                                                        .delete(&old_avatar) +                                                                        .await.map_err(|err| AvatarUpdateError::FileStore(err)) { +                                                                            logic.handle_error(MessageRecvError::AvatarUpdate(e).into()).await; +                                                                    } +                                                                } -                                            logic -                                                .update_sender() -                                                .send(UpdateMessage::NickChanged { -                                                    jid: from.as_bare(), -                                                    nick: nick.0.clone(), -                                                }) -                                                .await; +                                                                match logic +                                                                    .file_store() +                                                                    .is_stored(&metadata.id) +                                                                    .await +                                                                    .map_err(|err| { +                                                                        AvatarUpdateError::<Fs>::FileStore( +                                                                            err, +                                                                        ) +                                                                    }) { +                                                                    Ok(false) => { +                                                                        // get data +                                                                        let pep_item = logic.client().get_pep_item(Some(from.as_bare()), "urn:xmpp:avatar:data".to_string(), metadata.id.clone()).await.map_err(|err| Into::<AvatarUpdateError<Fs>>::into(err))?; +                                                                        match pep_item { +                                                                            crate::pep::Item::AvatarData(data) => { +                                                                                let data = data.map(|data| data.data_b64).unwrap_or_default(); +                                                                                // TODO: these should all be in a separate avatarupdate function +                                                                                match BASE64_STANDARD.decode(data) { +                                                                                    Ok(data) => { +                                                                                        let mut hasher = Sha1::new(); +                                                                                        hasher.update(&data); +                                                                                        let received_data_hash = BASE64_STANDARD.encode(hasher.finalize()); +                                                                                        if received_data_hash == metadata.id { +                                                                                            if let Err(e) = logic.file_store().store(&received_data_hash, &data).await { +                                                                                                logic.handle_error(Error::MessageRecv(MessageRecvError::AvatarUpdate(AvatarUpdateError::FileStore(e)))).await; +                                                                                            } +                                                                                            logic +                                                                                            .update_sender() +                                                                                            .send( +                                                                                                UpdateMessage::AvatarChanged { +                                                                                                    jid: from.as_bare(), +                                                                                                    id: Some( +                                                                                                        metadata.id.clone(), +                                                                                                    ), +                                                                                                }, +                                                                                            ) +                                                                                            .await; +                                                                                        } +                                                                                    }, +                                                                                    Err(e) => { +                                                                                        logic.handle_error(Error::MessageRecv(MessageRecvError::AvatarUpdate(AvatarUpdateError::Base64(e)))).await; +                                                                                    }, +                                                                                } +                                                                            }, +                                                                            _ => { +                                                                                logic.handle_error(Error::MessageRecv(MessageRecvError::AvatarUpdate(AvatarUpdateError::MissingData))).await; +                                                                            } +                                                                        } +                                                                    } +                                                                    Ok(true) => { +                                                                        // just send the update +                                                                        logic +                                                                        .update_sender() +                                                                        .send( +                                                                            UpdateMessage::AvatarChanged { +                                                                                jid: from.as_bare(), +                                                                                id: Some( +                                                                                    metadata.id.clone(), +                                                                                ), +                                                                            }, +                                                                        ) +                                                                        .await; +                                                                    } +                                                                    Err(e) => { +                                                                        logic.handle_error(Error::MessageRecv(MessageRecvError::AvatarUpdate(e))).await; +                                                                    } +                                                                } +                                                            } +                                                        } +                                                        Err(e) => { +                                                            logic +                                                                .handle_error(Error::MessageRecv( +                                                                    MessageRecvError::AvatarUpdate( +                                                                        AvatarUpdateError::Database( +                                                                            e, +                                                                        ), +                                                                    ), +                                                                )) +                                                                .await; +                                                        } +                                                    } +                                                } else { +                                                    // delete avatar +                                                    match logic +                                                        .db() +                                                        .delete_user_avatar(from.as_bare()) +                                                        .await +                                                    { +                                                        Ok((changed, old_avatar)) => { +                                                            if changed { +                                                                if let Some(old_avatar) = old_avatar +                                                                { +                                                                    if let Err(e) = logic +                                                                        .file_store() +                                                                        .delete(&old_avatar) +                                                                        .await.map_err(|err| AvatarUpdateError::FileStore(err)) { +                                                                            logic.handle_error(MessageRecvError::AvatarUpdate(e).into()).await; +                                                                    } +                                                                } +                                                                logic +                                                                    .update_sender() +                                                                    .send( +                                                                        UpdateMessage::AvatarChanged { +                                                                            jid: from.as_bare(), +                                                                            id: None, +                                                                        }, +                                                                    ) +                                                                    .await; +                                                            } +                                                        } +                                                        Err(e) => { +                                                            logic +                                                                .handle_error(Error::MessageRecv( +                                                                    MessageRecvError::AvatarUpdate( +                                                                        AvatarUpdateError::Database( +                                                                            e, +                                                                        ), +                                                                    ), +                                                                )) +                                                                .await; +                                                        } +                                                    } +                                                } +                                                // check if the new file is in the file store +                                                // if not, retrieve from server and save in the file store (remember to check if the hash matches) +                                                // send the avatar update +                                            } +                                            _ => {}                                          } -                                        Content::Unknown(element) => {} -                                    }, -                                    None => {} +                                    }                                  } +                                _ => {}                              }                          } -                        ItemsType::Retract(retracts) => {} -                    }, -                    _ => {} -                }, +                        _ => {} +                    } +                }                  // Event::Collection(collection) => todo!(),                  // Event::Configuration(configuration) => todo!(),                  // Event::Delete(delete) => todo!(),                  // Event::Purge(purge) => todo!(),                  // Event::Subscription(subscription) => todo!(), -                _ => {} +                _ => {} // TODO: catch these catch-alls in some way              }          } @@ -240,8 +526,8 @@ pub async fn recv_presence(      }  } -pub async fn recv_iq( -    logic: ClientLogic, +pub async fn recv_iq<Fs: FileStore + Clone>( +    logic: ClientLogic<Fs>,      connection: Connected,      iq: Iq,  ) -> Result<Option<UpdateMessage>, IqError> { @@ -469,11 +755,11 @@ pub async fn recv_iq(      }  } -pub async fn process_stanza( -    logic: ClientLogic, +pub async fn process_stanza<Fs: FileStore + Clone>( +    logic: ClientLogic<Fs>,      stanza: Stanza,      connection: Connected, -) -> Result<Option<UpdateMessage>, Error> { +) -> Result<Option<UpdateMessage>, Error<Fs>> {      let update = match stanza {          Stanza::Message(stanza_message) => Ok(recv_message(logic, stanza_message).await?),          Stanza::Presence(presence) => Ok(recv_presence(presence).await?), diff --git a/filamento/src/pep.rs b/filamento/src/pep.rs index c71d843..3cd243f 100644 --- a/filamento/src/pep.rs +++ b/filamento/src/pep.rs @@ -1,17 +1,8 @@ -// in commandmessage -// pub struct Publish { -//     item: Item, -//     node: Option<String>, -//     // no need for node, as item has the node -// } -// -// in updatemessage -// pub struct Event { -//     from: JID, -//     item: Item, -// } +use crate::avatar::{Data as AvatarData, Metadata as AvatarMetadata};  #[derive(Clone, Debug)]  pub enum Item { -    Nick(String), +    Nick(Option<String>), +    AvatarMetadata(Option<AvatarMetadata>), +    AvatarData(Option<AvatarData>),  } | 
