diff options
Diffstat (limited to 'filamento/src/logic')
| -rw-r--r-- | filamento/src/logic/abort.rs | 4 | ||||
| -rw-r--r-- | filamento/src/logic/connect.rs | 6 | ||||
| -rw-r--r-- | filamento/src/logic/connection_error.rs | 7 | ||||
| -rw-r--r-- | filamento/src/logic/disconnect.rs | 7 | ||||
| -rw-r--r-- | filamento/src/logic/local_only.rs | 37 | ||||
| -rw-r--r-- | filamento/src/logic/mod.rs | 27 | ||||
| -rw-r--r-- | filamento/src/logic/offline.rs | 45 | ||||
| -rw-r--r-- | filamento/src/logic/online.rs | 362 | ||||
| -rw-r--r-- | filamento/src/logic/process_stanza.rs | 422 | 
9 files changed, 762 insertions, 155 deletions
| diff --git a/filamento/src/logic/abort.rs b/filamento/src/logic/abort.rs index df82655..3588b13 100644 --- a/filamento/src/logic/abort.rs +++ b/filamento/src/logic/abort.rs @@ -1,7 +1,9 @@  use lampada::error::ReadError; +use crate::files::FileStore; +  use super::ClientLogic; -pub async fn on_abort(logic: ClientLogic) { +pub async fn on_abort<Fs: FileStore + Clone>(logic: ClientLogic<Fs>) {      logic.pending().drain().await;  } diff --git a/filamento/src/logic/connect.rs b/filamento/src/logic/connect.rs index dc05448..37cdad5 100644 --- a/filamento/src/logic/connect.rs +++ b/filamento/src/logic/connect.rs @@ -5,12 +5,16 @@ use tracing::debug;  use crate::{      Command, UpdateMessage,      error::{ConnectionJobError, Error, RosterError}, +    files::FileStore,      presence::{Online, PresenceType},  };  use super::ClientLogic; -pub async fn handle_connect(logic: ClientLogic, connection: Connected) { +pub async fn handle_connect<Fs: FileStore + Clone + Send + Sync>( +    logic: ClientLogic<Fs>, +    connection: Connected, +) {      let (send, recv) = oneshot::channel();      debug!("getting roster");      logic diff --git a/filamento/src/logic/connection_error.rs b/filamento/src/logic/connection_error.rs index 081900b..36c1cef 100644 --- a/filamento/src/logic/connection_error.rs +++ b/filamento/src/logic/connection_error.rs @@ -1,7 +1,12 @@  use lampada::error::ConnectionError; +use crate::files::FileStore; +  use super::ClientLogic; -pub async fn handle_connection_error(logic: ClientLogic, error: ConnectionError) { +pub async fn handle_connection_error<Fs: FileStore + Clone>( +    logic: ClientLogic<Fs>, +    error: ConnectionError, +) {      logic.handle_error(error.into()).await;  } diff --git a/filamento/src/logic/disconnect.rs b/filamento/src/logic/disconnect.rs index 241c3e6..ebcfd4f 100644 --- a/filamento/src/logic/disconnect.rs +++ b/filamento/src/logic/disconnect.rs @@ -1,11 +1,14 @@  use lampada::Connected;  use stanza::client::Stanza; -use crate::{UpdateMessage, presence::Offline}; +use crate::{UpdateMessage, files::FileStore, presence::Offline};  use super::ClientLogic; -pub async fn handle_disconnect(logic: ClientLogic, connection: Connected) { +pub async fn handle_disconnect<Fs: FileStore + Clone>( +    logic: ClientLogic<Fs>, +    connection: Connected, +) {      // TODO: be able to set offline status message      let offline_presence: stanza::client::presence::Presence = Offline::default().into_stanza(None);      let stanza = Stanza::Presence(offline_presence); diff --git a/filamento/src/logic/local_only.rs b/filamento/src/logic/local_only.rs index 3f6fe8d..cabbef4 100644 --- a/filamento/src/logic/local_only.rs +++ b/filamento/src/logic/local_only.rs @@ -4,44 +4,61 @@ use uuid::Uuid;  use crate::{      chat::{Chat, Message},      error::DatabaseError, +    files::FileStore,      user::User,  };  use super::ClientLogic; -pub async fn handle_get_chats(logic: &ClientLogic) -> Result<Vec<Chat>, DatabaseError> { +pub async fn handle_get_chats<Fs: FileStore + Clone>( +    logic: &ClientLogic<Fs>, +) -> Result<Vec<Chat>, DatabaseError> {      Ok(logic.db().read_chats().await?)  } -pub async fn handle_get_chats_ordered(logic: &ClientLogic) -> Result<Vec<Chat>, DatabaseError> { +pub async fn handle_get_chats_ordered<Fs: FileStore + Clone>( +    logic: &ClientLogic<Fs>, +) -> Result<Vec<Chat>, DatabaseError> {      Ok(logic.db().read_chats_ordered().await?)  } -pub async fn handle_get_chats_ordered_with_latest_messages( -    logic: &ClientLogic, +pub async fn handle_get_chats_ordered_with_latest_messages<Fs: FileStore + Clone>( +    logic: &ClientLogic<Fs>,  ) -> Result<Vec<(Chat, Message)>, DatabaseError> {      Ok(logic.db().read_chats_ordered_with_latest_messages().await?)  } -pub async fn handle_get_chat(logic: &ClientLogic, jid: JID) -> Result<Chat, DatabaseError> { +pub async fn handle_get_chat<Fs: FileStore + Clone>( +    logic: &ClientLogic<Fs>, +    jid: JID, +) -> Result<Chat, DatabaseError> {      Ok(logic.db().read_chat(jid).await?)  } -pub async fn handle_get_messages( -    logic: &ClientLogic, +pub async fn handle_get_messages<Fs: FileStore + Clone>( +    logic: &ClientLogic<Fs>,      jid: JID,  ) -> Result<Vec<Message>, DatabaseError> {      Ok(logic.db().read_message_history(jid).await?)  } -pub async fn handle_delete_chat(logic: &ClientLogic, jid: JID) -> Result<(), DatabaseError> { +pub async fn handle_delete_chat<Fs: FileStore + Clone>( +    logic: &ClientLogic<Fs>, +    jid: JID, +) -> Result<(), DatabaseError> {      Ok(logic.db().delete_chat(jid).await?)  } -pub async fn handle_delete_messaage(logic: &ClientLogic, uuid: Uuid) -> Result<(), DatabaseError> { +pub async fn handle_delete_messaage<Fs: FileStore + Clone>( +    logic: &ClientLogic<Fs>, +    uuid: Uuid, +) -> Result<(), DatabaseError> {      Ok(logic.db().delete_message(uuid).await?)  } -pub async fn handle_get_user(logic: &ClientLogic, jid: JID) -> Result<User, DatabaseError> { +pub async fn handle_get_user<Fs: FileStore + Clone>( +    logic: &ClientLogic<Fs>, +    jid: JID, +) -> Result<User, DatabaseError> {      Ok(logic.db().read_user(jid).await?)  } diff --git a/filamento/src/logic/mod.rs b/filamento/src/logic/mod.rs index 1ddd7d3..5e05dac 100644 --- a/filamento/src/logic/mod.rs +++ b/filamento/src/logic/mod.rs @@ -4,12 +4,13 @@ use jid::JID;  use lampada::{Connected, Logic, error::ReadError};  use stanza::client::Stanza;  use tokio::sync::{Mutex, mpsc, oneshot}; -use tracing::{error, info, warn}; +use tracing::{error, info};  use crate::{      Client, Command, UpdateMessage,      db::Db,      error::{Error, IqRequestError, ResponseError}, +    files::FileStore,  };  mod abort; @@ -22,12 +23,13 @@ mod online;  mod process_stanza;  #[derive(Clone)] -pub struct ClientLogic { -    client: Client, +pub struct ClientLogic<Fs: FileStore> { +    client: Client<Fs>,      bare_jid: JID,      db: Db,      pending: Pending,      update_sender: mpsc::Sender<UpdateMessage>, +    file_store: Fs,  }  #[derive(Clone)] @@ -75,12 +77,13 @@ impl Pending {      }  } -impl ClientLogic { +impl<Fs: FileStore> ClientLogic<Fs> {      pub fn new( -        client: Client, +        client: Client<Fs>,          bare_jid: JID,          db: Db,          update_sender: mpsc::Sender<UpdateMessage>, +        file_store: Fs,      ) -> Self {          Self {              db, @@ -88,10 +91,11 @@ impl ClientLogic {              update_sender,              client,              bare_jid, +            file_store,          }      } -    pub fn client(&self) -> &Client { +    pub fn client(&self) -> &Client<Fs> {          &self.client      } @@ -103,6 +107,10 @@ impl ClientLogic {          &self.pending      } +    pub fn file_store(&self) -> &Fs { +        &self.file_store +    } +      pub fn update_sender(&self) -> &mpsc::Sender<UpdateMessage> {          &self.update_sender      } @@ -113,13 +121,14 @@ impl ClientLogic {          self.update_sender().send(update).await;      } -    pub async fn handle_error(&self, e: Error) { +    // TODO: delete this +    pub async fn handle_error(&self, e: Error<Fs>) {          error!("{}", e);      }  } -impl Logic for ClientLogic { -    type Cmd = Command; +impl<Fs: FileStore + Clone + Send + Sync> Logic for ClientLogic<Fs> { +    type Cmd = Command<Fs>;      // pub async fn handle_stream_error(self, error) {}      // stanza errors (recoverable) diff --git a/filamento/src/logic/offline.rs b/filamento/src/logic/offline.rs index 6399cf7..566972c 100644 --- a/filamento/src/logic/offline.rs +++ b/filamento/src/logic/offline.rs @@ -1,3 +1,5 @@ +use std::process::id; +  use chrono::Utc;  use lampada::error::WriteError;  use uuid::Uuid; @@ -6,9 +8,10 @@ use crate::{      Command,      chat::{Delivery, Message},      error::{ -        DatabaseError, DiscoError, Error, IqRequestError, MessageSendError, NickError, RosterError, -        StatusError, +        AvatarPublishError, DatabaseError, DiscoError, Error, IqRequestError, MessageSendError, +        NickError, PEPError, RosterError, StatusError,      }, +    files::FileStore,      presence::Online,      roster::Contact,  }; @@ -22,7 +25,7 @@ use super::{      },  }; -pub async fn handle_offline(logic: ClientLogic, command: Command) { +pub async fn handle_offline<Fs: FileStore + Clone>(logic: ClientLogic<Fs>, command: Command<Fs>) {      let result = handle_offline_result(&logic, command).await;      match result {          Ok(_) => {} @@ -30,16 +33,24 @@ pub async fn handle_offline(logic: ClientLogic, command: Command) {      }  } -pub async fn handle_set_status(logic: &ClientLogic, online: Online) -> Result<(), StatusError> { +pub async fn handle_set_status<Fs: FileStore + Clone>( +    logic: &ClientLogic<Fs>, +    online: Online, +) -> Result<(), StatusError> {      logic.db().upsert_cached_status(online).await?;      Ok(())  } -pub async fn handle_get_roster(logic: &ClientLogic) -> Result<Vec<Contact>, RosterError> { +pub async fn handle_get_roster<Fs: FileStore + Clone>( +    logic: &ClientLogic<Fs>, +) -> Result<Vec<Contact>, RosterError> {      Ok(logic.db().read_cached_roster().await?)  } -pub async fn handle_offline_result(logic: &ClientLogic, command: Command) -> Result<(), Error> { +pub async fn handle_offline_result<Fs: FileStore + Clone>( +    logic: &ClientLogic<Fs>, +    command: Command<Fs>, +) -> Result<(), Error<Fs>> {      match command {          Command::GetRoster(sender) => {              let roster = handle_get_roster(logic).await; @@ -77,7 +88,6 @@ pub async fn handle_offline_result(logic: &ClientLogic, command: Command) -> Res              let user = handle_get_user(logic, jid).await;              sender.send(user);          } -        // TODO: offline queue to modify roster          Command::AddContact(_jid, sender) => {              sender.send(Err(RosterError::Write(WriteError::Disconnected)));          } @@ -112,7 +122,6 @@ pub async fn handle_offline_result(logic: &ClientLogic, command: Command) -> Res              let result = handle_set_status(logic, online).await;              sender.send(result);          } -        // TODO: offline message queue          Command::SendMessage(jid, body) => {              let id = Uuid::new_v4();              let timestamp = Utc::now(); @@ -159,7 +168,7 @@ pub async fn handle_offline_result(logic: &ClientLogic, command: Command) -> Res          Command::DiscoItems(_jid, _node, sender) => {              sender.send(Err(DiscoError::Write(WriteError::Disconnected)));          } -        Command::Publish { +        Command::PublishPEPItem {              item: _,              node: _,              sender, @@ -169,6 +178,24 @@ pub async fn handle_offline_result(logic: &ClientLogic, command: Command) -> Res          Command::ChangeNick(_, sender) => {              sender.send(Err(NickError::Disconnected));          } +        Command::ChangeAvatar(_items, sender) => { +            sender.send(Err(AvatarPublishError::Disconnected)); +        } +        Command::DeletePEPNode { node: _, sender } => { +            sender.send(Err(PEPError::IqResponse(IqRequestError::Write( +                WriteError::Disconnected, +            )))); +        } +        Command::GetPEPItem { +            node: _, +            sender, +            jid: _, +            id: _, +        } => { +            sender.send(Err(PEPError::IqResponse(IqRequestError::Write( +                WriteError::Disconnected, +            )))); +        }      }      Ok(())  } diff --git a/filamento/src/logic/online.rs b/filamento/src/logic/online.rs index b069f59..745adc1 100644 --- a/filamento/src/logic/online.rs +++ b/filamento/src/logic/online.rs @@ -1,23 +1,24 @@ +use std::io::Cursor; + +use base64::{prelude::BASE64_STANDARD, Engine};  use chrono::Utc; +use image::ImageReader;  use jid::JID;  use lampada::{Connected, WriteMessage, error::WriteError}; +use sha1::{Digest, Sha1};  use stanza::{      client::{          iq::{self, Iq, IqType, Query}, Stanza -    }, -    xep_0030::{info, items}, -    xep_0060::pubsub::{self, Pubsub}, -    xep_0172::{self, Nick}, -    xep_0203::Delay, +    }, xep_0030::{info, items}, xep_0060::{self, owner, pubsub::{self, Pubsub}}, xep_0084, xep_0172::{self, Nick}, xep_0203::Delay  };  use tokio::sync::oneshot;  use tracing::{debug, error, info};  use uuid::Uuid;  use crate::{ -    chat::{Body, Chat, Delivery, Message}, disco::{Info, Items}, error::{ -        DatabaseError, DiscoError, Error, IqRequestError, MessageSendError, NickError, PublishError, RosterError, StatusError, SubscribeError -    }, pep, presence::{Online, Presence, PresenceType}, roster::{Contact, ContactUpdate}, Command, UpdateMessage +    avatar, chat::{Body, Chat, Delivery, Message}, disco::{Info, Items}, error::{ +        AvatarPublishError, DatabaseError, DiscoError, Error, IqRequestError, MessageSendError, NickError, PEPError, RosterError, StatusError, SubscribeError +    }, files::FileStore, pep, presence::{Online, Presence, PresenceType}, roster::{Contact, ContactUpdate}, Command, UpdateMessage  };  use super::{ @@ -29,7 +30,7 @@ use super::{      },  }; -pub async fn handle_online(logic: ClientLogic, command: Command, connection: Connected) { +pub async fn handle_online<Fs: FileStore + Clone>(logic: ClientLogic<Fs>, command: Command<Fs>, connection: Connected) {      let result = handle_online_result(&logic, command, connection).await;      match result {          Ok(_) => {} @@ -37,8 +38,8 @@ pub async fn handle_online(logic: ClientLogic, command: Command, connection: Con      }  } -pub async fn handle_get_roster( -    logic: &ClientLogic, +pub async fn handle_get_roster<Fs: FileStore + Clone>( +    logic: &ClientLogic<Fs>,      connection: Connected,  ) -> Result<Vec<Contact>, RosterError> {      let iq_id = Uuid::new_v4().to_string(); @@ -96,8 +97,8 @@ pub async fn handle_get_roster(      }  } -pub async fn handle_add_contact( -    logic: &ClientLogic, +pub async fn handle_add_contact<Fs: FileStore + Clone>( +    logic: &ClientLogic<Fs>,      connection: Connected,      jid: JID,  ) -> Result<(), RosterError> { @@ -154,8 +155,8 @@ pub async fn handle_add_contact(      }  } -pub async fn handle_buddy_request( -    logic: &ClientLogic, +pub async fn handle_buddy_request<Fs: FileStore + Clone>( +    logic: &ClientLogic<Fs>,      connection: Connected,      jid: JID,  ) -> Result<(), SubscribeError> { @@ -177,8 +178,8 @@ pub async fn handle_buddy_request(      Ok(())  } -pub async fn handle_subscription_request( -    logic: &ClientLogic, +pub async fn handle_subscription_request<Fs: FileStore + Clone>( +    logic: &ClientLogic<Fs>,      connection: Connected,      jid: JID,  ) -> Result<(), SubscribeError> { @@ -195,8 +196,8 @@ pub async fn handle_subscription_request(      Ok(())  } -pub async fn handle_accept_buddy_request( -    logic: &ClientLogic, +pub async fn handle_accept_buddy_request<Fs: FileStore + Clone>( +    logic: &ClientLogic<Fs>,      connection: Connected,      jid: JID,  ) -> Result<(), SubscribeError> { @@ -218,8 +219,8 @@ pub async fn handle_accept_buddy_request(      Ok(())  } -pub async fn handle_accept_subscription_request( -    logic: &ClientLogic, +pub async fn handle_accept_subscription_request<Fs: FileStore + Clone>( +    logic: &ClientLogic<Fs>,      connection: Connected,      jid: JID,  ) -> Result<(), SubscribeError> { @@ -274,8 +275,8 @@ pub async fn handle_unfriend_contact(connection: Connected, jid: JID) -> Result<      Ok(())  } -pub async fn handle_delete_contact( -    logic: &ClientLogic, +pub async fn handle_delete_contact<Fs: FileStore + Clone>( +    logic: &ClientLogic<Fs>,      connection: Connected,      jid: JID,  ) -> Result<(), RosterError> { @@ -333,8 +334,8 @@ pub async fn handle_delete_contact(      }  } -pub async fn handle_update_contact( -    logic: &ClientLogic, +pub async fn handle_update_contact<Fs: FileStore + Clone>( +    logic: &ClientLogic<Fs>,      connection: Connected,      jid: JID,      contact_update: ContactUpdate, @@ -398,8 +399,8 @@ pub async fn handle_update_contact(      }  } -pub async fn handle_set_status( -    logic: &ClientLogic, +pub async fn handle_set_status<Fs: FileStore + Clone>( +    logic: &ClientLogic<Fs>,      connection: Connected,      online: Online,  ) -> Result<(), StatusError> { @@ -411,9 +412,18 @@ pub async fn handle_set_status(      Ok(())  } -pub async fn handle_send_message(logic: &ClientLogic, connection: Connected, jid: JID, body: Body) { +pub async fn handle_send_message<Fs: FileStore + Clone>(logic: &ClientLogic<Fs>, connection: Connected, jid: JID, body: Body) {      // upsert the chat and user the message will be delivered to. if there is a conflict, it will return whatever was there, otherwise it will return false by default. -    let have_chatted = logic.db().upsert_chat_and_user(&jid).await.unwrap_or(false); +    // let have_chatted = logic.db().upsert_chat_and_user(&jid).await.unwrap_or(false); +    let have_chatted  = match logic.db().upsert_chat_and_user(&jid).await { +        Ok(have_chatted) => { +            have_chatted +        }, +        Err(e) => { +            error!("{}", e); +            false +        }, +    };      let nick;      let mark_chat_as_chatted; @@ -506,6 +516,7 @@ pub async fn handle_send_message(logic: &ClientLogic, connection: Connected, jid                  })                  .await;              if mark_chat_as_chatted { +                debug!("marking chat as chatted");                  if let Err(e) = logic.db.mark_chat_as_chatted(jid).await {                      logic                          .handle_error(MessageSendError::MarkChatAsChatted(e.into()).into()) @@ -540,8 +551,8 @@ pub async fn handle_send_presence(      Ok(())  } -pub async fn handle_disco_info( -    logic: &ClientLogic, +pub async fn handle_disco_info<Fs: FileStore + Clone>( +    logic: &ClientLogic<Fs>,      connection: Connected,      jid: Option<JID>,      node: Option<String>, @@ -611,8 +622,8 @@ pub async fn handle_disco_info(      }  } -pub async fn handle_disco_items( -    logic: &ClientLogic, +pub async fn handle_disco_items<Fs: FileStore + Clone>( +    logic: &ClientLogic<Fs>,      connection: Connected,      jid: Option<JID>,      node: Option<String>, @@ -680,20 +691,60 @@ pub async fn handle_disco_items(      }  } -pub async fn handle_publish( -    logic: &ClientLogic, +pub async fn handle_publish_pep_item<Fs: FileStore + Clone>( +    logic: &ClientLogic<Fs>,      connection: Connected,      item: pep::Item,      node: String, -) -> Result<(), PublishError> { +) -> Result<(), PEPError> {      let id = Uuid::new_v4().to_string();      let publish = match item { -        pep::Item::Nick(n) => pubsub::Publish { -            node, -            items: vec![pubsub::Item { -                item: Some(pubsub::Content::Nick(Nick(n))), -                ..Default::default() -            }], +        pep::Item::Nick(n) => { +            if let Some(n) = n { +                pubsub::Publish { +                    node, +                    items: vec![pubsub::Item { +                        item: Some(pubsub::Content::Nick(Nick(n))), +                        ..Default::default() +                    }], +                } +            } else { +                pubsub::Publish { +                    node, +                    items: vec![pubsub::Item { +                        item: Some(pubsub::Content::Nick(Nick("".to_string()))), +                        ..Default::default() +                    }] +                } +            } +        }, +        pep::Item::AvatarMetadata(metadata) => { +            if let Some(metadata) = metadata { +                pubsub::Publish { node, items: vec![pubsub::Item { +                    item: Some(pubsub::Content::AvatarMetadata(xep_0084::Metadata { info: vec![xep_0084::Info { bytes: metadata.bytes, height: None, id: metadata.hash.clone(), r#type: metadata.r#type, url: None, width: None }], pointers: Vec::new() })), +                    id: Some(metadata.hash), +                    ..Default::default() +                }]} +            } else { +                pubsub::Publish { node, items: vec![pubsub::Item { +                    item: Some(pubsub::Content::AvatarMetadata(xep_0084::Metadata { info: Vec::new(), pointers: Vec::new() })), +                    ..Default::default() +                }]} +            } +        }, +        pep::Item::AvatarData(data) => { +            if let Some(data) = data { +                pubsub::Publish { node, items: vec![pubsub::Item { +                    item: Some(pubsub::Content::AvatarData(xep_0084::Data(data.data_b64))), +                    id: Some(data.hash), +                    ..Default::default() +                }] } +            } else { +                pubsub::Publish { node, items: vec![pubsub::Item { +                    item: Some(pubsub::Content::AvatarData(xep_0084::Data("".to_string()))), +                    ..Default::default() +                }]} +            }          },      };      let request = Iq { @@ -726,38 +777,229 @@ pub async fn handle_publish(                          if let Some(query) = query {                              match query {                                  Query::Pubsub(_) => Ok(()), -                                q => Err(PublishError::MismatchedQuery(q)), +                                q => Err(PEPError::MismatchedQuery(q)),                              }                          } else { -                            Err(PublishError::MissingQuery) +                            Err(PEPError::MissingQuery)                          }                      }                      IqType::Error => { -                        Err(PublishError::StanzaErrors(errors)) +                        Err(PEPError::StanzaErrors(errors))                      }                      _ => unreachable!(),                  }              } else { -                Err(PublishError::IncorrectEntity( +                Err(PEPError::IncorrectEntity(                      from.unwrap_or_else(|| connection.jid().as_bare()),                  ))              }          } -        s => Err(PublishError::UnexpectedStanza(s)), +        s => Err(PEPError::UnexpectedStanza(s)),      }  } -pub async fn handle_change_nick(logic: &ClientLogic, nick: String) -> Result<(), NickError> { +pub async fn handle_get_pep_item<Fs: FileStore + Clone>(logic: &ClientLogic<Fs>, connection: Connected, jid: Option<JID>, node: String, id: String) -> Result<pep::Item, PEPError> { +    let stanza_id = Uuid::new_v4().to_string(); +    let request = Iq { +        from: Some(connection.jid().clone()), +        id: stanza_id.clone(), +        to: jid.clone(), +        r#type: IqType::Get, +        lang: None, +        query: Some(Query::Pubsub(Pubsub::Items(pubsub::Items { +            max_items: None, +            node, +            subid: None, +            items: vec![pubsub::Item { id: Some(id.clone()), publisher: None, item: None }], +        }))), +        errors: Vec::new(), +    }; +    match logic +        .pending() +        .request(&connection, Stanza::Iq(request), stanza_id) +        .await? { +         +        Stanza::Iq(Iq { +            from, +            r#type, +            query, +            errors, +            .. +            // TODO: maybe abstract a bunch of these different errors related to iqs into an iq error thing? as in like call iq.result(), get the query from inside, error otherwise. +        }) if r#type == IqType::Result || r#type == IqType::Error => { +            if from == jid || { +                if jid == None { +                    from == Some(connection.jid().as_bare()) +                } else { +                    false +                } +            } { +                match r#type { +                    IqType::Result => { +                        if let Some(query) = query { +                            match query { +                                Query::Pubsub(Pubsub::Items(mut items)) => { +                                    if let Some(item) = items.items.pop() { +                                        if item.id == Some(id.clone()) { +                                            match item.item.ok_or(PEPError::MissingItem)? { +                                                pubsub::Content::Nick(nick) => { +                                                    if nick.0.is_empty() { +                                                        Ok(pep::Item::Nick(None)) +                                                    } else { +                                                        Ok(pep::Item::Nick(Some(nick.0))) +                                                         +                                                    } +                                                }, +                                                pubsub::Content::AvatarData(data) => Ok(pep::Item::AvatarData(Some(avatar::Data { hash: id, data_b64: data.0 }))), +                                                pubsub::Content::AvatarMetadata(metadata) => Ok(pep::Item::AvatarMetadata(metadata.info.into_iter().find(|info| info.url.is_none()).map(|info| info.into()))), +                                                pubsub::Content::Unknown(_element) => Err(PEPError::UnsupportedItem), +                                            } +                                        } else { +                                            Err(PEPError::IncorrectItemID(id, item.id.unwrap_or_else(|| "missing id".to_string()))) +                                        } +                                    } else { +                                        Err(PEPError::MissingItem) +                                    } +                                }, +                                q => Err(PEPError::MismatchedQuery(q)), +                            } +                        } else { +                            Err(PEPError::MissingQuery) +                        } +                    } +                    IqType::Error => { +                        Err(PEPError::StanzaErrors(errors)) +                    } +                    _ => unreachable!(), +                } +            } else { +                // TODO: include expected entity +                Err(PEPError::IncorrectEntity( +                    from.unwrap_or_else(|| connection.jid().as_bare()), +                )) +            } +        } +        s => Err(PEPError::UnexpectedStanza(s)), +    } +} + +pub async fn handle_change_nick<Fs: FileStore + Clone>(logic: &ClientLogic<Fs>, nick: Option<String>) -> Result<(), NickError> {      logic.client().publish(pep::Item::Nick(nick), xep_0172::XMLNS.to_string()).await?;      Ok(())  } +pub async fn handle_change_avatar<Fs: FileStore + Clone>(logic: &ClientLogic<Fs>, img_data: Option<Vec<u8>>) -> Result<(), AvatarPublishError<Fs>> { +    match img_data { +        // set avatar +        Some(data) => { +            // load the image data and guess the format +            let image = ImageReader::new(Cursor::new(data)).with_guessed_format()?.decode()?; + +            // convert the image to png; +            let mut data_png = Vec::new(); +            image.write_to(&mut Cursor::new(&mut data_png), image::ImageFormat::Png)?; + +            // calculate the length of the data in bytes. +            let bytes = data_png.len().try_into()?; + +            // calculate sha1 hash of the data +            let mut sha1 = Sha1::new(); +            sha1.update(&data_png); +            let sha1_result = sha1.finalize(); +            let hash = BASE64_STANDARD.encode(sha1_result); + +            // encode the image data as base64 +            let data_b64 = BASE64_STANDARD.encode(data_png.clone()); + +            // publish the data to the data node +            logic.client().publish(pep::Item::AvatarData(Some(avatar::Data { hash: hash.clone(), data_b64 })), "urn:xmpp:avatar:data".to_string()).await?; + +            // publish the metadata to the metadata node +            logic.client().publish(pep::Item::AvatarMetadata(Some(avatar::Metadata { bytes, hash: hash.clone(), r#type: "image/png".to_string() })), "urn:xmpp:avatar:metadata".to_string()).await?; + +            // if everything went well, save the data to the disk. + +            if !logic.file_store().is_stored(&hash).await.map_err(|err| AvatarPublishError::FileStore(err))? { +                logic.file_store().store(&hash, &data_png).await.map_err(|err| AvatarPublishError::FileStore(err))? +            } +            // when the client receives the updated metadata notification from the pep node, it will already have it saved on the disk so will not require a retrieval. +            // TODO: should the node be purged? + +            Ok(()) +        }, +        // remove avatar +        None => { +            logic.client().delete_pep_node("urn:xmpp:avatar:data".to_string()).await?; +            logic.client().publish(pep::Item::AvatarMetadata(None), "urn:xmpp:avatar:metadata".to_string(), ).await?; +            Ok(()) +        }, +    } +} + +pub async fn handle_delete_pep_node<Fs: FileStore + Clone>( +    logic: &ClientLogic<Fs>, +    connection: Connected, +    node: String, +) -> Result<(), PEPError> { +    let id = Uuid::new_v4().to_string(); +    let request = Iq { +        from: Some(connection.jid().clone()), +        id: id.clone(), +        to: None, +        r#type: IqType::Set, +        lang: None, +        query: Some(Query::PubsubOwner(xep_0060::owner::Pubsub::Delete(owner::Delete{ node, redirect: None }))), +        errors: Vec::new(), +    }; +    match logic +        .pending() +        .request(&connection, Stanza::Iq(request), id) +        .await? { +         +        Stanza::Iq(Iq { +            from, +            r#type, +            query, +            errors, +            .. +            // TODO: maybe abstract a bunch of these different errors related to iqs into an iq error thing? as in like call iq.result(), get the query from inside, error otherwise. +        }) if r#type == IqType::Result || r#type == IqType::Error => { +            if from == None ||  +                    from == Some(connection.jid().as_bare()) +             { +                match r#type { +                    IqType::Result => { +                        if let Some(query) = query { +                            match query { +                                Query::PubsubOwner(_) => Ok(()), +                                q => Err(PEPError::MismatchedQuery(q)), +                            } +                        } else { +                            // Err(PEPError::MissingQuery) +                            Ok(()) +                        } +                    } +                    IqType::Error => { +                        Err(PEPError::StanzaErrors(errors)) +                    } +                    _ => unreachable!(), +                } +            } else { +                Err(PEPError::IncorrectEntity( +                    from.unwrap_or_else(|| connection.jid().as_bare()), +                )) +            } +        } +        s => Err(PEPError::UnexpectedStanza(s)), +    } +} +  // TODO: could probably macro-ise? -pub async fn handle_online_result( -    logic: &ClientLogic, -    command: Command, +pub async fn handle_online_result<Fs: FileStore + Clone>( +    logic: &ClientLogic<Fs>, +    command: Command<Fs>,      connection: Connected, -) -> Result<(), Error> { +) -> Result<(), Error<Fs>> {      match command {          Command::GetRoster(result_sender) => {              let roster = handle_get_roster(logic, connection).await; @@ -854,14 +1096,26 @@ pub async fn handle_online_result(              let result = handle_disco_items(logic, connection, jid, node).await;              let _ = sender.send(result);          } -        Command::Publish { item, node, sender } => { -            let result = handle_publish(logic, connection, item, node).await; +        Command::PublishPEPItem { item, node, sender } => { +            let result = handle_publish_pep_item(logic, connection, item, node).await;              let _ = sender.send(result);          }          Command::ChangeNick(nick, sender) => {              let result = handle_change_nick(logic, nick).await;              let _ = sender.send(result);          } +        Command::ChangeAvatar(img_data, sender) => { +            let result = handle_change_avatar(logic, img_data).await; +            let _ = sender.send(result); +        }, +        Command::DeletePEPNode { node, sender } => { +            let result = handle_delete_pep_node(logic, connection, node).await; +            let _ = sender.send(result); +        }, +        Command::GetPEPItem { node, sender, jid, id } => { +            let result = handle_get_pep_item(logic, connection, jid, node, id).await; +            let _ = sender.send(result); +        },      }      Ok(())  } diff --git a/filamento/src/logic/process_stanza.rs b/filamento/src/logic/process_stanza.rs index 11d7588..c383d70 100644 --- a/filamento/src/logic/process_stanza.rs +++ b/filamento/src/logic/process_stanza.rs @@ -1,7 +1,9 @@  use std::str::FromStr; +use base64::{Engine, prelude::BASE64_STANDARD};  use chrono::Utc;  use lampada::{Connected, SupervisorSender}; +use sha1::{Digest, Sha1};  use stanza::{      client::{          Stanza, @@ -17,14 +19,22 @@ use uuid::Uuid;  use crate::{      UpdateMessage, caps,      chat::{Body, Message}, -    error::{DatabaseError, Error, IqError, MessageRecvError, PresenceError, RosterError}, +    error::{ +        AvatarUpdateError, DatabaseError, Error, IqError, MessageRecvError, PresenceError, +        RosterError, +    }, +    files::FileStore,      presence::{Offline, Online, Presence, PresenceType, Show},      roster::Contact,  };  use super::ClientLogic; -pub async fn handle_stanza(logic: ClientLogic, stanza: Stanza, connection: Connected) { +pub async fn handle_stanza<Fs: FileStore + Clone>( +    logic: ClientLogic<Fs>, +    stanza: Stanza, +    connection: Connected, +) {      let result = process_stanza(logic.clone(), stanza, connection).await;      match result {          Ok(u) => match u { @@ -38,10 +48,10 @@ pub async fn handle_stanza(logic: ClientLogic, stanza: Stanza, connection: Conne      }  } -pub async fn recv_message( -    logic: ClientLogic, +pub async fn recv_message<Fs: FileStore + Clone>( +    logic: ClientLogic<Fs>,      stanza_message: stanza::client::message::Message, -) -> Result<Option<UpdateMessage>, MessageRecvError> { +) -> Result<Option<UpdateMessage>, MessageRecvError<Fs>> {      if let Some(from) = stanza_message.from {          // TODO: don't ignore delay from. xep says SHOULD send error if incorrect.          let timestamp = stanza_message @@ -50,6 +60,7 @@ pub async fn recv_message(              .unwrap_or_else(|| Utc::now());          // TODO: group chat messages +        // body MUST be before user changes in order to avoid race condition where you e.g. get a nick update before the user is in the client state.          // if there is a body, should create chat message          if let Some(body) = stanza_message.body {              let message = Message { @@ -67,16 +78,28 @@ pub async fn recv_message(              };              // save the message to the database -            logic.db().upsert_chat_and_user(&from).await?; -            if let Err(e) = logic -                .db() -                .create_message_with_user_resource(message.clone(), from.clone(), from.clone()) -                .await -            { -                logic -                    .handle_error(Error::MessageRecv(MessageRecvError::MessageHistory(e))) -                    .await; -            } +            match logic.db().upsert_chat_and_user(&from).await { +                Ok(_) => { +                    if let Err(e) = logic +                        .db() +                        .create_message_with_user_resource( +                            message.clone(), +                            from.clone(), +                            from.clone(), +                        ) +                        .await +                    { +                        logic +                            .handle_error(Error::MessageRecv(MessageRecvError::MessageHistory(e))) +                            .await; +                    } +                } +                Err(e) => { +                    logic +                        .handle_error(Error::MessageRecv(MessageRecvError::MessageHistory(e))) +                        .await; +                } +            };              // update the client with the new message              logic @@ -89,70 +112,333 @@ pub async fn recv_message(          }          if let Some(nick) = stanza_message.nick { -            if let Err(e) = logic -                .db() -                .upsert_user_nick(from.as_bare(), nick.0.clone()) -                .await -            { -                logic -                    .handle_error(Error::MessageRecv(MessageRecvError::NickUpdate(e))) -                    .await; +            let nick = nick.0; +            if nick.is_empty() { +                match logic.db().delete_user_nick(from.as_bare()).await { +                    Ok(changed) => { +                        if changed { +                            logic +                                .update_sender() +                                .send(UpdateMessage::NickChanged { +                                    jid: from.as_bare(), +                                    nick: None, +                                }) +                                .await; +                        } +                    } +                    Err(e) => { +                        logic +                            .handle_error(Error::MessageRecv(MessageRecvError::NickUpdate(e))) +                            .await; +                        // if failed, send user update anyway +                        logic +                            .update_sender() +                            .send(UpdateMessage::NickChanged { +                                jid: from.as_bare(), +                                nick: None, +                            }) +                            .await; +                    } +                } +            } else { +                match logic +                    .db() +                    .upsert_user_nick(from.as_bare(), nick.clone()) +                    .await +                { +                    Ok(changed) => { +                        if changed { +                            logic +                                .update_sender() +                                .send(UpdateMessage::NickChanged { +                                    jid: from.as_bare(), +                                    nick: Some(nick), +                                }) +                                .await; +                        } +                    } +                    Err(e) => { +                        logic +                            .handle_error(Error::MessageRecv(MessageRecvError::NickUpdate(e))) +                            .await; +                        // if failed, send user update anyway +                        logic +                            .update_sender() +                            .send(UpdateMessage::NickChanged { +                                jid: from.as_bare(), +                                nick: Some(nick), +                            }) +                            .await; +                    } +                }              } - -            logic -                .update_sender() -                .send(UpdateMessage::NickChanged { -                    jid: from.as_bare(), -                    nick: nick.0, -                }) -                .await;          }          if let Some(event) = stanza_message.event {              match event { -                Event::Items(items) => match items.node.as_str() { -                    "http://jabber.org/protocol/nick" => match items.items { -                        ItemsType::Item(items) => { -                            if let Some(item) = items.first() { -                                match &item.item { -                                    Some(c) => match c { -                                        Content::Nick(nick) => { -                                            if let Err(e) = logic -                                                .db() -                                                .upsert_user_nick(from.as_bare(), nick.0.clone()) -                                                .await -                                            { -                                                logic -                                                    .handle_error(Error::MessageRecv( -                                                        MessageRecvError::NickUpdate(e), -                                                    )) -                                                    .await; +                Event::Items(items) => { +                    match items.node.as_str() { +                        "http://jabber.org/protocol/nick" => match items.items { +                            ItemsType::Item(items) => { +                                if let Some(item) = items.first() { +                                    match &item.item { +                                        Some(c) => match c { +                                            Content::Nick(nick) => { +                                                let nick = nick.0.clone(); +                                                if nick.is_empty() { +                                                    match logic +                                                        .db() +                                                        .delete_user_nick(from.as_bare()) +                                                        .await +                                                    { +                                                        Ok(changed) => { +                                                            if changed { +                                                                logic +                                                                .update_sender() +                                                                .send(UpdateMessage::NickChanged { +                                                                    jid: from.as_bare(), +                                                                    nick: None, +                                                                }) +                                                                .await; +                                                            } +                                                        } +                                                        Err(e) => { +                                                            logic +                                                                .handle_error(Error::MessageRecv( +                                                                    MessageRecvError::NickUpdate(e), +                                                                )) +                                                                .await; +                                                            // if failed, send user update anyway +                                                            logic +                                                                .update_sender() +                                                                .send(UpdateMessage::NickChanged { +                                                                    jid: from.as_bare(), +                                                                    nick: None, +                                                                }) +                                                                .await; +                                                        } +                                                    } +                                                } else { +                                                    match logic +                                                        .db() +                                                        .upsert_user_nick( +                                                            from.as_bare(), +                                                            nick.clone(), +                                                        ) +                                                        .await +                                                    { +                                                        Ok(changed) => { +                                                            if changed { +                                                                logic +                                                                .update_sender() +                                                                .send(UpdateMessage::NickChanged { +                                                                    jid: from.as_bare(), +                                                                    nick: Some(nick), +                                                                }) +                                                                .await; +                                                            } +                                                        } +                                                        Err(e) => { +                                                            logic +                                                                .handle_error(Error::MessageRecv( +                                                                    MessageRecvError::NickUpdate(e), +                                                                )) +                                                                .await; +                                                            // if failed, send user update anyway +                                                            logic +                                                                .update_sender() +                                                                .send(UpdateMessage::NickChanged { +                                                                    jid: from.as_bare(), +                                                                    nick: Some(nick), +                                                                }) +                                                                .await; +                                                        } +                                                    } +                                                }                                              } +                                            _ => {} +                                        }, +                                        None => {} +                                    } +                                } +                            } +                            _ => {} +                        }, +                        "urn:xmpp:avatar:metadata" => { +                            match items.items { +                                ItemsType::Item(items) => { +                                    if let Some(item) = items.first() { +                                        match &item.item { +                                            Some(Content::AvatarMetadata(metadata)) => { +                                                // check if user avatar has been deleted +                                                if let Some(metadata) = metadata +                                                    .info +                                                    .iter() +                                                    .find(|info| info.url.is_none()) +                                                { +                                                    // check if user avatar has changed +                                                    match logic +                                                        .db() +                                                        .upsert_user_avatar( +                                                            from.as_bare(), +                                                            metadata.id.clone(), +                                                        ) +                                                        .await +                                                    { +                                                        Ok((changed, old_avatar)) => { +                                                            if changed { +                                                                if let Some(old_avatar) = old_avatar +                                                                { +                                                                    if let Err(e) = logic +                                                                        .file_store() +                                                                        .delete(&old_avatar) +                                                                        .await.map_err(|err| AvatarUpdateError::FileStore(err)) { +                                                                            logic.handle_error(MessageRecvError::AvatarUpdate(e).into()).await; +                                                                    } +                                                                } -                                            logic -                                                .update_sender() -                                                .send(UpdateMessage::NickChanged { -                                                    jid: from.as_bare(), -                                                    nick: nick.0.clone(), -                                                }) -                                                .await; +                                                                match logic +                                                                    .file_store() +                                                                    .is_stored(&metadata.id) +                                                                    .await +                                                                    .map_err(|err| { +                                                                        AvatarUpdateError::<Fs>::FileStore( +                                                                            err, +                                                                        ) +                                                                    }) { +                                                                    Ok(false) => { +                                                                        // get data +                                                                        let pep_item = logic.client().get_pep_item(Some(from.as_bare()), "urn:xmpp:avatar:data".to_string(), metadata.id.clone()).await.map_err(|err| Into::<AvatarUpdateError<Fs>>::into(err))?; +                                                                        match pep_item { +                                                                            crate::pep::Item::AvatarData(data) => { +                                                                                let data = data.map(|data| data.data_b64).unwrap_or_default(); +                                                                                // TODO: these should all be in a separate avatarupdate function +                                                                                match BASE64_STANDARD.decode(data) { +                                                                                    Ok(data) => { +                                                                                        let mut hasher = Sha1::new(); +                                                                                        hasher.update(&data); +                                                                                        let received_data_hash = BASE64_STANDARD.encode(hasher.finalize()); +                                                                                        if received_data_hash == metadata.id { +                                                                                            if let Err(e) = logic.file_store().store(&received_data_hash, &data).await { +                                                                                                logic.handle_error(Error::MessageRecv(MessageRecvError::AvatarUpdate(AvatarUpdateError::FileStore(e)))).await; +                                                                                            } +                                                                                            logic +                                                                                            .update_sender() +                                                                                            .send( +                                                                                                UpdateMessage::AvatarChanged { +                                                                                                    jid: from.as_bare(), +                                                                                                    id: Some( +                                                                                                        metadata.id.clone(), +                                                                                                    ), +                                                                                                }, +                                                                                            ) +                                                                                            .await; +                                                                                        } +                                                                                    }, +                                                                                    Err(e) => { +                                                                                        logic.handle_error(Error::MessageRecv(MessageRecvError::AvatarUpdate(AvatarUpdateError::Base64(e)))).await; +                                                                                    }, +                                                                                } +                                                                            }, +                                                                            _ => { +                                                                                logic.handle_error(Error::MessageRecv(MessageRecvError::AvatarUpdate(AvatarUpdateError::MissingData))).await; +                                                                            } +                                                                        } +                                                                    } +                                                                    Ok(true) => { +                                                                        // just send the update +                                                                        logic +                                                                        .update_sender() +                                                                        .send( +                                                                            UpdateMessage::AvatarChanged { +                                                                                jid: from.as_bare(), +                                                                                id: Some( +                                                                                    metadata.id.clone(), +                                                                                ), +                                                                            }, +                                                                        ) +                                                                        .await; +                                                                    } +                                                                    Err(e) => { +                                                                        logic.handle_error(Error::MessageRecv(MessageRecvError::AvatarUpdate(e))).await; +                                                                    } +                                                                } +                                                            } +                                                        } +                                                        Err(e) => { +                                                            logic +                                                                .handle_error(Error::MessageRecv( +                                                                    MessageRecvError::AvatarUpdate( +                                                                        AvatarUpdateError::Database( +                                                                            e, +                                                                        ), +                                                                    ), +                                                                )) +                                                                .await; +                                                        } +                                                    } +                                                } else { +                                                    // delete avatar +                                                    match logic +                                                        .db() +                                                        .delete_user_avatar(from.as_bare()) +                                                        .await +                                                    { +                                                        Ok((changed, old_avatar)) => { +                                                            if changed { +                                                                if let Some(old_avatar) = old_avatar +                                                                { +                                                                    if let Err(e) = logic +                                                                        .file_store() +                                                                        .delete(&old_avatar) +                                                                        .await.map_err(|err| AvatarUpdateError::FileStore(err)) { +                                                                            logic.handle_error(MessageRecvError::AvatarUpdate(e).into()).await; +                                                                    } +                                                                } +                                                                logic +                                                                    .update_sender() +                                                                    .send( +                                                                        UpdateMessage::AvatarChanged { +                                                                            jid: from.as_bare(), +                                                                            id: None, +                                                                        }, +                                                                    ) +                                                                    .await; +                                                            } +                                                        } +                                                        Err(e) => { +                                                            logic +                                                                .handle_error(Error::MessageRecv( +                                                                    MessageRecvError::AvatarUpdate( +                                                                        AvatarUpdateError::Database( +                                                                            e, +                                                                        ), +                                                                    ), +                                                                )) +                                                                .await; +                                                        } +                                                    } +                                                } +                                                // check if the new file is in the file store +                                                // if not, retrieve from server and save in the file store (remember to check if the hash matches) +                                                // send the avatar update +                                            } +                                            _ => {}                                          } -                                        Content::Unknown(element) => {} -                                    }, -                                    None => {} +                                    }                                  } +                                _ => {}                              }                          } -                        ItemsType::Retract(retracts) => {} -                    }, -                    _ => {} -                }, +                        _ => {} +                    } +                }                  // Event::Collection(collection) => todo!(),                  // Event::Configuration(configuration) => todo!(),                  // Event::Delete(delete) => todo!(),                  // Event::Purge(purge) => todo!(),                  // Event::Subscription(subscription) => todo!(), -                _ => {} +                _ => {} // TODO: catch these catch-alls in some way              }          } @@ -240,8 +526,8 @@ pub async fn recv_presence(      }  } -pub async fn recv_iq( -    logic: ClientLogic, +pub async fn recv_iq<Fs: FileStore + Clone>( +    logic: ClientLogic<Fs>,      connection: Connected,      iq: Iq,  ) -> Result<Option<UpdateMessage>, IqError> { @@ -469,11 +755,11 @@ pub async fn recv_iq(      }  } -pub async fn process_stanza( -    logic: ClientLogic, +pub async fn process_stanza<Fs: FileStore + Clone>( +    logic: ClientLogic<Fs>,      stanza: Stanza,      connection: Connected, -) -> Result<Option<UpdateMessage>, Error> { +) -> Result<Option<UpdateMessage>, Error<Fs>> {      let update = match stanza {          Stanza::Message(stanza_message) => Ok(recv_message(logic, stanza_message).await?),          Stanza::Presence(presence) => Ok(recv_presence(presence).await?), | 
