diff options
Diffstat (limited to 'filamento')
-rw-r--r-- | filamento/.gitignore | 1 | ||||
-rw-r--r-- | filamento/Cargo.toml | 6 | ||||
-rw-r--r-- | filamento/examples/example.rs | 69 | ||||
-rw-r--r-- | filamento/migrations/20240113011930_luz.sql | 1 | ||||
-rw-r--r-- | filamento/src/avatar.rs | 34 | ||||
-rw-r--r-- | filamento/src/caps.rs | 7 | ||||
-rw-r--r-- | filamento/src/db.rs | 406 | ||||
-rw-r--r-- | filamento/src/error.rs | 94 | ||||
-rw-r--r-- | filamento/src/files.rs | 77 | ||||
-rw-r--r-- | filamento/src/lib.rs | 233 | ||||
-rw-r--r-- | filamento/src/logic/abort.rs | 4 | ||||
-rw-r--r-- | filamento/src/logic/connect.rs | 8 | ||||
-rw-r--r-- | filamento/src/logic/connection_error.rs | 7 | ||||
-rw-r--r-- | filamento/src/logic/disconnect.rs | 7 | ||||
-rw-r--r-- | filamento/src/logic/local_only.rs | 53 | ||||
-rw-r--r-- | filamento/src/logic/mod.rs | 27 | ||||
-rw-r--r-- | filamento/src/logic/offline.rs | 82 | ||||
-rw-r--r-- | filamento/src/logic/online.rs | 462 | ||||
-rw-r--r-- | filamento/src/logic/process_stanza.rs | 512 | ||||
-rw-r--r-- | filamento/src/pep.rs | 17 | ||||
-rw-r--r-- | filamento/src/user.rs | 3 |
21 files changed, 1822 insertions, 288 deletions
diff --git a/filamento/.gitignore b/filamento/.gitignore index ec8a40b..1ba9f2a 100644 --- a/filamento/.gitignore +++ b/filamento/.gitignore @@ -1,2 +1,3 @@ filamento.db +files/ .sqlx/ diff --git a/filamento/Cargo.toml b/filamento/Cargo.toml index 1c28c39..91b7e91 100644 --- a/filamento/Cargo.toml +++ b/filamento/Cargo.toml @@ -8,7 +8,7 @@ futures = "0.3.31" lampada = { version = "0.1.0", path = "../lampada" } tokio = "1.42.0" thiserror = "2.0.11" -stanza = { version = "0.1.0", path = "../stanza", features = ["rfc_6121", "xep_0203", "xep_0030", "xep_0060", "xep_0172", "xep_0390", "xep_0128", "xep_0115"] } +stanza = { version = "0.1.0", path = "../stanza", features = ["rfc_6121", "xep_0203", "xep_0030", "xep_0060", "xep_0172", "xep_0390", "xep_0128", "xep_0115", "xep_0084"] } sqlx = { version = "0.8.3", features = ["sqlite", "runtime-tokio", "uuid", "chrono"] } # TODO: re-export jid? jid = { version = "0.1.0", path = "../jid", features = ["sqlx"] } @@ -19,10 +19,12 @@ sha2 = "0.10.8" sha3 = "0.10.8" base64 = "0.22.1" sha1 = "0.10.6" +image = "0.25.6" +hex = "0.4.3" [dev-dependencies] tracing-subscriber = "0.3.19" -peanuts = { version = "0.1.0", path = "../../peanuts" } +peanuts = { version = "0.1.0", git = "https://bunny.garden/peanuts" } [[example]] name = "example" diff --git a/filamento/examples/example.rs b/filamento/examples/example.rs index 74a9aa1..8119743 100644 --- a/filamento/examples/example.rs +++ b/filamento/examples/example.rs @@ -1,17 +1,56 @@ -use std::{path::Path, str::FromStr, time::Duration}; +use std::{path::Path, str::FromStr, sync::Arc, time::Duration}; -use filamento::{Client, db::Db}; +use filamento::{Client, db::Db, files::FileStore}; use jid::JID; +use tokio::io::{self, AsyncReadExt}; use tracing::info; +#[derive(Clone, Debug)] +pub struct Files; + +impl FileStore for Files { + type Err = Arc<io::Error>; + + async fn is_stored(&self, name: &str) -> Result<bool, Self::Err> { + tracing::debug!("checking if {} is stored", name); + let res = tokio::fs::try_exists(format!("files/{}", name)) + .await + .map_err(|err| Arc::new(err)); + tracing::debug!("file check res: {:?}", res); + res + } + + async fn store(&self, name: &str, data: &[u8]) -> Result<(), Self::Err> { + tracing::debug!("storing {} is stored", name); + let res = tokio::fs::write(format!("files/{}", name), data) + .await + .map_err(|err| Arc::new(err)); + tracing::debug!("file store res: {:?}", res); + res + } + + async fn delete(&self, name: &str) -> Result<(), Self::Err> { + tracing::debug!("deleting {}", name); + let res = tokio::fs::remove_file(format!("files/{}", name)) + .await + .map_err(|err| Arc::new(err)); + tracing::debug!("file delete res: {:?}", res); + res + } +} + #[tokio::main] async fn main() { tracing_subscriber::fmt::init(); let db = Db::create_connect_and_migrate(Path::new("./filamento.db")) .await .unwrap(); - let (client, mut recv) = - Client::new("test@blos.sm".try_into().unwrap(), "slayed".to_string(), db); + let (client, mut recv) = Client::new( + "test@blos.sm/testing2".try_into().unwrap(), + "slayed".to_string(), + db, + Files, + ); tokio::spawn(async move { while let Some(msg) = recv.recv().await { @@ -22,7 +61,16 @@ async fn main() { client.connect().await.unwrap(); tokio::time::sleep(Duration::from_secs(5)).await; info!("changing nick"); - client.change_nick("britney".to_string()).await.unwrap(); + client + .change_nick(Some("britney".to_string())) + .await + .unwrap(); + let mut profile_pic = tokio::fs::File::open("files/britney_starbies.jpg") + .await + .unwrap(); + let mut data = Vec::new(); + profile_pic.read_to_end(&mut data).await.unwrap(); + client.change_avatar(Some(data)).await.unwrap(); info!("sending message"); client .send_message( @@ -34,7 +82,16 @@ async fn main() { .await .unwrap(); info!("sent message"); - tokio::time::sleep(Duration::from_secs(5)).await; + client + .send_message( + JID::from_str("cel@blos.sm").unwrap(), + filamento::chat::Body { + body: "hallo 2".to_string(), + }, + ) + .await + .unwrap(); + tokio::time::sleep(Duration::from_secs(15)).await; // info!("sending disco query"); // let info = client.disco_info(None, None).await.unwrap(); // info!("got disco result: {:#?}", info); diff --git a/filamento/migrations/20240113011930_luz.sql b/filamento/migrations/20240113011930_luz.sql index 8c1b01c..c2f35dd 100644 --- a/filamento/migrations/20240113011930_luz.sql +++ b/filamento/migrations/20240113011930_luz.sql @@ -6,6 +6,7 @@ create table users( -- TODO: enforce bare jid jid text primary key not null, nick text, + avatar text, -- can receive presence status from non-contacts cached_status_message text -- TODO: last_seen diff --git a/filamento/src/avatar.rs b/filamento/src/avatar.rs new file mode 100644 index 0000000..a6937df --- /dev/null +++ b/filamento/src/avatar.rs @@ -0,0 +1,34 @@ +#[derive(Clone, Debug)] +pub struct Metadata { + pub bytes: u32, + pub hash: String, + pub r#type: String, +} + +#[derive(Clone, Debug)] +pub struct Data { + pub hash: String, + pub data_b64: String, +} + +#[derive(Clone, Debug)] +pub struct Avatar(Vec<u8>); + +impl From<stanza::xep_0084::Info> for Metadata { + fn from(value: stanza::xep_0084::Info) -> Self { + Self { + bytes: value.bytes, + hash: value.id, + r#type: value.r#type, + } + } +} + +impl From<stanza::xep_0084::Data> for Data { + fn from(value: stanza::xep_0084::Data) -> Self { + Self { + hash: todo!(), + data_b64: todo!(), + } + } +} diff --git a/filamento/src/caps.rs b/filamento/src/caps.rs index 49d05ba..819e669 100644 --- a/filamento/src/caps.rs +++ b/filamento/src/caps.rs @@ -35,12 +35,13 @@ pub fn client_info() -> Info { Info { node: None, features: vec![ - "http://jabber.org/protocol/disco#items".to_string(), - "http://jabber.org/protocol/disco#info".to_string(), "http://jabber.org/protocol/caps".to_string(), - "http://jabber.org/protocol/nick".to_string(), + "http://jabber.org/protocol/disco#info".to_string(), + "http://jabber.org/protocol/disco#items".to_string(), "http://jabber.org/protocol/nick+notify".to_string(), + "urn:xmpp:avatar:metadata+notify".to_string(), ], + // "http://jabber.org/protocol/nick".to_string(), identities: vec![Identity { name: Some("filamento 0.1.0".to_string()), category: Category::Client(identity::Client::PC), diff --git a/filamento/src/db.rs b/filamento/src/db.rs index c19f16c..d9206cc 100644 --- a/filamento/src/db.rs +++ b/filamento/src/db.rs @@ -1,12 +1,12 @@ use std::{collections::HashSet, path::Path}; -use chrono::Utc; +use chrono::{DateTime, Utc}; use jid::JID; use sqlx::{SqlitePool, migrate}; use uuid::Uuid; use crate::{ - chat::{Chat, Message}, + chat::{Body, Chat, Delivery, Message}, error::{DatabaseError as Error, DatabaseOpenError}, presence::Online, roster::Contact, @@ -51,10 +51,9 @@ impl Db { pub(crate) async fn create_user(&self, user: User) -> Result<(), Error> { sqlx::query!( - "insert into users ( jid, nick, cached_status_message ) values ( ?, ?, ? )", + "insert into users ( jid, nick ) values ( ?, ? )", user.jid, user.nick, - user.cached_status_message ) .execute(&self.db) .await?; @@ -75,22 +74,116 @@ impl Db { Ok(user) } - pub(crate) async fn upsert_user_nick(&self, jid: JID, nick: String) -> Result<(), Error> { - sqlx::query!( - "insert into users (jid, nick) values (?, ?) on conflict do update set nick = ?", + /// returns whether or not the nickname was updated + pub(crate) async fn delete_user_nick(&self, jid: JID) -> Result<bool, Error> { + if sqlx::query!( + "insert into users (jid, nick) values (?, ?) on conflict do update set nick = ? where nick is not ?", + jid, + None::<String>, + None::<String>, + None::<String>, + ) + .execute(&self.db) + .await? + .rows_affected() + > 0 + { + Ok(true) + } else { + Ok(false) + } + } + + /// returns whether or not the nickname was updated + pub(crate) async fn upsert_user_nick(&self, jid: JID, nick: String) -> Result<bool, Error> { + let rows_affected = sqlx::query!( + "insert into users (jid, nick) values (?, ?) on conflict do update set nick = ? where nick is not ?", jid, nick, + nick, nick ) .execute(&self.db) - .await?; - Ok(()) + .await? + .rows_affected(); + tracing::debug!("rows affected: {}", rows_affected); + if rows_affected > 0 { + Ok(true) + } else { + Ok(false) + } + } + + /// returns whether or not the avatar was updated, and the file to delete if there existed an old avatar + pub(crate) async fn delete_user_avatar( + &self, + jid: JID, + ) -> Result<(bool, Option<String>), Error> { + #[derive(sqlx::FromRow)] + struct AvatarRow { + avatar: Option<String>, + } + let old_avatar: Option<String> = sqlx::query_as("select avatar from users where jid = ?") + .bind(jid.clone()) + .fetch_optional(&self.db) + .await? + .map(|row: AvatarRow| row.avatar) + .unwrap_or(None); + if sqlx::query!( + "insert into users (jid, avatar) values (?, ?) on conflict do update set avatar = ? where avatar is not ?", + jid, + None::<String>, + None::<String>, + None::<String>, + ) + .execute(&self.db) + .await? + .rows_affected() + > 0 + { + Ok((true, old_avatar)) + } else { + Ok((false, old_avatar)) + } + } + + /// returns whether or not the avatar was updated, and the file to delete if there existed an old avatar + pub(crate) async fn upsert_user_avatar( + &self, + jid: JID, + avatar: String, + ) -> Result<(bool, Option<String>), Error> { + #[derive(sqlx::FromRow)] + struct AvatarRow { + avatar: Option<String>, + } + let old_avatar: Option<String> = sqlx::query_as("select avatar from users where jid = ?") + .bind(jid.clone()) + .fetch_optional(&self.db) + .await? + .map(|row: AvatarRow| row.avatar) + .unwrap_or(None); + if sqlx::query!( + "insert into users (jid, avatar) values (?, ?) on conflict do update set avatar = ? where avatar is not ?", + jid, + avatar, + avatar, + avatar, + ) + .execute(&self.db) + .await? + .rows_affected() + > 0 + { + Ok((true, old_avatar)) + } else { + Ok((false, old_avatar)) + } } pub(crate) async fn update_user(&self, user: User) -> Result<(), Error> { sqlx::query!( - "update users set cached_status_message = ?, nick = ? where jid = ?", - user.cached_status_message, + "update users set nick = ? where jid = ?", user.nick, user.jid ) @@ -266,10 +359,9 @@ impl Db { } pub(crate) async fn read_cached_roster(&self) -> Result<Vec<Contact>, Error> { - let mut roster: Vec<Contact> = - sqlx::query_as("select * from roster join users on jid = user_jid") - .fetch_all(&self.db) - .await?; + let mut roster: Vec<Contact> = sqlx::query_as("select * from roster") + .fetch_all(&self.db) + .await?; for contact in &mut roster { #[derive(sqlx::FromRow)] struct Row { @@ -285,13 +377,47 @@ impl Db { Ok(roster) } + pub(crate) async fn read_cached_roster_with_users( + &self, + ) -> Result<Vec<(Contact, User)>, Error> { + #[derive(sqlx::FromRow)] + struct Row { + #[sqlx(flatten)] + contact: Contact, + #[sqlx(flatten)] + user: User, + } + let mut roster: Vec<Row> = + sqlx::query_as("select * from roster join users on jid = user_jid") + .fetch_all(&self.db) + .await?; + for row in &mut roster { + #[derive(sqlx::FromRow)] + struct Row { + group_name: String, + } + let groups: Vec<Row> = + sqlx::query_as("select group_name from groups_roster where contact_jid = ?") + .bind(&row.contact.user_jid) + .fetch_all(&self.db) + .await?; + row.contact.groups = HashSet::from_iter(groups.into_iter().map(|row| row.group_name)); + } + let roster = roster + .into_iter() + .map(|row| (row.contact, row.user)) + .collect(); + Ok(roster) + } + pub(crate) async fn create_chat(&self, chat: Chat) -> Result<(), Error> { let id = Uuid::new_v4(); let jid = chat.correspondent(); sqlx::query!( - "insert into chats (id, correspondent) values (?, ?)", + "insert into chats (id, correspondent, have_chatted) values (?, ?, ?)", id, - jid + jid, + chat.have_chatted, ) .execute(&self.db) .await?; @@ -371,21 +497,197 @@ impl Db { &self, ) -> Result<Vec<(Chat, Message)>, Error> { #[derive(sqlx::FromRow)] + pub struct RowChat { + chat_correspondent: JID, + chat_have_chatted: bool, + } + impl From<RowChat> for Chat { + fn from(value: RowChat) -> Self { + Self { + correspondent: value.chat_correspondent, + have_chatted: value.chat_have_chatted, + } + } + } + #[derive(sqlx::FromRow)] + pub struct RowMessage { + message_id: Uuid, + message_body: String, + message_delivery: Option<Delivery>, + message_timestamp: DateTime<Utc>, + message_from_jid: JID, + } + impl From<RowMessage> for Message { + fn from(value: RowMessage) -> Self { + Self { + id: value.message_id, + from: value.message_from_jid, + delivery: value.message_delivery, + timestamp: value.message_timestamp, + body: Body { + body: value.message_body, + }, + } + } + } + + #[derive(sqlx::FromRow)] + pub struct ChatWithMessageRow { + #[sqlx(flatten)] + pub chat: RowChat, + #[sqlx(flatten)] + pub message: RowMessage, + } + pub struct ChatWithMessage { + chat: Chat, + message: Message, + } + + impl From<ChatWithMessageRow> for ChatWithMessage { + fn from(value: ChatWithMessageRow) -> Self { + Self { + chat: value.chat.into(), + message: value.message.into(), + } + } + } + + // TODO: sometimes chats have no messages. + let chats: Vec<ChatWithMessageRow> = sqlx::query_as("select c.*, m.* from chats c join (select chat_id, max(timestamp) max_timestamp from messages group by chat_id) max_timestamps on c.id = max_timestamps.chat_id join messages m on max_timestamps.chat_id = m.chat_id and max_timestamps.max_timestamp = m.timestamp order by m.timestamp desc") + .fetch_all(&self.db) + .await?; + + let chats = chats + .into_iter() + .map(|chat_with_message_row| { + let chat_with_message: ChatWithMessage = chat_with_message_row.into(); + (chat_with_message.chat, chat_with_message.message) + }) + .collect(); + + Ok(chats) + } + + /// chats ordered by date of last message + // greatest-n-per-group + pub(crate) async fn read_chats_ordered_with_latest_messages_and_users( + &self, + ) -> Result<Vec<((Chat, User), (Message, User))>, Error> { + #[derive(sqlx::FromRow)] + pub struct RowChat { + chat_correspondent: JID, + chat_have_chatted: bool, + } + impl From<RowChat> for Chat { + fn from(value: RowChat) -> Self { + Self { + correspondent: value.chat_correspondent, + have_chatted: value.chat_have_chatted, + } + } + } + #[derive(sqlx::FromRow)] + pub struct RowMessage { + message_id: Uuid, + message_body: String, + message_delivery: Option<Delivery>, + message_timestamp: DateTime<Utc>, + message_from_jid: JID, + } + impl From<RowMessage> for Message { + fn from(value: RowMessage) -> Self { + Self { + id: value.message_id, + from: value.message_from_jid, + delivery: value.message_delivery, + timestamp: value.message_timestamp, + body: Body { + body: value.message_body, + }, + } + } + } + #[derive(sqlx::FromRow)] + pub struct RowChatUser { + chat_user_jid: JID, + chat_user_nick: Option<String>, + chat_user_avatar: Option<String>, + } + impl From<RowChatUser> for User { + fn from(value: RowChatUser) -> Self { + Self { + jid: value.chat_user_jid, + nick: value.chat_user_nick, + avatar: value.chat_user_avatar, + } + } + } + #[derive(sqlx::FromRow)] + pub struct RowMessageUser { + message_user_jid: JID, + message_user_nick: Option<String>, + message_user_avatar: Option<String>, + } + impl From<RowMessageUser> for User { + fn from(value: RowMessageUser) -> Self { + Self { + jid: value.message_user_jid, + nick: value.message_user_nick, + avatar: value.message_user_avatar, + } + } + } + #[derive(sqlx::FromRow)] + pub struct ChatWithMessageAndUsersRow { + #[sqlx(flatten)] + pub chat: RowChat, #[sqlx(flatten)] - pub chat: Chat, + pub chat_user: RowChatUser, #[sqlx(flatten)] - pub message: Message, + pub message: RowMessage, + #[sqlx(flatten)] + pub message_user: RowMessageUser, + } + + impl From<ChatWithMessageAndUsersRow> for ChatWithMessageAndUsers { + fn from(value: ChatWithMessageAndUsersRow) -> Self { + Self { + chat: value.chat.into(), + chat_user: value.chat_user.into(), + message: value.message.into(), + message_user: value.message_user.into(), + } + } } - // TODO: i don't know if this will assign the right uuid to the latest message or the chat's id. should probably check but i don't think it matters as nothing ever gets called with the id of the latest message in the chats list - let chats: Vec<ChatWithMessage> = sqlx::query_as("select c.*, m.* from chats c join (select chat_id, max(timestamp) max_timestamp from messages group by chat_id) max_timestamps on c.id = max_timestamps.chat_id join messages m on max_timestamps.chat_id = m.chat_id and max_timestamps.max_timestamp = m.timestamp order by m.timestamp desc") + pub struct ChatWithMessageAndUsers { + chat: Chat, + chat_user: User, + message: Message, + message_user: User, + } + + let chats: Vec<ChatWithMessageAndUsersRow> = sqlx::query_as("select c.id as chat_id, c.correspondent as chat_correspondent, c.have_chatted as chat_have_chatted, m.id as message_id, m.body as message_body, m.delivery as message_delivery, m.timestamp as message_timestamp, m.from_jid as message_from_jid, cu.jid as chat_user_jid, cu.nick as chat_user_nick, cu.avatar as chat_user_avatar, mu.jid as message_user_jid, mu.nick as message_user_nick, mu.avatar as message_user_avatar from chats c join (select chat_id, max(timestamp) max_timestamp from messages group by chat_id) max_timestamps on c.id = max_timestamps.chat_id join messages m on max_timestamps.chat_id = m.chat_id and max_timestamps.max_timestamp = m.timestamp join users as cu on cu.jid = c.correspondent join users as mu on mu.jid = m.from_jid order by m.timestamp desc") .fetch_all(&self.db) .await?; let chats = chats .into_iter() - .map(|chat_with_message| (chat_with_message.chat, chat_with_message.message)) + .map(|chat_with_message_and_users_row| { + let chat_with_message_and_users: ChatWithMessageAndUsers = + chat_with_message_and_users_row.into(); + ( + ( + chat_with_message_and_users.chat, + chat_with_message_and_users.chat_user, + ), + ( + chat_with_message_and_users.message, + chat_with_message_and_users.message_user, + ), + ) + }) .collect(); Ok(chats) @@ -441,12 +743,14 @@ impl Db { .execute(&self.db) .await?; let id = Uuid::new_v4(); - let chat: Chat = sqlx::query_as("insert into chats (id, correspondent, have_chatted) values (?, ?, ?) on conflict do nothing returning *") + let chat: Chat = sqlx::query_as("insert into chats (id, correspondent, have_chatted) values (?, ?, ?) on conflict do nothing; select * from chats where correspondent = ?") .bind(id) - .bind(bare_chat) + .bind(bare_chat.clone()) .bind(false) + .bind(bare_chat) .fetch_one(&self.db) .await?; + tracing::debug!("CHECKING chat: {:?}", chat); Ok(chat.have_chatted) } @@ -472,7 +776,7 @@ impl Db { Ok(()) } - // create direct message from incoming + /// create direct message from incoming. MUST upsert chat and user pub(crate) async fn create_message_with_user_resource( &self, message: Message, @@ -482,20 +786,20 @@ impl Db { ) -> Result<(), Error> { let bare_chat = chat.as_bare(); let resource = &chat.resourcepart; - sqlx::query!( - "insert into users (jid) values (?) on conflict do nothing", - bare_chat - ) - .execute(&self.db) - .await?; - let id = Uuid::new_v4(); - sqlx::query!( - "insert into chats (id, correspondent) values (?, ?) on conflict do nothing", - id, - bare_chat - ) - .execute(&self.db) - .await?; + // sqlx::query!( + // "insert into users (jid) values (?) on conflict do nothing", + // bare_chat + // ) + // .execute(&self.db) + // .await?; + // let id = Uuid::new_v4(); + // sqlx::query!( + // "insert into chats (id, correspondent) values (?, ?) on conflict do nothing", + // id, + // bare_chat + // ) + // .execute(&self.db) + // .await?; if let Some(resource) = resource { sqlx::query!( "insert into resources (bare_jid, resource) values (?, ?) on conflict do nothing", @@ -537,6 +841,30 @@ impl Db { Ok(messages) } + pub(crate) async fn read_message_history_with_users( + &self, + chat: JID, + ) -> Result<Vec<(Message, User)>, Error> { + let chat_id = self.read_chat_id(chat).await?; + #[derive(sqlx::FromRow)] + pub struct Row { + #[sqlx(flatten)] + user: User, + #[sqlx(flatten)] + message: Message, + } + let messages: Vec<Row> = + sqlx::query_as("select * from messages join users on jid = from_jid where chat_id = ? order by timestamp asc") + .bind(chat_id) + .fetch_all(&self.db) + .await?; + let messages = messages + .into_iter() + .map(|row| (row.message, row.user)) + .collect(); + Ok(messages) + } + pub(crate) async fn read_cached_status(&self) -> Result<Online, Error> { let online: Online = sqlx::query_as("select * from cached_status where id = 0") .fetch_one(&self.db) diff --git a/filamento/src/error.rs b/filamento/src/error.rs index 5111413..f2bf6ef 100644 --- a/filamento/src/error.rs +++ b/filamento/src/error.rs @@ -1,15 +1,19 @@ -use std::{string::FromUtf8Error, sync::Arc}; +use std::{num::TryFromIntError, string::FromUtf8Error, sync::Arc}; +use base64::DecodeError; +use image::ImageError; use jid::JID; -use lampada::error::{ConnectionError, ReadError, WriteError}; +use lampada::error::{ActorError, ConnectionError, ReadError, WriteError}; use stanza::client::{Stanza, iq::Query}; use thiserror::Error; pub use lampada::error::CommandError; +use crate::files::FileStore; + // for the client logic impl #[derive(Debug, Error, Clone)] -pub enum Error { +pub enum Error<Fs: FileStore> { #[error("core error: {0}")] Connection(#[from] ConnectionError), #[error("received unrecognized/unsupported content")] @@ -17,7 +21,7 @@ pub enum Error { // TODO: include content // UnrecognizedContent(peanuts::element::Content), #[error("iq receive error: {0}")] - Iq(#[from] IqError), + Iq(#[from] IqProcessError), // TODO: change to Connecting(ConnectingError) #[error("connecting: {0}")] Connecting(#[from] ConnectionJobError), @@ -33,11 +37,11 @@ pub enum Error { #[error("message send error: {0}")] MessageSend(#[from] MessageSendError), #[error("message receive error: {0}")] - MessageRecv(#[from] MessageRecvError), + MessageRecv(#[from] MessageRecvError<Fs>), #[error("subscripbe error: {0}")] Subscribe(#[from] SubscribeError), #[error("publish error: {0}")] - Publish(#[from] PublishError), + Publish(#[from] PEPError), } #[derive(Debug, Error, Clone)] @@ -53,13 +57,29 @@ pub enum MessageSendError { } #[derive(Debug, Error, Clone)] -pub enum MessageRecvError { +pub enum MessageRecvError<Fs: FileStore> { #[error("could not add to message history: {0}")] MessageHistory(#[from] DatabaseError), #[error("missing from")] MissingFrom, #[error("could not update user nick: {0}")] NickUpdate(DatabaseError), + #[error("could not update user avatar: {0}")] + AvatarUpdate(#[from] AvatarUpdateError<Fs>), +} + +#[derive(Debug, Error, Clone)] +pub enum AvatarUpdateError<Fs: FileStore> { + #[error("could not save to disk: {0}")] + FileStore(Fs::Err), + #[error("could not fetch avatar data: {0}")] + PEPError(#[from] CommandError<PEPError>), + #[error("base64 decode: {0}")] + Base64(#[from] DecodeError), + #[error("pep node missing avatar data")] + MissingData, + #[error("database: {0}")] + Database(#[from] DatabaseError), } #[derive(Debug, Error, Clone)] @@ -97,6 +117,17 @@ pub enum RosterError { StanzaError(#[from] stanza::client::error::Error), #[error("could not reply to roster push: {0}")] PushReply(WriteError), + #[error("actor error: {0}")] + Actor(ActorError), +} + +impl From<CommandError<RosterError>> for RosterError { + fn from(value: CommandError<RosterError>) -> Self { + match value { + CommandError::Actor(actor_error) => Self::Actor(actor_error), + CommandError::Error(e) => e, + } + } } #[derive(Debug, Error, Clone)] @@ -161,6 +192,14 @@ pub enum IqError { } #[derive(Debug, Error, Clone)] +pub enum IqProcessError { + #[error("iq error")] + Iq(#[from] IqError), + #[error("roster push")] + Roster(#[from] RosterError), +} + +#[derive(Debug, Error, Clone)] pub enum DatabaseOpenError { #[error("error: {0}")] Error(Arc<sqlx::Error>), @@ -203,7 +242,7 @@ pub enum PresenceError { } #[derive(Debug, Error, Clone)] -pub enum PublishError { +pub enum PEPError { #[error("received mismatched query")] MismatchedQuery(Query), #[error("missing query")] @@ -216,12 +255,19 @@ pub enum PublishError { UnexpectedStanza(Stanza), #[error("iq response: {0}")] IqResponse(#[from] IqRequestError), + #[error("missing pep item")] + MissingItem, + #[error("incorrect item id: expected {0}, got {1}")] + IncorrectItemID(String, String), + #[error("unsupported pep item")] + UnsupportedItem, + // TODO: should the item be in the error? } #[derive(Debug, Error, Clone)] pub enum NickError { #[error("publishing nick: {0}")] - Publish(#[from] CommandError<PublishError>), + Publish(#[from] CommandError<PEPError>), #[error("updating database: {0}")] Database(#[from] DatabaseError), #[error("disconnected")] @@ -267,5 +313,31 @@ pub enum CapsNodeConversionError { #[error("missing hashtag")] MissingHashtag, } -// #[derive(Debug, Error, Clone)] -// pub enum CapsError {} + +#[derive(Debug, Error, Clone)] +pub enum AvatarPublishError<Fs: FileStore> { + #[error("disconnected")] + Disconnected, + #[error("image read: {0}")] + Read(Arc<std::io::Error>), + #[error("image: {0}")] + Image(Arc<ImageError>), + #[error("pep publish: {0}")] + Publish(#[from] CommandError<PEPError>), + #[error("bytes number conversion: {0}")] + FromInt(#[from] TryFromIntError), + #[error("could not save to disk")] + FileStore(Fs::Err), +} + +impl<Fs: FileStore> From<std::io::Error> for AvatarPublishError<Fs> { + fn from(value: std::io::Error) -> Self { + Self::Read(Arc::new(value)) + } +} + +impl<Fs: FileStore> From<ImageError> for AvatarPublishError<Fs> { + fn from(value: ImageError) -> Self { + Self::Image(Arc::new(value)) + } +} diff --git a/filamento/src/files.rs b/filamento/src/files.rs new file mode 100644 index 0000000..3acc871 --- /dev/null +++ b/filamento/src/files.rs @@ -0,0 +1,77 @@ +use std::{ + error::Error, + path::{Path, PathBuf}, + sync::Arc, +}; + +use tokio::io; + +pub trait FileStore { + type Err: Clone + Send + Error; + + fn is_stored( + &self, + name: &str, + ) -> impl std::future::Future<Output = Result<bool, Self::Err>> + std::marker::Send; + fn store( + &self, + name: &str, + data: &[u8], + ) -> impl std::future::Future<Output = Result<(), Self::Err>> + std::marker::Send; + fn delete( + &self, + name: &str, + ) -> impl std::future::Future<Output = Result<(), Self::Err>> + std::marker::Send; +} + +#[derive(Clone, Debug)] +pub struct Files { + root: PathBuf, +} + +impl Files { + pub fn new(root: impl AsRef<Path>) -> Self { + let root = root.as_ref(); + let root = root.into(); + Self { root } + } + + pub fn root(&self) -> &Path { + &self.root + } +} + +impl FileStore for Files { + type Err = Arc<io::Error>; + + async fn is_stored(&self, name: &str) -> Result<bool, Self::Err> { + tracing::debug!("checking if {} is stored", name); + // TODO: is this secure ;-; + let name = name.replace("/", "").replace(".", ""); + let res = tokio::fs::try_exists(self.root.join(name)) + .await + .map_err(|err| Arc::new(err)); + tracing::debug!("file check res: {:?}", res); + res + } + + async fn store(&self, name: &str, data: &[u8]) -> Result<(), Self::Err> { + tracing::debug!("storing {} is stored", name); + let name = name.replace("/", "").replace(".", ""); + let res = tokio::fs::write(self.root.join(name), data) + .await + .map_err(|err| Arc::new(err)); + tracing::debug!("file store res: {:?}", res); + res + } + + async fn delete(&self, name: &str) -> Result<(), Self::Err> { + tracing::debug!("deleting {}", name); + let name = name.replace("/", "").replace(".", ""); + let res = tokio::fs::remove_file(self.root.join(name)) + .await + .map_err(|err| Arc::new(err)); + tracing::debug!("file delete res: {:?}", res); + res + } +} diff --git a/filamento/src/lib.rs b/filamento/src/lib.rs index c44edca..14b0cae 100644 --- a/filamento/src/lib.rs +++ b/filamento/src/lib.rs @@ -11,9 +11,10 @@ use chrono::Utc; use db::Db; use disco::{Info, Items}; use error::{ - ConnectionJobError, DatabaseError, DiscoError, Error, IqError, MessageRecvError, NickError, - PresenceError, PublishError, RosterError, StatusError, SubscribeError, + AvatarPublishError, ConnectionJobError, DatabaseError, DiscoError, Error, IqError, + MessageRecvError, NickError, PEPError, PresenceError, RosterError, StatusError, SubscribeError, }; +use files::FileStore; use futures::FutureExt; use jid::JID; use lampada::{ @@ -35,20 +36,24 @@ use tracing::{debug, info}; use user::User; use uuid::Uuid; +pub mod avatar; pub mod caps; pub mod chat; pub mod db; pub mod disco; pub mod error; +pub mod files; mod logic; pub mod pep; pub mod presence; pub mod roster; pub mod user; -pub enum Command { +pub enum Command<Fs: FileStore> { /// get the roster. if offline, retreive cached version from database. should be stored in application memory GetRoster(oneshot::Sender<Result<Vec<Contact>, RosterError>>), + /// get the roster. if offline, retreive cached version from database. should be stored in application memory. includes user associated with each contact + GetRosterWithUsers(oneshot::Sender<Result<Vec<(Contact, User)>, RosterError>>), /// get all chats. chat will include 10 messages in their message Vec (enough for chat previews) // TODO: paging and filtering GetChats(oneshot::Sender<Result<Vec<Chat>, DatabaseError>>), @@ -56,11 +61,21 @@ pub enum Command { GetChatsOrdered(oneshot::Sender<Result<Vec<Chat>, DatabaseError>>), // TODO: paging and filtering GetChatsOrderedWithLatestMessages(oneshot::Sender<Result<Vec<(Chat, Message)>, DatabaseError>>), + // TODO: paging and filtering, nullabillity for latest message + GetChatsOrderedWithLatestMessagesAndUsers( + oneshot::Sender<Result<Vec<((Chat, User), (Message, User))>, DatabaseError>>, + ), /// get a specific chat by jid GetChat(JID, oneshot::Sender<Result<Chat, DatabaseError>>), /// get message history for chat (does appropriate mam things) // TODO: paging and filtering GetMessages(JID, oneshot::Sender<Result<Vec<Message>, DatabaseError>>), + /// get message history for chat (does appropriate mam things) + // TODO: paging and filtering + GetMessagesWithUsers( + JID, + oneshot::Sender<Result<Vec<(Message, User)>, DatabaseError>>, + ), /// delete a chat from your chat history, along with all the corresponding messages DeleteChat(JID, oneshot::Sender<Result<(), DatabaseError>>), /// delete a message from your chat history @@ -115,27 +130,42 @@ pub enum Command { Option<String>, oneshot::Sender<Result<disco::Items, DiscoError>>, ), - /// publish item to a pep node, specified or default according to item. - Publish { + /// publish item to a pep node specified. + PublishPEPItem { item: pep::Item, node: String, - sender: oneshot::Sender<Result<(), PublishError>>, + sender: oneshot::Sender<Result<(), PEPError>>, }, - /// change user nickname - ChangeNick(String, oneshot::Sender<Result<(), NickError>>), + DeletePEPNode { + node: String, + sender: oneshot::Sender<Result<(), PEPError>>, + }, + GetPEPItem { + jid: Option<JID>, + node: String, + id: String, + sender: oneshot::Sender<Result<pep::Item, PEPError>>, + }, + /// change client user nickname + ChangeNick(Option<String>, oneshot::Sender<Result<(), NickError>>), + // // TODO + // GetNick(...), + // GetAvatar(...) // /// get capability node // GetCaps(String, oneshot::Sender<Result<Info, CapsError>>), + /// change client user avatar + ChangeAvatar( + Option<Vec<u8>>, + oneshot::Sender<Result<(), AvatarPublishError<Fs>>>, + ), } #[derive(Debug, Clone)] pub enum UpdateMessage { - Online(Online, Vec<Contact>), + Online(Online, Vec<(Contact, User)>), Offline(Offline), - /// received roster from jabber server (replace full app roster state with this) - /// is this needed? - FullRoster(Vec<Contact>), /// (only update app roster state, don't replace) - RosterUpdate(Contact), + RosterUpdate(Contact, User), RosterDelete(JID), /// presences should be stored with users in the ui, not contacts, as presences can be received from anyone Presence { @@ -146,27 +176,42 @@ pub enum UpdateMessage { // MessageDispatched(Uuid), Message { to: JID, + from: User, message: Message, }, MessageDelivery { id: Uuid, + chat: JID, delivery: Delivery, }, SubscriptionRequest(jid::JID), NickChanged { jid: JID, - nick: String, + nick: Option<String>, + }, + AvatarChanged { + jid: JID, + id: Option<String>, }, } /// an xmpp client that is suited for a chat client use case #[derive(Debug)] -pub struct Client { - sender: mpsc::Sender<CoreClientCommand<Command>>, +pub struct Client<Fs: FileStore> { + sender: mpsc::Sender<CoreClientCommand<Command<Fs>>>, timeout: Duration, } -impl Clone for Client { +impl<Fs: FileStore> Client<Fs> { + pub fn with_timeout(&self, timeout: Duration) -> Self { + Self { + sender: self.sender.clone(), + timeout, + } + } +} + +impl<Fs: FileStore> Clone for Client<Fs> { fn clone(&self) -> Self { Self { sender: self.sender.clone(), @@ -175,32 +220,27 @@ impl Clone for Client { } } -impl Deref for Client { - type Target = mpsc::Sender<CoreClientCommand<Command>>; +impl<Fs: FileStore> Deref for Client<Fs> { + type Target = mpsc::Sender<CoreClientCommand<Command<Fs>>>; fn deref(&self) -> &Self::Target { &self.sender } } -impl DerefMut for Client { +impl<Fs: FileStore> DerefMut for Client<Fs> { fn deref_mut(&mut self) -> &mut Self::Target { &mut self.sender } } -impl Client { - pub async fn connect(&self) -> Result<(), ActorError> { - self.send(CoreClientCommand::Connect).await?; - Ok(()) - } - - pub async fn disconnect(&self, offline: Offline) -> Result<(), ActorError> { - self.send(CoreClientCommand::Disconnect).await?; - Ok(()) - } - - pub fn new(jid: JID, password: String, db: Db) -> (Self, mpsc::Receiver<UpdateMessage>) { +impl<Fs: FileStore + Clone + Send + Sync + 'static> Client<Fs> { + pub fn new( + jid: JID, + password: String, + db: Db, + file_store: Fs, + ) -> (Self, mpsc::Receiver<UpdateMessage>) { let (command_sender, command_receiver) = mpsc::channel(20); let (update_send, update_recv) = mpsc::channel(20); @@ -214,14 +254,26 @@ impl Client { timeout: Duration::from_secs(10), }; - let logic = ClientLogic::new(client.clone(), jid.as_bare(), db, update_send); + let logic = ClientLogic::new(client.clone(), jid.as_bare(), db, update_send, file_store); - let actor: CoreClient<ClientLogic> = + let actor: CoreClient<ClientLogic<Fs>> = CoreClient::new(jid, password, command_receiver, None, sup_recv, logic); tokio::spawn(async move { actor.run().await }); (client, update_recv) } +} + +impl<Fs: FileStore> Client<Fs> { + pub async fn connect(&self) -> Result<(), ActorError> { + self.send(CoreClientCommand::Connect).await?; + Ok(()) + } + + pub async fn disconnect(&self, offline: Offline) -> Result<(), ActorError> { + self.send(CoreClientCommand::Disconnect).await?; + Ok(()) + } pub async fn get_roster(&self) -> Result<Vec<Contact>, CommandError<RosterError>> { let (send, recv) = oneshot::channel(); @@ -235,6 +287,22 @@ impl Client { Ok(roster) } + pub async fn get_roster_with_users( + &self, + ) -> Result<Vec<(Contact, User)>, CommandError<RosterError>> { + let (send, recv) = oneshot::channel(); + self.send(CoreClientCommand::Command(Command::GetRosterWithUsers( + send, + ))) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; + let roster = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; + Ok(roster) + } + pub async fn get_chats(&self) -> Result<Vec<Chat>, CommandError<DatabaseError>> { let (send, recv) = oneshot::channel(); self.send(CoreClientCommand::Command(Command::GetChats(send))) @@ -275,6 +343,22 @@ impl Client { Ok(chats) } + pub async fn get_chats_ordered_with_latest_messages_and_users( + &self, + ) -> Result<Vec<((Chat, User), (Message, User))>, CommandError<DatabaseError>> { + let (send, recv) = oneshot::channel(); + self.send(CoreClientCommand::Command( + Command::GetChatsOrderedWithLatestMessagesAndUsers(send), + )) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; + let chats = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; + Ok(chats) + } + pub async fn get_chat(&self, jid: JID) -> Result<Chat, CommandError<DatabaseError>> { let (send, recv) = oneshot::channel(); self.send(CoreClientCommand::Command(Command::GetChat(jid, send))) @@ -302,6 +386,23 @@ impl Client { Ok(messages) } + pub async fn get_messages_with_users( + &self, + jid: JID, + ) -> Result<Vec<(Message, User)>, CommandError<DatabaseError>> { + let (send, recv) = oneshot::channel(); + self.send(CoreClientCommand::Command(Command::GetMessagesWithUsers( + jid, send, + ))) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; + let messages = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; + Ok(messages) + } + pub async fn delete_chat(&self, jid: JID) -> Result<(), CommandError<DatabaseError>> { let (send, recv) = oneshot::channel(); self.send(CoreClientCommand::Command(Command::DeleteChat(jid, send))) @@ -539,9 +640,9 @@ impl Client { &self, item: pep::Item, node: String, - ) -> Result<(), CommandError<PublishError>> { + ) -> Result<(), CommandError<PEPError>> { let (send, recv) = oneshot::channel(); - self.send(CoreClientCommand::Command(Command::Publish { + self.send(CoreClientCommand::Command(Command::PublishPEPItem { item, node, sender: send, @@ -555,7 +656,44 @@ impl Client { Ok(result) } - pub async fn change_nick(&self, nick: String) -> Result<(), CommandError<NickError>> { + pub async fn delete_pep_node(&self, node: String) -> Result<(), CommandError<PEPError>> { + let (send, recv) = oneshot::channel(); + self.send(CoreClientCommand::Command(Command::DeletePEPNode { + node, + sender: send, + })) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; + let result = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; + Ok(result) + } + + pub async fn get_pep_item( + &self, + jid: Option<JID>, + node: String, + id: String, + ) -> Result<pep::Item, CommandError<PEPError>> { + let (send, recv) = oneshot::channel(); + self.send(CoreClientCommand::Command(Command::GetPEPItem { + jid, + node, + id, + sender: send, + })) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; + let result = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; + Ok(result) + } + + pub async fn change_nick(&self, nick: Option<String>) -> Result<(), CommandError<NickError>> { let (send, recv) = oneshot::channel(); self.send(CoreClientCommand::Command(Command::ChangeNick(nick, send))) .await @@ -566,10 +704,27 @@ impl Client { .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; Ok(result) } + + pub async fn change_avatar( + &self, + avatar: Option<Vec<u8>>, + ) -> Result<(), CommandError<AvatarPublishError<Fs>>> { + let (send, recv) = oneshot::channel(); + self.send(CoreClientCommand::Command(Command::ChangeAvatar( + avatar, send, + ))) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; + let result = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; + Ok(result) + } } -impl From<Command> for CoreClientCommand<Command> { - fn from(value: Command) -> Self { +impl<Fs: FileStore> From<Command<Fs>> for CoreClientCommand<Command<Fs>> { + fn from(value: Command<Fs>) -> Self { CoreClientCommand::Command(value) } } diff --git a/filamento/src/logic/abort.rs b/filamento/src/logic/abort.rs index df82655..3588b13 100644 --- a/filamento/src/logic/abort.rs +++ b/filamento/src/logic/abort.rs @@ -1,7 +1,9 @@ use lampada::error::ReadError; +use crate::files::FileStore; + use super::ClientLogic; -pub async fn on_abort(logic: ClientLogic) { +pub async fn on_abort<Fs: FileStore + Clone>(logic: ClientLogic<Fs>) { logic.pending().drain().await; } diff --git a/filamento/src/logic/connect.rs b/filamento/src/logic/connect.rs index dc05448..9d61ca4 100644 --- a/filamento/src/logic/connect.rs +++ b/filamento/src/logic/connect.rs @@ -5,17 +5,21 @@ use tracing::debug; use crate::{ Command, UpdateMessage, error::{ConnectionJobError, Error, RosterError}, + files::FileStore, presence::{Online, PresenceType}, }; use super::ClientLogic; -pub async fn handle_connect(logic: ClientLogic, connection: Connected) { +pub async fn handle_connect<Fs: FileStore + Clone + Send + Sync>( + logic: ClientLogic<Fs>, + connection: Connected, +) { let (send, recv) = oneshot::channel(); debug!("getting roster"); logic .clone() - .handle_online(Command::GetRoster(send), connection.clone()) + .handle_online(Command::GetRosterWithUsers(send), connection.clone()) .await; debug!("sent roster req"); let roster = recv.await; diff --git a/filamento/src/logic/connection_error.rs b/filamento/src/logic/connection_error.rs index 081900b..36c1cef 100644 --- a/filamento/src/logic/connection_error.rs +++ b/filamento/src/logic/connection_error.rs @@ -1,7 +1,12 @@ use lampada::error::ConnectionError; +use crate::files::FileStore; + use super::ClientLogic; -pub async fn handle_connection_error(logic: ClientLogic, error: ConnectionError) { +pub async fn handle_connection_error<Fs: FileStore + Clone>( + logic: ClientLogic<Fs>, + error: ConnectionError, +) { logic.handle_error(error.into()).await; } diff --git a/filamento/src/logic/disconnect.rs b/filamento/src/logic/disconnect.rs index 241c3e6..ebcfd4f 100644 --- a/filamento/src/logic/disconnect.rs +++ b/filamento/src/logic/disconnect.rs @@ -1,11 +1,14 @@ use lampada::Connected; use stanza::client::Stanza; -use crate::{UpdateMessage, presence::Offline}; +use crate::{UpdateMessage, files::FileStore, presence::Offline}; use super::ClientLogic; -pub async fn handle_disconnect(logic: ClientLogic, connection: Connected) { +pub async fn handle_disconnect<Fs: FileStore + Clone>( + logic: ClientLogic<Fs>, + connection: Connected, +) { // TODO: be able to set offline status message let offline_presence: stanza::client::presence::Presence = Offline::default().into_stanza(None); let stanza = Stanza::Presence(offline_presence); diff --git a/filamento/src/logic/local_only.rs b/filamento/src/logic/local_only.rs index 3f6fe8d..dc94d2c 100644 --- a/filamento/src/logic/local_only.rs +++ b/filamento/src/logic/local_only.rs @@ -4,44 +4,77 @@ use uuid::Uuid; use crate::{ chat::{Chat, Message}, error::DatabaseError, + files::FileStore, user::User, }; use super::ClientLogic; -pub async fn handle_get_chats(logic: &ClientLogic) -> Result<Vec<Chat>, DatabaseError> { +pub async fn handle_get_chats<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, +) -> Result<Vec<Chat>, DatabaseError> { Ok(logic.db().read_chats().await?) } -pub async fn handle_get_chats_ordered(logic: &ClientLogic) -> Result<Vec<Chat>, DatabaseError> { +pub async fn handle_get_chats_ordered<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, +) -> Result<Vec<Chat>, DatabaseError> { Ok(logic.db().read_chats_ordered().await?) } -pub async fn handle_get_chats_ordered_with_latest_messages( - logic: &ClientLogic, +pub async fn handle_get_chats_ordered_with_latest_messages<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, ) -> Result<Vec<(Chat, Message)>, DatabaseError> { Ok(logic.db().read_chats_ordered_with_latest_messages().await?) } -pub async fn handle_get_chat(logic: &ClientLogic, jid: JID) -> Result<Chat, DatabaseError> { +pub async fn handle_get_chats_ordered_with_latest_messages_and_users<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, +) -> Result<Vec<((Chat, User), (Message, User))>, DatabaseError> { + Ok(logic + .db() + .read_chats_ordered_with_latest_messages_and_users() + .await?) +} + +pub async fn handle_get_chat<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, + jid: JID, +) -> Result<Chat, DatabaseError> { Ok(logic.db().read_chat(jid).await?) } -pub async fn handle_get_messages( - logic: &ClientLogic, +pub async fn handle_get_messages<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, jid: JID, ) -> Result<Vec<Message>, DatabaseError> { Ok(logic.db().read_message_history(jid).await?) } -pub async fn handle_delete_chat(logic: &ClientLogic, jid: JID) -> Result<(), DatabaseError> { +pub async fn handle_get_messages_with_users<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, + jid: JID, +) -> Result<Vec<(Message, User)>, DatabaseError> { + Ok(logic.db().read_message_history_with_users(jid).await?) +} + +pub async fn handle_delete_chat<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, + jid: JID, +) -> Result<(), DatabaseError> { Ok(logic.db().delete_chat(jid).await?) } -pub async fn handle_delete_messaage(logic: &ClientLogic, uuid: Uuid) -> Result<(), DatabaseError> { +pub async fn handle_delete_messaage<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, + uuid: Uuid, +) -> Result<(), DatabaseError> { Ok(logic.db().delete_message(uuid).await?) } -pub async fn handle_get_user(logic: &ClientLogic, jid: JID) -> Result<User, DatabaseError> { +pub async fn handle_get_user<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, + jid: JID, +) -> Result<User, DatabaseError> { Ok(logic.db().read_user(jid).await?) } diff --git a/filamento/src/logic/mod.rs b/filamento/src/logic/mod.rs index 1ddd7d3..5e05dac 100644 --- a/filamento/src/logic/mod.rs +++ b/filamento/src/logic/mod.rs @@ -4,12 +4,13 @@ use jid::JID; use lampada::{Connected, Logic, error::ReadError}; use stanza::client::Stanza; use tokio::sync::{Mutex, mpsc, oneshot}; -use tracing::{error, info, warn}; +use tracing::{error, info}; use crate::{ Client, Command, UpdateMessage, db::Db, error::{Error, IqRequestError, ResponseError}, + files::FileStore, }; mod abort; @@ -22,12 +23,13 @@ mod online; mod process_stanza; #[derive(Clone)] -pub struct ClientLogic { - client: Client, +pub struct ClientLogic<Fs: FileStore> { + client: Client<Fs>, bare_jid: JID, db: Db, pending: Pending, update_sender: mpsc::Sender<UpdateMessage>, + file_store: Fs, } #[derive(Clone)] @@ -75,12 +77,13 @@ impl Pending { } } -impl ClientLogic { +impl<Fs: FileStore> ClientLogic<Fs> { pub fn new( - client: Client, + client: Client<Fs>, bare_jid: JID, db: Db, update_sender: mpsc::Sender<UpdateMessage>, + file_store: Fs, ) -> Self { Self { db, @@ -88,10 +91,11 @@ impl ClientLogic { update_sender, client, bare_jid, + file_store, } } - pub fn client(&self) -> &Client { + pub fn client(&self) -> &Client<Fs> { &self.client } @@ -103,6 +107,10 @@ impl ClientLogic { &self.pending } + pub fn file_store(&self) -> &Fs { + &self.file_store + } + pub fn update_sender(&self) -> &mpsc::Sender<UpdateMessage> { &self.update_sender } @@ -113,13 +121,14 @@ impl ClientLogic { self.update_sender().send(update).await; } - pub async fn handle_error(&self, e: Error) { + // TODO: delete this + pub async fn handle_error(&self, e: Error<Fs>) { error!("{}", e); } } -impl Logic for ClientLogic { - type Cmd = Command; +impl<Fs: FileStore + Clone + Send + Sync> Logic for ClientLogic<Fs> { + type Cmd = Command<Fs>; // pub async fn handle_stream_error(self, error) {} // stanza errors (recoverable) diff --git a/filamento/src/logic/offline.rs b/filamento/src/logic/offline.rs index 6399cf7..b87484c 100644 --- a/filamento/src/logic/offline.rs +++ b/filamento/src/logic/offline.rs @@ -1,16 +1,21 @@ +use std::process::id; + use chrono::Utc; use lampada::error::WriteError; +use tracing::error; use uuid::Uuid; use crate::{ Command, chat::{Delivery, Message}, error::{ - DatabaseError, DiscoError, Error, IqRequestError, MessageSendError, NickError, RosterError, - StatusError, + AvatarPublishError, DatabaseError, DiscoError, Error, IqRequestError, MessageSendError, + NickError, PEPError, RosterError, StatusError, }, + files::FileStore, presence::Online, roster::Contact, + user::User, }; use super::{ @@ -18,11 +23,12 @@ use super::{ local_only::{ handle_delete_chat, handle_delete_messaage, handle_get_chat, handle_get_chats, handle_get_chats_ordered, handle_get_chats_ordered_with_latest_messages, - handle_get_messages, handle_get_user, + handle_get_chats_ordered_with_latest_messages_and_users, handle_get_messages, + handle_get_messages_with_users, handle_get_user, }, }; -pub async fn handle_offline(logic: ClientLogic, command: Command) { +pub async fn handle_offline<Fs: FileStore + Clone>(logic: ClientLogic<Fs>, command: Command<Fs>) { let result = handle_offline_result(&logic, command).await; match result { Ok(_) => {} @@ -30,21 +36,39 @@ pub async fn handle_offline(logic: ClientLogic, command: Command) { } } -pub async fn handle_set_status(logic: &ClientLogic, online: Online) -> Result<(), StatusError> { +pub async fn handle_set_status<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, + online: Online, +) -> Result<(), StatusError> { logic.db().upsert_cached_status(online).await?; Ok(()) } -pub async fn handle_get_roster(logic: &ClientLogic) -> Result<Vec<Contact>, RosterError> { +pub async fn handle_get_roster<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, +) -> Result<Vec<Contact>, RosterError> { Ok(logic.db().read_cached_roster().await?) } -pub async fn handle_offline_result(logic: &ClientLogic, command: Command) -> Result<(), Error> { +pub async fn handle_get_roster_with_users<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, +) -> Result<Vec<(Contact, User)>, RosterError> { + Ok(logic.db().read_cached_roster_with_users().await?) +} + +pub async fn handle_offline_result<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, + command: Command<Fs>, +) -> Result<(), Error<Fs>> { match command { Command::GetRoster(sender) => { let roster = handle_get_roster(logic).await; sender.send(roster); } + Command::GetRosterWithUsers(sender) => { + let roster = handle_get_roster_with_users(logic).await; + sender.send(roster); + } Command::GetChats(sender) => { let chats = handle_get_chats(logic).await; sender.send(chats); @@ -57,6 +81,10 @@ pub async fn handle_offline_result(logic: &ClientLogic, command: Command) -> Res let chats = handle_get_chats_ordered_with_latest_messages(logic).await; sender.send(chats); } + Command::GetChatsOrderedWithLatestMessagesAndUsers(sender) => { + let chats = handle_get_chats_ordered_with_latest_messages_and_users(logic).await; + sender.send(chats); + } Command::GetChat(jid, sender) => { let chats = handle_get_chat(logic, jid).await; sender.send(chats); @@ -65,6 +93,10 @@ pub async fn handle_offline_result(logic: &ClientLogic, command: Command) -> Res let messages = handle_get_messages(logic, jid).await; sender.send(messages); } + Command::GetMessagesWithUsers(jid, sender) => { + let messages = handle_get_messages_with_users(logic, jid).await; + sender.send(messages); + } Command::DeleteChat(jid, sender) => { let result = handle_delete_chat(logic, jid).await; sender.send(result); @@ -77,7 +109,6 @@ pub async fn handle_offline_result(logic: &ClientLogic, command: Command) -> Res let user = handle_get_user(logic, jid).await; sender.send(user); } - // TODO: offline queue to modify roster Command::AddContact(_jid, sender) => { sender.send(Err(RosterError::Write(WriteError::Disconnected))); } @@ -112,7 +143,6 @@ pub async fn handle_offline_result(logic: &ClientLogic, command: Command) -> Res let result = handle_set_status(logic, online).await; sender.send(result); } - // TODO: offline message queue Command::SendMessage(jid, body) => { let id = Uuid::new_v4(); let timestamp = Utc::now(); @@ -142,11 +172,25 @@ pub async fn handle_offline_result(logic: &ClientLogic, command: Command) -> Res .handle_error(MessageSendError::MessageHistory(e.into()).into()) .await; } + + let from = match logic.db().read_user(logic.bare_jid.clone()).await { + Ok(u) => u, + Err(e) => { + error!("{}", e); + User { + jid: logic.bare_jid.clone(), + nick: None, + avatar: None, + } + } + }; + logic .update_sender() .send(crate::UpdateMessage::Message { to: jid.as_bare(), message, + from, }) .await; } @@ -159,7 +203,7 @@ pub async fn handle_offline_result(logic: &ClientLogic, command: Command) -> Res Command::DiscoItems(_jid, _node, sender) => { sender.send(Err(DiscoError::Write(WriteError::Disconnected))); } - Command::Publish { + Command::PublishPEPItem { item: _, node: _, sender, @@ -169,6 +213,24 @@ pub async fn handle_offline_result(logic: &ClientLogic, command: Command) -> Res Command::ChangeNick(_, sender) => { sender.send(Err(NickError::Disconnected)); } + Command::ChangeAvatar(_items, sender) => { + sender.send(Err(AvatarPublishError::Disconnected)); + } + Command::DeletePEPNode { node: _, sender } => { + sender.send(Err(PEPError::IqResponse(IqRequestError::Write( + WriteError::Disconnected, + )))); + } + Command::GetPEPItem { + node: _, + sender, + jid: _, + id: _, + } => { + sender.send(Err(PEPError::IqResponse(IqRequestError::Write( + WriteError::Disconnected, + )))); + } } Ok(()) } diff --git a/filamento/src/logic/online.rs b/filamento/src/logic/online.rs index b069f59..767f923 100644 --- a/filamento/src/logic/online.rs +++ b/filamento/src/logic/online.rs @@ -1,35 +1,33 @@ +use std::{io::Cursor, time::Duration}; + +use base64::{prelude::BASE64_STANDARD, Engine}; use chrono::Utc; +use image::ImageReader; use jid::JID; use lampada::{Connected, WriteMessage, error::WriteError}; +use sha1::{Digest, Sha1}; use stanza::{ client::{ iq::{self, Iq, IqType, Query}, Stanza - }, - xep_0030::{info, items}, - xep_0060::pubsub::{self, Pubsub}, - xep_0172::{self, Nick}, - xep_0203::Delay, + }, xep_0030::{info, items}, xep_0060::{self, owner, pubsub::{self, Pubsub}}, xep_0084, xep_0172::{self, Nick}, xep_0203::Delay }; use tokio::sync::oneshot; use tracing::{debug, error, info}; use uuid::Uuid; use crate::{ - chat::{Body, Chat, Delivery, Message}, disco::{Info, Items}, error::{ - DatabaseError, DiscoError, Error, IqRequestError, MessageSendError, NickError, PublishError, RosterError, StatusError, SubscribeError - }, pep, presence::{Online, Presence, PresenceType}, roster::{Contact, ContactUpdate}, Command, UpdateMessage + avatar, chat::{Body, Chat, Delivery, Message}, disco::{Info, Items}, error::{ + AvatarPublishError, DatabaseError, DiscoError, Error, IqRequestError, MessageSendError, NickError, PEPError, RosterError, StatusError, SubscribeError + }, files::FileStore, pep, presence::{Online, Presence, PresenceType}, roster::{Contact, ContactUpdate}, user::User, Command, UpdateMessage }; use super::{ - ClientLogic, local_only::{ - handle_delete_chat, handle_delete_messaage, handle_get_chat, handle_get_chats, - handle_get_chats_ordered, handle_get_chats_ordered_with_latest_messages, - handle_get_messages, handle_get_user, - }, + handle_delete_chat, handle_delete_messaage, handle_get_chat, handle_get_chats, handle_get_chats_ordered, handle_get_chats_ordered_with_latest_messages, handle_get_chats_ordered_with_latest_messages_and_users, handle_get_messages, handle_get_messages_with_users, handle_get_user + }, ClientLogic }; -pub async fn handle_online(logic: ClientLogic, command: Command, connection: Connected) { +pub async fn handle_online<Fs: FileStore + Clone>(logic: ClientLogic<Fs>, command: Command<Fs>, connection: Connected) { let result = handle_online_result(&logic, command, connection).await; match result { Ok(_) => {} @@ -37,8 +35,8 @@ pub async fn handle_online(logic: ClientLogic, command: Command, connection: Con } } -pub async fn handle_get_roster( - logic: &ClientLogic, +pub async fn handle_get_roster<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, connection: Connected, ) -> Result<Vec<Contact>, RosterError> { let iq_id = Uuid::new_v4().to_string(); @@ -96,8 +94,73 @@ pub async fn handle_get_roster( } } -pub async fn handle_add_contact( - logic: &ClientLogic, +// this can't query the client... otherwise there is a hold-up and the connection can't complete +pub async fn handle_get_roster_with_users<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, + connection: Connected, +) -> Result<Vec<(Contact, User)>, RosterError> { + let iq_id = Uuid::new_v4().to_string(); + let stanza = Stanza::Iq(Iq { + from: Some(connection.jid().clone()), + id: iq_id.to_string(), + to: None, + r#type: IqType::Get, + lang: None, + query: Some(iq::Query::Roster(stanza::roster::Query { + ver: None, + items: Vec::new(), + })), + errors: Vec::new(), + }); + let response = logic + .pending() + .request(&connection, stanza, iq_id.clone()) + .await?; + // TODO: timeout + match response { + Stanza::Iq(Iq { + from: _, + id, + to: _, + r#type, + lang: _, + query: Some(iq::Query::Roster(stanza::roster::Query { ver: _, items })), + errors: _, + }) if id == iq_id && r#type == IqType::Result => { + let contacts: Vec<Contact> = items.into_iter().map(|item| item.into()).collect(); + if let Err(e) = logic.db().replace_cached_roster(contacts.clone()).await { + logic + .handle_error(Error::Roster(RosterError::Cache(e.into()))) + .await; + }; + let mut users = Vec::new(); + for contact in &contacts { + let user = logic.db().read_user(contact.user_jid.clone()).await?; + users.push(user); + } + Ok(contacts.into_iter().zip(users).collect()) + } + ref s @ Stanza::Iq(Iq { + from: _, + ref id, + to: _, + r#type, + lang: _, + query: _, + ref errors, + }) if *id == iq_id && r#type == IqType::Error => { + if let Some(error) = errors.first() { + Err(RosterError::StanzaError(error.clone())) + } else { + Err(RosterError::UnexpectedStanza(s.clone())) + } + } + s => Err(RosterError::UnexpectedStanza(s)), + } +} + +pub async fn handle_add_contact<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, connection: Connected, jid: JID, ) -> Result<(), RosterError> { @@ -154,8 +217,8 @@ pub async fn handle_add_contact( } } -pub async fn handle_buddy_request( - logic: &ClientLogic, +pub async fn handle_buddy_request<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, connection: Connected, jid: JID, ) -> Result<(), SubscribeError> { @@ -177,8 +240,8 @@ pub async fn handle_buddy_request( Ok(()) } -pub async fn handle_subscription_request( - logic: &ClientLogic, +pub async fn handle_subscription_request<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, connection: Connected, jid: JID, ) -> Result<(), SubscribeError> { @@ -195,8 +258,8 @@ pub async fn handle_subscription_request( Ok(()) } -pub async fn handle_accept_buddy_request( - logic: &ClientLogic, +pub async fn handle_accept_buddy_request<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, connection: Connected, jid: JID, ) -> Result<(), SubscribeError> { @@ -218,8 +281,8 @@ pub async fn handle_accept_buddy_request( Ok(()) } -pub async fn handle_accept_subscription_request( - logic: &ClientLogic, +pub async fn handle_accept_subscription_request<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, connection: Connected, jid: JID, ) -> Result<(), SubscribeError> { @@ -274,8 +337,8 @@ pub async fn handle_unfriend_contact(connection: Connected, jid: JID) -> Result< Ok(()) } -pub async fn handle_delete_contact( - logic: &ClientLogic, +pub async fn handle_delete_contact<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, connection: Connected, jid: JID, ) -> Result<(), RosterError> { @@ -333,8 +396,8 @@ pub async fn handle_delete_contact( } } -pub async fn handle_update_contact( - logic: &ClientLogic, +pub async fn handle_update_contact<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, connection: Connected, jid: JID, contact_update: ContactUpdate, @@ -398,8 +461,8 @@ pub async fn handle_update_contact( } } -pub async fn handle_set_status( - logic: &ClientLogic, +pub async fn handle_set_status<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, connection: Connected, online: Online, ) -> Result<(), StatusError> { @@ -411,9 +474,18 @@ pub async fn handle_set_status( Ok(()) } -pub async fn handle_send_message(logic: &ClientLogic, connection: Connected, jid: JID, body: Body) { +pub async fn handle_send_message<Fs: FileStore + Clone>(logic: &ClientLogic<Fs>, connection: Connected, jid: JID, body: Body) { // upsert the chat and user the message will be delivered to. if there is a conflict, it will return whatever was there, otherwise it will return false by default. - let have_chatted = logic.db().upsert_chat_and_user(&jid).await.unwrap_or(false); + // let have_chatted = logic.db().upsert_chat_and_user(&jid).await.unwrap_or(false); + let have_chatted = match logic.db().upsert_chat_and_user(&jid).await { + Ok(have_chatted) => { + have_chatted + }, + Err(e) => { + error!("{}", e); + false + }, + }; let nick; let mark_chat_as_chatted; @@ -460,12 +532,25 @@ pub async fn handle_send_message(logic: &ClientLogic, connection: Connected, jid .await; } + let from = match logic.db().read_user(logic.bare_jid.clone()).await { + Ok(u) => u, + Err(e) => { + error!("{}", e); + User { + jid: logic.bare_jid.clone(), + nick: None, + avatar: None, + } + }, + }; + // tell the client a message is being sent logic .update_sender() .send(UpdateMessage::Message { to: jid.as_bare(), message, + from, }) .await; @@ -503,9 +588,11 @@ pub async fn handle_send_message(logic: &ClientLogic, connection: Connected, jid .send(UpdateMessage::MessageDelivery { id, delivery: Delivery::Written, + chat: jid.clone(), }) .await; if mark_chat_as_chatted { + debug!("marking chat as chatted"); if let Err(e) = logic.db.mark_chat_as_chatted(jid).await { logic .handle_error(MessageSendError::MarkChatAsChatted(e.into()).into()) @@ -519,6 +606,7 @@ pub async fn handle_send_message(logic: &ClientLogic, connection: Connected, jid .send(UpdateMessage::MessageDelivery { id, delivery: Delivery::Failed, + chat: jid, }) .await; logic.handle_error(MessageSendError::Write(e).into()).await; @@ -540,8 +628,8 @@ pub async fn handle_send_presence( Ok(()) } -pub async fn handle_disco_info( - logic: &ClientLogic, +pub async fn handle_disco_info<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, connection: Connected, jid: Option<JID>, node: Option<String>, @@ -611,8 +699,8 @@ pub async fn handle_disco_info( } } -pub async fn handle_disco_items( - logic: &ClientLogic, +pub async fn handle_disco_items<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, connection: Connected, jid: Option<JID>, node: Option<String>, @@ -680,20 +768,60 @@ pub async fn handle_disco_items( } } -pub async fn handle_publish( - logic: &ClientLogic, +pub async fn handle_publish_pep_item<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, connection: Connected, item: pep::Item, node: String, -) -> Result<(), PublishError> { +) -> Result<(), PEPError> { let id = Uuid::new_v4().to_string(); let publish = match item { - pep::Item::Nick(n) => pubsub::Publish { - node, - items: vec![pubsub::Item { - item: Some(pubsub::Content::Nick(Nick(n))), - ..Default::default() - }], + pep::Item::Nick(n) => { + if let Some(n) = n { + pubsub::Publish { + node, + items: vec![pubsub::Item { + item: Some(pubsub::Content::Nick(Nick(n))), + ..Default::default() + }], + } + } else { + pubsub::Publish { + node, + items: vec![pubsub::Item { + item: Some(pubsub::Content::Nick(Nick("".to_string()))), + ..Default::default() + }] + } + } + }, + pep::Item::AvatarMetadata(metadata) => { + if let Some(metadata) = metadata { + pubsub::Publish { node, items: vec![pubsub::Item { + item: Some(pubsub::Content::AvatarMetadata(xep_0084::Metadata { info: vec![xep_0084::Info { bytes: metadata.bytes, height: None, id: metadata.hash.clone(), r#type: metadata.r#type, url: None, width: None }], pointers: Vec::new() })), + id: Some(metadata.hash), + ..Default::default() + }]} + } else { + pubsub::Publish { node, items: vec![pubsub::Item { + item: Some(pubsub::Content::AvatarMetadata(xep_0084::Metadata { info: Vec::new(), pointers: Vec::new() })), + ..Default::default() + }]} + } + }, + pep::Item::AvatarData(data) => { + if let Some(data) = data { + pubsub::Publish { node, items: vec![pubsub::Item { + item: Some(pubsub::Content::AvatarData(xep_0084::Data(data.data_b64))), + id: Some(data.hash), + ..Default::default() + }] } + } else { + pubsub::Publish { node, items: vec![pubsub::Item { + item: Some(pubsub::Content::AvatarData(xep_0084::Data("".to_string()))), + ..Default::default() + }]} + } }, }; let request = Iq { @@ -726,43 +854,239 @@ pub async fn handle_publish( if let Some(query) = query { match query { Query::Pubsub(_) => Ok(()), - q => Err(PublishError::MismatchedQuery(q)), + q => Err(PEPError::MismatchedQuery(q)), + } + } else { + Err(PEPError::MissingQuery) + } + } + IqType::Error => { + Err(PEPError::StanzaErrors(errors)) + } + _ => unreachable!(), + } + } else { + Err(PEPError::IncorrectEntity( + from.unwrap_or_else(|| connection.jid().as_bare()), + )) + } + } + s => Err(PEPError::UnexpectedStanza(s)), + } +} + +pub async fn handle_get_pep_item<Fs: FileStore + Clone>(logic: &ClientLogic<Fs>, connection: Connected, jid: Option<JID>, node: String, id: String) -> Result<pep::Item, PEPError> { + let stanza_id = Uuid::new_v4().to_string(); + let request = Iq { + from: Some(connection.jid().clone()), + id: stanza_id.clone(), + to: jid.clone(), + r#type: IqType::Get, + lang: None, + query: Some(Query::Pubsub(Pubsub::Items(pubsub::Items { + max_items: None, + node, + subid: None, + items: vec![pubsub::Item { id: Some(id.clone()), publisher: None, item: None }], + }))), + errors: Vec::new(), + }; + match logic + .pending() + .request(&connection, Stanza::Iq(request), stanza_id) + .await? { + + Stanza::Iq(Iq { + from, + r#type, + query, + errors, + .. + // TODO: maybe abstract a bunch of these different errors related to iqs into an iq error thing? as in like call iq.result(), get the query from inside, error otherwise. + }) if r#type == IqType::Result || r#type == IqType::Error => { + if from == jid || { + if jid == None { + from == Some(connection.jid().as_bare()) + } else { + false + } + } { + match r#type { + IqType::Result => { + if let Some(query) = query { + match query { + Query::Pubsub(Pubsub::Items(mut items)) => { + if let Some(item) = items.items.pop() { + if item.id == Some(id.clone()) { + match item.item.ok_or(PEPError::MissingItem)? { + pubsub::Content::Nick(nick) => { + if nick.0.is_empty() { + Ok(pep::Item::Nick(None)) + } else { + Ok(pep::Item::Nick(Some(nick.0))) + + } + }, + pubsub::Content::AvatarData(data) => Ok(pep::Item::AvatarData(Some(avatar::Data { hash: id, data_b64: data.0 }))), + pubsub::Content::AvatarMetadata(metadata) => Ok(pep::Item::AvatarMetadata(metadata.info.into_iter().find(|info| info.url.is_none()).map(|info| info.into()))), + pubsub::Content::Unknown(_element) => Err(PEPError::UnsupportedItem), + } + } else { + Err(PEPError::IncorrectItemID(id, item.id.unwrap_or_else(|| "missing id".to_string()))) + } + } else { + Err(PEPError::MissingItem) + } + }, + q => Err(PEPError::MismatchedQuery(q)), } } else { - Err(PublishError::MissingQuery) + Err(PEPError::MissingQuery) } } IqType::Error => { - Err(PublishError::StanzaErrors(errors)) + Err(PEPError::StanzaErrors(errors)) } _ => unreachable!(), } } else { - Err(PublishError::IncorrectEntity( + // TODO: include expected entity + Err(PEPError::IncorrectEntity( from.unwrap_or_else(|| connection.jid().as_bare()), )) } } - s => Err(PublishError::UnexpectedStanza(s)), + s => Err(PEPError::UnexpectedStanza(s)), } } -pub async fn handle_change_nick(logic: &ClientLogic, nick: String) -> Result<(), NickError> { +pub async fn handle_change_nick<Fs: FileStore + Clone>(logic: &ClientLogic<Fs>, nick: Option<String>) -> Result<(), NickError> { logic.client().publish(pep::Item::Nick(nick), xep_0172::XMLNS.to_string()).await?; Ok(()) } +pub async fn handle_change_avatar<Fs: FileStore + Clone>(logic: &ClientLogic<Fs>, img_data: Option<Vec<u8>>) -> Result<(), AvatarPublishError<Fs>> { + match img_data { + // set avatar + Some(data) => { + // load the image data and guess the format + let image = ImageReader::new(Cursor::new(data)).with_guessed_format()?.decode()?; + + // convert the image to png; + let mut data_png = Vec::new(); + let image = image.resize(192, 192, image::imageops::FilterType::Nearest); + image.write_to(&mut Cursor::new(&mut data_png), image::ImageFormat::Jpeg)?; + + // calculate the length of the data in bytes. + let bytes = data_png.len().try_into()?; + + // calculate sha1 hash of the data + let mut sha1 = Sha1::new(); + sha1.update(&data_png); + let sha1_result = sha1.finalize(); + let hash = hex::encode(sha1_result); + + // encode the image data as base64 + let data_b64 = BASE64_STANDARD.encode(data_png.clone()); + + // publish the data to the data node + logic.client().publish(pep::Item::AvatarData(Some(avatar::Data { hash: hash.clone(), data_b64 })), "urn:xmpp:avatar:data".to_string()).await?; + + // publish the metadata to the metadata node + logic.client().publish(pep::Item::AvatarMetadata(Some(avatar::Metadata { bytes, hash: hash.clone(), r#type: "image/jpeg".to_string() })), "urn:xmpp:avatar:metadata".to_string()).await?; + + // if everything went well, save the data to the disk. + + if !logic.file_store().is_stored(&hash).await.map_err(|err| AvatarPublishError::FileStore(err))? { + logic.file_store().store(&hash, &data_png).await.map_err(|err| AvatarPublishError::FileStore(err))? + } + // when the client receives the updated metadata notification from the pep node, it will already have it saved on the disk so will not require a retrieval. + // TODO: should the node be purged? + + Ok(()) + }, + // remove avatar + None => { + logic.client().delete_pep_node("urn:xmpp:avatar:data".to_string()).await?; + logic.client().publish(pep::Item::AvatarMetadata(None), "urn:xmpp:avatar:metadata".to_string(), ).await?; + Ok(()) + }, + } +} + +pub async fn handle_delete_pep_node<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, + connection: Connected, + node: String, +) -> Result<(), PEPError> { + let id = Uuid::new_v4().to_string(); + let request = Iq { + from: Some(connection.jid().clone()), + id: id.clone(), + to: None, + r#type: IqType::Set, + lang: None, + query: Some(Query::PubsubOwner(xep_0060::owner::Pubsub::Delete(owner::Delete{ node, redirect: None }))), + errors: Vec::new(), + }; + match logic + .pending() + .request(&connection, Stanza::Iq(request), id) + .await? { + + Stanza::Iq(Iq { + from, + r#type, + query, + errors, + .. + // TODO: maybe abstract a bunch of these different errors related to iqs into an iq error thing? as in like call iq.result(), get the query from inside, error otherwise. + }) if r#type == IqType::Result || r#type == IqType::Error => { + if from == None || + from == Some(connection.jid().as_bare()) + { + match r#type { + IqType::Result => { + if let Some(query) = query { + match query { + Query::PubsubOwner(_) => Ok(()), + q => Err(PEPError::MismatchedQuery(q)), + } + } else { + // Err(PEPError::MissingQuery) + Ok(()) + } + } + IqType::Error => { + Err(PEPError::StanzaErrors(errors)) + } + _ => unreachable!(), + } + } else { + Err(PEPError::IncorrectEntity( + from.unwrap_or_else(|| connection.jid().as_bare()), + )) + } + } + s => Err(PEPError::UnexpectedStanza(s)), + } +} + // TODO: could probably macro-ise? -pub async fn handle_online_result( - logic: &ClientLogic, - command: Command, +pub async fn handle_online_result<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, + command: Command<Fs>, connection: Connected, -) -> Result<(), Error> { +) -> Result<(), Error<Fs>> { match command { Command::GetRoster(result_sender) => { let roster = handle_get_roster(logic, connection).await; let _ = result_sender.send(roster); } + Command::GetRosterWithUsers(result_sender) => { + let roster = handle_get_roster_with_users(logic, connection).await; + let _ = result_sender.send(roster); + } Command::GetChats(sender) => { let chats = handle_get_chats(logic).await; let _ = sender.send(chats); @@ -775,6 +1099,10 @@ pub async fn handle_online_result( let chats = handle_get_chats_ordered_with_latest_messages(logic).await; let _ = sender.send(chats); } + Command::GetChatsOrderedWithLatestMessagesAndUsers(sender) => { + let chats = handle_get_chats_ordered_with_latest_messages_and_users(logic).await; + sender.send(chats); + } Command::GetChat(jid, sender) => { let chat = handle_get_chat(logic, jid).await; let _ = sender.send(chat); @@ -783,6 +1111,10 @@ pub async fn handle_online_result( let messages = handle_get_messages(logic, jid).await; let _ = sender.send(messages); } + Command::GetMessagesWithUsers(jid, sender) => { + let messages = handle_get_messages_with_users(logic, jid).await; + sender.send(messages); + } Command::DeleteChat(jid, sender) => { let result = handle_delete_chat(logic, jid).await; let _ = sender.send(result); @@ -854,14 +1186,26 @@ pub async fn handle_online_result( let result = handle_disco_items(logic, connection, jid, node).await; let _ = sender.send(result); } - Command::Publish { item, node, sender } => { - let result = handle_publish(logic, connection, item, node).await; + Command::PublishPEPItem { item, node, sender } => { + let result = handle_publish_pep_item(logic, connection, item, node).await; let _ = sender.send(result); } Command::ChangeNick(nick, sender) => { let result = handle_change_nick(logic, nick).await; let _ = sender.send(result); } + Command::ChangeAvatar(img_data, sender) => { + let result = handle_change_avatar(logic, img_data).await; + let _ = sender.send(result); + }, + Command::DeletePEPNode { node, sender } => { + let result = handle_delete_pep_node(logic, connection, node).await; + let _ = sender.send(result); + }, + Command::GetPEPItem { node, sender, jid, id } => { + let result = handle_get_pep_item(logic, connection, jid, node, id).await; + let _ = sender.send(result); + }, } Ok(()) } diff --git a/filamento/src/logic/process_stanza.rs b/filamento/src/logic/process_stanza.rs index 11d7588..cdaff97 100644 --- a/filamento/src/logic/process_stanza.rs +++ b/filamento/src/logic/process_stanza.rs @@ -1,7 +1,9 @@ use std::str::FromStr; +use base64::{Engine, prelude::BASE64_STANDARD}; use chrono::Utc; use lampada::{Connected, SupervisorSender}; +use sha1::{Digest, Sha1}; use stanza::{ client::{ Stanza, @@ -17,14 +19,23 @@ use uuid::Uuid; use crate::{ UpdateMessage, caps, chat::{Body, Message}, - error::{DatabaseError, Error, IqError, MessageRecvError, PresenceError, RosterError}, + error::{ + AvatarUpdateError, DatabaseError, Error, IqError, IqProcessError, MessageRecvError, + PresenceError, RosterError, + }, + files::FileStore, presence::{Offline, Online, Presence, PresenceType, Show}, roster::Contact, + user::User, }; use super::ClientLogic; -pub async fn handle_stanza(logic: ClientLogic, stanza: Stanza, connection: Connected) { +pub async fn handle_stanza<Fs: FileStore + Clone>( + logic: ClientLogic<Fs>, + stanza: Stanza, + connection: Connected, +) { let result = process_stanza(logic.clone(), stanza, connection).await; match result { Ok(u) => match u { @@ -38,10 +49,10 @@ pub async fn handle_stanza(logic: ClientLogic, stanza: Stanza, connection: Conne } } -pub async fn recv_message( - logic: ClientLogic, +pub async fn recv_message<Fs: FileStore + Clone>( + logic: ClientLogic<Fs>, stanza_message: stanza::client::message::Message, -) -> Result<Option<UpdateMessage>, MessageRecvError> { +) -> Result<Option<UpdateMessage>, MessageRecvError<Fs>> { if let Some(from) = stanza_message.from { // TODO: don't ignore delay from. xep says SHOULD send error if incorrect. let timestamp = stanza_message @@ -50,6 +61,7 @@ pub async fn recv_message( .unwrap_or_else(|| Utc::now()); // TODO: group chat messages + // body MUST be before user changes in order to avoid race condition where you e.g. get a nick update before the user is in the client state. // if there is a body, should create chat message if let Some(body) = stanza_message.body { let message = Message { @@ -67,92 +79,391 @@ pub async fn recv_message( }; // save the message to the database - logic.db().upsert_chat_and_user(&from).await?; - if let Err(e) = logic - .db() - .create_message_with_user_resource(message.clone(), from.clone(), from.clone()) - .await - { - logic - .handle_error(Error::MessageRecv(MessageRecvError::MessageHistory(e))) - .await; - } + match logic.db().upsert_chat_and_user(&from).await { + Ok(_) => { + if let Err(e) = logic + .db() + .create_message_with_user_resource( + message.clone(), + from.clone(), + from.clone(), + ) + .await + { + logic + .handle_error(Error::MessageRecv(MessageRecvError::MessageHistory(e))) + .await; + error!("failed to upsert chat and user") + } + } + Err(e) => { + logic + .handle_error(Error::MessageRecv(MessageRecvError::MessageHistory(e))) + .await; + error!("failed to upsert chat and user") + } + }; + + let from_user = match logic.db().read_user(from.as_bare()).await { + Ok(u) => u, + Err(e) => { + error!("{}", e); + User { + jid: from.as_bare(), + nick: None, + avatar: None, + } + } + }; // update the client with the new message logic .update_sender() .send(UpdateMessage::Message { to: from.as_bare(), + from: from_user, message, }) .await; } if let Some(nick) = stanza_message.nick { - if let Err(e) = logic - .db() - .upsert_user_nick(from.as_bare(), nick.0.clone()) - .await - { - logic - .handle_error(Error::MessageRecv(MessageRecvError::NickUpdate(e))) - .await; + let nick = nick.0; + if nick.is_empty() { + match logic.db().delete_user_nick(from.as_bare()).await { + Ok(changed) => { + if changed { + logic + .update_sender() + .send(UpdateMessage::NickChanged { + jid: from.as_bare(), + nick: None, + }) + .await; + } + } + Err(e) => { + logic + .handle_error(Error::MessageRecv(MessageRecvError::NickUpdate(e))) + .await; + // if failed, send user update anyway + logic + .update_sender() + .send(UpdateMessage::NickChanged { + jid: from.as_bare(), + nick: None, + }) + .await; + } + } + } else { + match logic + .db() + .upsert_user_nick(from.as_bare(), nick.clone()) + .await + { + Ok(changed) => { + if changed { + logic + .update_sender() + .send(UpdateMessage::NickChanged { + jid: from.as_bare(), + nick: Some(nick), + }) + .await; + } + } + Err(e) => { + logic + .handle_error(Error::MessageRecv(MessageRecvError::NickUpdate(e))) + .await; + // if failed, send user update anyway + logic + .update_sender() + .send(UpdateMessage::NickChanged { + jid: from.as_bare(), + nick: Some(nick), + }) + .await; + } + } } - - logic - .update_sender() - .send(UpdateMessage::NickChanged { - jid: from.as_bare(), - nick: nick.0, - }) - .await; } if let Some(event) = stanza_message.event { match event { - Event::Items(items) => match items.node.as_str() { - "http://jabber.org/protocol/nick" => match items.items { - ItemsType::Item(items) => { - if let Some(item) = items.first() { - match &item.item { - Some(c) => match c { - Content::Nick(nick) => { - if let Err(e) = logic - .db() - .upsert_user_nick(from.as_bare(), nick.0.clone()) - .await - { - logic - .handle_error(Error::MessageRecv( - MessageRecvError::NickUpdate(e), - )) - .await; + Event::Items(items) => { + match items.node.as_str() { + "http://jabber.org/protocol/nick" => match items.items { + ItemsType::Item(items) => { + if let Some(item) = items.first() { + match &item.item { + Some(c) => match c { + Content::Nick(nick) => { + let nick = nick.0.clone(); + if nick.is_empty() { + match logic + .db() + .delete_user_nick(from.as_bare()) + .await + { + Ok(changed) => { + if changed { + logic + .update_sender() + .send(UpdateMessage::NickChanged { + jid: from.as_bare(), + nick: None, + }) + .await; + } + } + Err(e) => { + logic + .handle_error(Error::MessageRecv( + MessageRecvError::NickUpdate(e), + )) + .await; + // if failed, send user update anyway + logic + .update_sender() + .send(UpdateMessage::NickChanged { + jid: from.as_bare(), + nick: None, + }) + .await; + } + } + } else { + match logic + .db() + .upsert_user_nick( + from.as_bare(), + nick.clone(), + ) + .await + { + Ok(changed) => { + if changed { + logic + .update_sender() + .send(UpdateMessage::NickChanged { + jid: from.as_bare(), + nick: Some(nick), + }) + .await; + } + } + Err(e) => { + logic + .handle_error(Error::MessageRecv( + MessageRecvError::NickUpdate(e), + )) + .await; + // if failed, send user update anyway + logic + .update_sender() + .send(UpdateMessage::NickChanged { + jid: from.as_bare(), + nick: Some(nick), + }) + .await; + } + } + } } + _ => {} + }, + None => {} + } + } + } + _ => {} + }, + "urn:xmpp:avatar:metadata" => { + match items.items { + ItemsType::Item(items) => { + if let Some(item) = items.first() { + debug!("found item"); + match &item.item { + Some(Content::AvatarMetadata(metadata)) => { + debug!("found metadata"); + // check if user avatar has been deleted + if let Some(metadata) = metadata + .info + .iter() + .find(|info| info.url.is_none()) + { + debug!("checking if user avatar has changed"); + // check if user avatar has changed + match logic + .db() + .upsert_user_avatar( + from.as_bare(), + metadata.id.clone(), + ) + .await + { + Ok((changed, old_avatar)) => { + if changed { + if let Some(old_avatar) = old_avatar + { + if let Err(e) = logic + .file_store() + .delete(&old_avatar) + .await.map_err(|err| AvatarUpdateError::FileStore(err)) { + logic.handle_error(MessageRecvError::AvatarUpdate(e).into()).await; + } + } + } - logic - .update_sender() - .send(UpdateMessage::NickChanged { - jid: from.as_bare(), - nick: nick.0.clone(), - }) - .await; + match logic + .file_store() + .is_stored(&metadata.id) + .await + .map_err(|err| { + AvatarUpdateError::<Fs>::FileStore( + err, + ) + }) { + Ok(false) => { + // get data + let pep_item = logic.client().get_pep_item(Some(from.as_bare()), "urn:xmpp:avatar:data".to_string(), metadata.id.clone()).await.map_err(|err| Into::<AvatarUpdateError<Fs>>::into(err))?; + match pep_item { + crate::pep::Item::AvatarData(data) => { + let data = data.map(|data| data.data_b64).unwrap_or_default().replace("\n", ""); + // TODO: these should all be in a separate avatarupdate function + debug!("got avatar data"); + match BASE64_STANDARD.decode(data) { + Ok(data) => { + let mut hasher = Sha1::new(); + hasher.update(&data); + let received_data_hash = hex::encode(hasher.finalize()); + debug!("received_data_hash: {}, metadata_id: {}", received_data_hash, metadata.id); + if received_data_hash.to_lowercase() == metadata.id.to_lowercase() { + if let Err(e) = logic.file_store().store(&received_data_hash, &data).await { + logic.handle_error(Error::MessageRecv(MessageRecvError::AvatarUpdate(AvatarUpdateError::FileStore(e)))).await; + } + if changed { + logic + .update_sender() + .send( + UpdateMessage::AvatarChanged { + jid: from.as_bare(), + id: Some( + metadata.id.clone(), + ), + }, + ) + .await; + } + } + }, + Err(e) => { + logic.handle_error(Error::MessageRecv(MessageRecvError::AvatarUpdate(AvatarUpdateError::Base64(e)))).await; + }, + } + }, + _ => { + logic.handle_error(Error::MessageRecv(MessageRecvError::AvatarUpdate(AvatarUpdateError::MissingData))).await; + } + } + } + Ok(true) => { + // just send the update + if changed { + logic + .update_sender() + .send( + UpdateMessage::AvatarChanged { + jid: from.as_bare(), + id: Some( + metadata.id.clone(), + ), + }, + ) + .await; + } + } + Err(e) => { + logic.handle_error(Error::MessageRecv(MessageRecvError::AvatarUpdate(e))).await; + } + } + } + Err(e) => { + logic + .handle_error(Error::MessageRecv( + MessageRecvError::AvatarUpdate( + AvatarUpdateError::Database( + e, + ), + ), + )) + .await; + } + } + } else { + // delete avatar + match logic + .db() + .delete_user_avatar(from.as_bare()) + .await + { + Ok((changed, old_avatar)) => { + if changed { + if let Some(old_avatar) = old_avatar + { + if let Err(e) = logic + .file_store() + .delete(&old_avatar) + .await.map_err(|err| AvatarUpdateError::FileStore(err)) { + logic.handle_error(MessageRecvError::AvatarUpdate(e).into()).await; + } + } + logic + .update_sender() + .send( + UpdateMessage::AvatarChanged { + jid: from.as_bare(), + id: None, + }, + ) + .await; + } + } + Err(e) => { + logic + .handle_error(Error::MessageRecv( + MessageRecvError::AvatarUpdate( + AvatarUpdateError::Database( + e, + ), + ), + )) + .await; + } + } + } + // check if the new file is in the file store + // if not, retrieve from server and save in the file store (remember to check if the hash matches) + // send the avatar update + } + _ => {} } - Content::Unknown(element) => {} - }, - None => {} + } } + _ => {} } } - ItemsType::Retract(retracts) => {} - }, - _ => {} - }, + _ => {} + } + } // Event::Collection(collection) => todo!(), // Event::Configuration(configuration) => todo!(), // Event::Delete(delete) => todo!(), // Event::Purge(purge) => todo!(), // Event::Subscription(subscription) => todo!(), - _ => {} + _ => {} // TODO: catch these catch-alls in some way } } @@ -240,15 +551,15 @@ pub async fn recv_presence( } } -pub async fn recv_iq( - logic: ClientLogic, +pub async fn recv_iq<Fs: FileStore + Clone>( + logic: ClientLogic<Fs>, connection: Connected, iq: Iq, -) -> Result<Option<UpdateMessage>, IqError> { +) -> Result<Option<UpdateMessage>, IqProcessError> { if let Some(to) = &iq.to { if *to == *connection.jid() { } else { - return Err(IqError::IncorrectAddressee(to.clone())); + return Err(IqProcessError::Iq(IqError::IncorrectAddressee(to.clone()))); } } match iq.r#type { @@ -259,7 +570,11 @@ pub async fn recv_iq( .unwrap_or_else(|| connection.server().clone()); let id = iq.id.clone(); debug!("received iq result with id `{}` from {}", id, from); - logic.pending().respond(Stanza::Iq(iq), id).await?; + logic + .pending() + .respond(Stanza::Iq(iq), id) + .await + .map_err(|e| Into::<IqError>::into(e))?; Ok(None) } stanza::client::iq::IqType::Get => { @@ -299,7 +614,11 @@ pub async fn recv_iq( errors: vec![StanzaError::ItemNotFound.into()], }; // TODO: log error - connection.write_handle().write(Stanza::Iq(iq)).await?; + connection + .write_handle() + .write(Stanza::Iq(iq)) + .await + .map_err(|e| Into::<IqError>::into(e))?; info!("replied to disco#info request from {}", from); return Ok(None); } @@ -315,7 +634,11 @@ pub async fn recv_iq( errors: vec![StanzaError::ItemNotFound.into()], }; // TODO: log error - connection.write_handle().write(Stanza::Iq(iq)).await?; + connection + .write_handle() + .write(Stanza::Iq(iq)) + .await + .map_err(|e| Into::<IqError>::into(e))?; info!("replied to disco#info request from {}", from); return Ok(None); } @@ -330,7 +653,11 @@ pub async fn recv_iq( query: Some(iq::Query::DiscoInfo(disco)), errors: vec![], }; - connection.write_handle().write(Stanza::Iq(iq)).await?; + connection + .write_handle() + .write(Stanza::Iq(iq)) + .await + .map_err(|e| Into::<IqError>::into(e))?; info!("replied to disco#info request from {}", from); Ok(None) } @@ -345,7 +672,11 @@ pub async fn recv_iq( query: None, errors: vec![StanzaError::ServiceUnavailable.into()], }; - connection.write_handle().write(Stanza::Iq(iq)).await?; + connection + .write_handle() + .write(Stanza::Iq(iq)) + .await + .map_err(|e| Into::<IqError>::into(e))?; warn!("replied to unsupported iq get from {}", from); Ok(None) } // stanza::client::iq::Query::Bind(bind) => todo!(), @@ -365,7 +696,11 @@ pub async fn recv_iq( query: None, errors: vec![StanzaError::BadRequest.into()], }; - connection.write_handle().write(Stanza::Iq(iq)).await?; + connection + .write_handle() + .write(Stanza::Iq(iq)) + .await + .map_err(|e| Into::<IqError>::into(e))?; info!("replied to malformed iq query from {}", from); Ok(None) } @@ -416,7 +751,12 @@ pub async fn recv_iq( .handle_error(RosterError::PushReply(e.into()).into()) .await; } - Ok(Some(UpdateMessage::RosterUpdate(contact))) + let user = logic + .db() + .read_user(contact.user_jid.clone()) + .await + .map_err(|e| Into::<RosterError>::into(e))?; + Ok(Some(UpdateMessage::RosterUpdate(contact, user))) } } } else { @@ -430,7 +770,11 @@ pub async fn recv_iq( query: None, errors: vec![StanzaError::NotAcceptable.into()], }; - connection.write_handle().write(Stanza::Iq(iq)).await?; + connection + .write_handle() + .write(Stanza::Iq(iq)) + .await + .map_err(|e| Into::<IqError>::into(e))?; Ok(None) } } @@ -446,7 +790,11 @@ pub async fn recv_iq( query: None, errors: vec![StanzaError::ServiceUnavailable.into()], }; - connection.write_handle().write(Stanza::Iq(iq)).await?; + connection + .write_handle() + .write(Stanza::Iq(iq)) + .await + .map_err(|e| Into::<IqError>::into(e))?; warn!("replied to unsupported iq set from {}", from); Ok(None) } @@ -462,18 +810,22 @@ pub async fn recv_iq( query: None, errors: vec![StanzaError::NotAcceptable.into()], }; - connection.write_handle().write(Stanza::Iq(iq)).await?; + connection + .write_handle() + .write(Stanza::Iq(iq)) + .await + .map_err(|e| Into::<IqError>::into(e))?; Ok(None) } } } } -pub async fn process_stanza( - logic: ClientLogic, +pub async fn process_stanza<Fs: FileStore + Clone>( + logic: ClientLogic<Fs>, stanza: Stanza, connection: Connected, -) -> Result<Option<UpdateMessage>, Error> { +) -> Result<Option<UpdateMessage>, Error<Fs>> { let update = match stanza { Stanza::Message(stanza_message) => Ok(recv_message(logic, stanza_message).await?), Stanza::Presence(presence) => Ok(recv_presence(presence).await?), diff --git a/filamento/src/pep.rs b/filamento/src/pep.rs index c71d843..3cd243f 100644 --- a/filamento/src/pep.rs +++ b/filamento/src/pep.rs @@ -1,17 +1,8 @@ -// in commandmessage -// pub struct Publish { -// item: Item, -// node: Option<String>, -// // no need for node, as item has the node -// } -// -// in updatemessage -// pub struct Event { -// from: JID, -// item: Item, -// } +use crate::avatar::{Data as AvatarData, Metadata as AvatarMetadata}; #[derive(Clone, Debug)] pub enum Item { - Nick(String), + Nick(Option<String>), + AvatarMetadata(Option<AvatarMetadata>), + AvatarData(Option<AvatarData>), } diff --git a/filamento/src/user.rs b/filamento/src/user.rs index 85471d5..8669fc3 100644 --- a/filamento/src/user.rs +++ b/filamento/src/user.rs @@ -4,5 +4,6 @@ use jid::JID; pub struct User { pub jid: JID, pub nick: Option<String>, - pub cached_status_message: Option<String>, + pub avatar: Option<String>, + // pub cached_status_message: Option<String>, } |