aboutsummaryrefslogtreecommitdiffstats
path: root/filamento
diff options
context:
space:
mode:
Diffstat (limited to 'filamento')
-rw-r--r--filamento/.gitignore1
-rw-r--r--filamento/Cargo.toml6
-rw-r--r--filamento/examples/example.rs69
-rw-r--r--filamento/migrations/20240113011930_luz.sql1
-rw-r--r--filamento/src/avatar.rs34
-rw-r--r--filamento/src/caps.rs7
-rw-r--r--filamento/src/db.rs406
-rw-r--r--filamento/src/error.rs94
-rw-r--r--filamento/src/files.rs77
-rw-r--r--filamento/src/lib.rs233
-rw-r--r--filamento/src/logic/abort.rs4
-rw-r--r--filamento/src/logic/connect.rs8
-rw-r--r--filamento/src/logic/connection_error.rs7
-rw-r--r--filamento/src/logic/disconnect.rs7
-rw-r--r--filamento/src/logic/local_only.rs53
-rw-r--r--filamento/src/logic/mod.rs27
-rw-r--r--filamento/src/logic/offline.rs82
-rw-r--r--filamento/src/logic/online.rs462
-rw-r--r--filamento/src/logic/process_stanza.rs512
-rw-r--r--filamento/src/pep.rs17
-rw-r--r--filamento/src/user.rs3
21 files changed, 1822 insertions, 288 deletions
diff --git a/filamento/.gitignore b/filamento/.gitignore
index ec8a40b..1ba9f2a 100644
--- a/filamento/.gitignore
+++ b/filamento/.gitignore
@@ -1,2 +1,3 @@
filamento.db
+files/
.sqlx/
diff --git a/filamento/Cargo.toml b/filamento/Cargo.toml
index 1c28c39..91b7e91 100644
--- a/filamento/Cargo.toml
+++ b/filamento/Cargo.toml
@@ -8,7 +8,7 @@ futures = "0.3.31"
lampada = { version = "0.1.0", path = "../lampada" }
tokio = "1.42.0"
thiserror = "2.0.11"
-stanza = { version = "0.1.0", path = "../stanza", features = ["rfc_6121", "xep_0203", "xep_0030", "xep_0060", "xep_0172", "xep_0390", "xep_0128", "xep_0115"] }
+stanza = { version = "0.1.0", path = "../stanza", features = ["rfc_6121", "xep_0203", "xep_0030", "xep_0060", "xep_0172", "xep_0390", "xep_0128", "xep_0115", "xep_0084"] }
sqlx = { version = "0.8.3", features = ["sqlite", "runtime-tokio", "uuid", "chrono"] }
# TODO: re-export jid?
jid = { version = "0.1.0", path = "../jid", features = ["sqlx"] }
@@ -19,10 +19,12 @@ sha2 = "0.10.8"
sha3 = "0.10.8"
base64 = "0.22.1"
sha1 = "0.10.6"
+image = "0.25.6"
+hex = "0.4.3"
[dev-dependencies]
tracing-subscriber = "0.3.19"
-peanuts = { version = "0.1.0", path = "../../peanuts" }
+peanuts = { version = "0.1.0", git = "https://bunny.garden/peanuts" }
[[example]]
name = "example"
diff --git a/filamento/examples/example.rs b/filamento/examples/example.rs
index 74a9aa1..8119743 100644
--- a/filamento/examples/example.rs
+++ b/filamento/examples/example.rs
@@ -1,17 +1,56 @@
-use std::{path::Path, str::FromStr, time::Duration};
+use std::{path::Path, str::FromStr, sync::Arc, time::Duration};
-use filamento::{Client, db::Db};
+use filamento::{Client, db::Db, files::FileStore};
use jid::JID;
+use tokio::io::{self, AsyncReadExt};
use tracing::info;
+#[derive(Clone, Debug)]
+pub struct Files;
+
+impl FileStore for Files {
+ type Err = Arc<io::Error>;
+
+ async fn is_stored(&self, name: &str) -> Result<bool, Self::Err> {
+ tracing::debug!("checking if {} is stored", name);
+ let res = tokio::fs::try_exists(format!("files/{}", name))
+ .await
+ .map_err(|err| Arc::new(err));
+ tracing::debug!("file check res: {:?}", res);
+ res
+ }
+
+ async fn store(&self, name: &str, data: &[u8]) -> Result<(), Self::Err> {
+ tracing::debug!("storing {} is stored", name);
+ let res = tokio::fs::write(format!("files/{}", name), data)
+ .await
+ .map_err(|err| Arc::new(err));
+ tracing::debug!("file store res: {:?}", res);
+ res
+ }
+
+ async fn delete(&self, name: &str) -> Result<(), Self::Err> {
+ tracing::debug!("deleting {}", name);
+ let res = tokio::fs::remove_file(format!("files/{}", name))
+ .await
+ .map_err(|err| Arc::new(err));
+ tracing::debug!("file delete res: {:?}", res);
+ res
+ }
+}
+
#[tokio::main]
async fn main() {
tracing_subscriber::fmt::init();
let db = Db::create_connect_and_migrate(Path::new("./filamento.db"))
.await
.unwrap();
- let (client, mut recv) =
- Client::new("test@blos.sm".try_into().unwrap(), "slayed".to_string(), db);
+ let (client, mut recv) = Client::new(
+ "test@blos.sm/testing2".try_into().unwrap(),
+ "slayed".to_string(),
+ db,
+ Files,
+ );
tokio::spawn(async move {
while let Some(msg) = recv.recv().await {
@@ -22,7 +61,16 @@ async fn main() {
client.connect().await.unwrap();
tokio::time::sleep(Duration::from_secs(5)).await;
info!("changing nick");
- client.change_nick("britney".to_string()).await.unwrap();
+ client
+ .change_nick(Some("britney".to_string()))
+ .await
+ .unwrap();
+ let mut profile_pic = tokio::fs::File::open("files/britney_starbies.jpg")
+ .await
+ .unwrap();
+ let mut data = Vec::new();
+ profile_pic.read_to_end(&mut data).await.unwrap();
+ client.change_avatar(Some(data)).await.unwrap();
info!("sending message");
client
.send_message(
@@ -34,7 +82,16 @@ async fn main() {
.await
.unwrap();
info!("sent message");
- tokio::time::sleep(Duration::from_secs(5)).await;
+ client
+ .send_message(
+ JID::from_str("cel@blos.sm").unwrap(),
+ filamento::chat::Body {
+ body: "hallo 2".to_string(),
+ },
+ )
+ .await
+ .unwrap();
+ tokio::time::sleep(Duration::from_secs(15)).await;
// info!("sending disco query");
// let info = client.disco_info(None, None).await.unwrap();
// info!("got disco result: {:#?}", info);
diff --git a/filamento/migrations/20240113011930_luz.sql b/filamento/migrations/20240113011930_luz.sql
index 8c1b01c..c2f35dd 100644
--- a/filamento/migrations/20240113011930_luz.sql
+++ b/filamento/migrations/20240113011930_luz.sql
@@ -6,6 +6,7 @@ create table users(
-- TODO: enforce bare jid
jid text primary key not null,
nick text,
+ avatar text,
-- can receive presence status from non-contacts
cached_status_message text
-- TODO: last_seen
diff --git a/filamento/src/avatar.rs b/filamento/src/avatar.rs
new file mode 100644
index 0000000..a6937df
--- /dev/null
+++ b/filamento/src/avatar.rs
@@ -0,0 +1,34 @@
+#[derive(Clone, Debug)]
+pub struct Metadata {
+ pub bytes: u32,
+ pub hash: String,
+ pub r#type: String,
+}
+
+#[derive(Clone, Debug)]
+pub struct Data {
+ pub hash: String,
+ pub data_b64: String,
+}
+
+#[derive(Clone, Debug)]
+pub struct Avatar(Vec<u8>);
+
+impl From<stanza::xep_0084::Info> for Metadata {
+ fn from(value: stanza::xep_0084::Info) -> Self {
+ Self {
+ bytes: value.bytes,
+ hash: value.id,
+ r#type: value.r#type,
+ }
+ }
+}
+
+impl From<stanza::xep_0084::Data> for Data {
+ fn from(value: stanza::xep_0084::Data) -> Self {
+ Self {
+ hash: todo!(),
+ data_b64: todo!(),
+ }
+ }
+}
diff --git a/filamento/src/caps.rs b/filamento/src/caps.rs
index 49d05ba..819e669 100644
--- a/filamento/src/caps.rs
+++ b/filamento/src/caps.rs
@@ -35,12 +35,13 @@ pub fn client_info() -> Info {
Info {
node: None,
features: vec![
- "http://jabber.org/protocol/disco#items".to_string(),
- "http://jabber.org/protocol/disco#info".to_string(),
"http://jabber.org/protocol/caps".to_string(),
- "http://jabber.org/protocol/nick".to_string(),
+ "http://jabber.org/protocol/disco#info".to_string(),
+ "http://jabber.org/protocol/disco#items".to_string(),
"http://jabber.org/protocol/nick+notify".to_string(),
+ "urn:xmpp:avatar:metadata+notify".to_string(),
],
+ // "http://jabber.org/protocol/nick".to_string(),
identities: vec![Identity {
name: Some("filamento 0.1.0".to_string()),
category: Category::Client(identity::Client::PC),
diff --git a/filamento/src/db.rs b/filamento/src/db.rs
index c19f16c..d9206cc 100644
--- a/filamento/src/db.rs
+++ b/filamento/src/db.rs
@@ -1,12 +1,12 @@
use std::{collections::HashSet, path::Path};
-use chrono::Utc;
+use chrono::{DateTime, Utc};
use jid::JID;
use sqlx::{SqlitePool, migrate};
use uuid::Uuid;
use crate::{
- chat::{Chat, Message},
+ chat::{Body, Chat, Delivery, Message},
error::{DatabaseError as Error, DatabaseOpenError},
presence::Online,
roster::Contact,
@@ -51,10 +51,9 @@ impl Db {
pub(crate) async fn create_user(&self, user: User) -> Result<(), Error> {
sqlx::query!(
- "insert into users ( jid, nick, cached_status_message ) values ( ?, ?, ? )",
+ "insert into users ( jid, nick ) values ( ?, ? )",
user.jid,
user.nick,
- user.cached_status_message
)
.execute(&self.db)
.await?;
@@ -75,22 +74,116 @@ impl Db {
Ok(user)
}
- pub(crate) async fn upsert_user_nick(&self, jid: JID, nick: String) -> Result<(), Error> {
- sqlx::query!(
- "insert into users (jid, nick) values (?, ?) on conflict do update set nick = ?",
+ /// returns whether or not the nickname was updated
+ pub(crate) async fn delete_user_nick(&self, jid: JID) -> Result<bool, Error> {
+ if sqlx::query!(
+ "insert into users (jid, nick) values (?, ?) on conflict do update set nick = ? where nick is not ?",
+ jid,
+ None::<String>,
+ None::<String>,
+ None::<String>,
+ )
+ .execute(&self.db)
+ .await?
+ .rows_affected()
+ > 0
+ {
+ Ok(true)
+ } else {
+ Ok(false)
+ }
+ }
+
+ /// returns whether or not the nickname was updated
+ pub(crate) async fn upsert_user_nick(&self, jid: JID, nick: String) -> Result<bool, Error> {
+ let rows_affected = sqlx::query!(
+ "insert into users (jid, nick) values (?, ?) on conflict do update set nick = ? where nick is not ?",
jid,
nick,
+ nick,
nick
)
.execute(&self.db)
- .await?;
- Ok(())
+ .await?
+ .rows_affected();
+ tracing::debug!("rows affected: {}", rows_affected);
+ if rows_affected > 0 {
+ Ok(true)
+ } else {
+ Ok(false)
+ }
+ }
+
+ /// returns whether or not the avatar was updated, and the file to delete if there existed an old avatar
+ pub(crate) async fn delete_user_avatar(
+ &self,
+ jid: JID,
+ ) -> Result<(bool, Option<String>), Error> {
+ #[derive(sqlx::FromRow)]
+ struct AvatarRow {
+ avatar: Option<String>,
+ }
+ let old_avatar: Option<String> = sqlx::query_as("select avatar from users where jid = ?")
+ .bind(jid.clone())
+ .fetch_optional(&self.db)
+ .await?
+ .map(|row: AvatarRow| row.avatar)
+ .unwrap_or(None);
+ if sqlx::query!(
+ "insert into users (jid, avatar) values (?, ?) on conflict do update set avatar = ? where avatar is not ?",
+ jid,
+ None::<String>,
+ None::<String>,
+ None::<String>,
+ )
+ .execute(&self.db)
+ .await?
+ .rows_affected()
+ > 0
+ {
+ Ok((true, old_avatar))
+ } else {
+ Ok((false, old_avatar))
+ }
+ }
+
+ /// returns whether or not the avatar was updated, and the file to delete if there existed an old avatar
+ pub(crate) async fn upsert_user_avatar(
+ &self,
+ jid: JID,
+ avatar: String,
+ ) -> Result<(bool, Option<String>), Error> {
+ #[derive(sqlx::FromRow)]
+ struct AvatarRow {
+ avatar: Option<String>,
+ }
+ let old_avatar: Option<String> = sqlx::query_as("select avatar from users where jid = ?")
+ .bind(jid.clone())
+ .fetch_optional(&self.db)
+ .await?
+ .map(|row: AvatarRow| row.avatar)
+ .unwrap_or(None);
+ if sqlx::query!(
+ "insert into users (jid, avatar) values (?, ?) on conflict do update set avatar = ? where avatar is not ?",
+ jid,
+ avatar,
+ avatar,
+ avatar,
+ )
+ .execute(&self.db)
+ .await?
+ .rows_affected()
+ > 0
+ {
+ Ok((true, old_avatar))
+ } else {
+ Ok((false, old_avatar))
+ }
}
pub(crate) async fn update_user(&self, user: User) -> Result<(), Error> {
sqlx::query!(
- "update users set cached_status_message = ?, nick = ? where jid = ?",
- user.cached_status_message,
+ "update users set nick = ? where jid = ?",
user.nick,
user.jid
)
@@ -266,10 +359,9 @@ impl Db {
}
pub(crate) async fn read_cached_roster(&self) -> Result<Vec<Contact>, Error> {
- let mut roster: Vec<Contact> =
- sqlx::query_as("select * from roster join users on jid = user_jid")
- .fetch_all(&self.db)
- .await?;
+ let mut roster: Vec<Contact> = sqlx::query_as("select * from roster")
+ .fetch_all(&self.db)
+ .await?;
for contact in &mut roster {
#[derive(sqlx::FromRow)]
struct Row {
@@ -285,13 +377,47 @@ impl Db {
Ok(roster)
}
+ pub(crate) async fn read_cached_roster_with_users(
+ &self,
+ ) -> Result<Vec<(Contact, User)>, Error> {
+ #[derive(sqlx::FromRow)]
+ struct Row {
+ #[sqlx(flatten)]
+ contact: Contact,
+ #[sqlx(flatten)]
+ user: User,
+ }
+ let mut roster: Vec<Row> =
+ sqlx::query_as("select * from roster join users on jid = user_jid")
+ .fetch_all(&self.db)
+ .await?;
+ for row in &mut roster {
+ #[derive(sqlx::FromRow)]
+ struct Row {
+ group_name: String,
+ }
+ let groups: Vec<Row> =
+ sqlx::query_as("select group_name from groups_roster where contact_jid = ?")
+ .bind(&row.contact.user_jid)
+ .fetch_all(&self.db)
+ .await?;
+ row.contact.groups = HashSet::from_iter(groups.into_iter().map(|row| row.group_name));
+ }
+ let roster = roster
+ .into_iter()
+ .map(|row| (row.contact, row.user))
+ .collect();
+ Ok(roster)
+ }
+
pub(crate) async fn create_chat(&self, chat: Chat) -> Result<(), Error> {
let id = Uuid::new_v4();
let jid = chat.correspondent();
sqlx::query!(
- "insert into chats (id, correspondent) values (?, ?)",
+ "insert into chats (id, correspondent, have_chatted) values (?, ?, ?)",
id,
- jid
+ jid,
+ chat.have_chatted,
)
.execute(&self.db)
.await?;
@@ -371,21 +497,197 @@ impl Db {
&self,
) -> Result<Vec<(Chat, Message)>, Error> {
#[derive(sqlx::FromRow)]
+ pub struct RowChat {
+ chat_correspondent: JID,
+ chat_have_chatted: bool,
+ }
+ impl From<RowChat> for Chat {
+ fn from(value: RowChat) -> Self {
+ Self {
+ correspondent: value.chat_correspondent,
+ have_chatted: value.chat_have_chatted,
+ }
+ }
+ }
+ #[derive(sqlx::FromRow)]
+ pub struct RowMessage {
+ message_id: Uuid,
+ message_body: String,
+ message_delivery: Option<Delivery>,
+ message_timestamp: DateTime<Utc>,
+ message_from_jid: JID,
+ }
+ impl From<RowMessage> for Message {
+ fn from(value: RowMessage) -> Self {
+ Self {
+ id: value.message_id,
+ from: value.message_from_jid,
+ delivery: value.message_delivery,
+ timestamp: value.message_timestamp,
+ body: Body {
+ body: value.message_body,
+ },
+ }
+ }
+ }
+
+ #[derive(sqlx::FromRow)]
+ pub struct ChatWithMessageRow {
+ #[sqlx(flatten)]
+ pub chat: RowChat,
+ #[sqlx(flatten)]
+ pub message: RowMessage,
+ }
+
pub struct ChatWithMessage {
+ chat: Chat,
+ message: Message,
+ }
+
+ impl From<ChatWithMessageRow> for ChatWithMessage {
+ fn from(value: ChatWithMessageRow) -> Self {
+ Self {
+ chat: value.chat.into(),
+ message: value.message.into(),
+ }
+ }
+ }
+
+ // TODO: sometimes chats have no messages.
+ let chats: Vec<ChatWithMessageRow> = sqlx::query_as("select c.*, m.* from chats c join (select chat_id, max(timestamp) max_timestamp from messages group by chat_id) max_timestamps on c.id = max_timestamps.chat_id join messages m on max_timestamps.chat_id = m.chat_id and max_timestamps.max_timestamp = m.timestamp order by m.timestamp desc")
+ .fetch_all(&self.db)
+ .await?;
+
+ let chats = chats
+ .into_iter()
+ .map(|chat_with_message_row| {
+ let chat_with_message: ChatWithMessage = chat_with_message_row.into();
+ (chat_with_message.chat, chat_with_message.message)
+ })
+ .collect();
+
+ Ok(chats)
+ }
+
+ /// chats ordered by date of last message
+ // greatest-n-per-group
+ pub(crate) async fn read_chats_ordered_with_latest_messages_and_users(
+ &self,
+ ) -> Result<Vec<((Chat, User), (Message, User))>, Error> {
+ #[derive(sqlx::FromRow)]
+ pub struct RowChat {
+ chat_correspondent: JID,
+ chat_have_chatted: bool,
+ }
+ impl From<RowChat> for Chat {
+ fn from(value: RowChat) -> Self {
+ Self {
+ correspondent: value.chat_correspondent,
+ have_chatted: value.chat_have_chatted,
+ }
+ }
+ }
+ #[derive(sqlx::FromRow)]
+ pub struct RowMessage {
+ message_id: Uuid,
+ message_body: String,
+ message_delivery: Option<Delivery>,
+ message_timestamp: DateTime<Utc>,
+ message_from_jid: JID,
+ }
+ impl From<RowMessage> for Message {
+ fn from(value: RowMessage) -> Self {
+ Self {
+ id: value.message_id,
+ from: value.message_from_jid,
+ delivery: value.message_delivery,
+ timestamp: value.message_timestamp,
+ body: Body {
+ body: value.message_body,
+ },
+ }
+ }
+ }
+ #[derive(sqlx::FromRow)]
+ pub struct RowChatUser {
+ chat_user_jid: JID,
+ chat_user_nick: Option<String>,
+ chat_user_avatar: Option<String>,
+ }
+ impl From<RowChatUser> for User {
+ fn from(value: RowChatUser) -> Self {
+ Self {
+ jid: value.chat_user_jid,
+ nick: value.chat_user_nick,
+ avatar: value.chat_user_avatar,
+ }
+ }
+ }
+ #[derive(sqlx::FromRow)]
+ pub struct RowMessageUser {
+ message_user_jid: JID,
+ message_user_nick: Option<String>,
+ message_user_avatar: Option<String>,
+ }
+ impl From<RowMessageUser> for User {
+ fn from(value: RowMessageUser) -> Self {
+ Self {
+ jid: value.message_user_jid,
+ nick: value.message_user_nick,
+ avatar: value.message_user_avatar,
+ }
+ }
+ }
+ #[derive(sqlx::FromRow)]
+ pub struct ChatWithMessageAndUsersRow {
+ #[sqlx(flatten)]
+ pub chat: RowChat,
#[sqlx(flatten)]
- pub chat: Chat,
+ pub chat_user: RowChatUser,
#[sqlx(flatten)]
- pub message: Message,
+ pub message: RowMessage,
+ #[sqlx(flatten)]
+ pub message_user: RowMessageUser,
+ }
+
+ impl From<ChatWithMessageAndUsersRow> for ChatWithMessageAndUsers {
+ fn from(value: ChatWithMessageAndUsersRow) -> Self {
+ Self {
+ chat: value.chat.into(),
+ chat_user: value.chat_user.into(),
+ message: value.message.into(),
+ message_user: value.message_user.into(),
+ }
+ }
}
- // TODO: i don't know if this will assign the right uuid to the latest message or the chat's id. should probably check but i don't think it matters as nothing ever gets called with the id of the latest message in the chats list
- let chats: Vec<ChatWithMessage> = sqlx::query_as("select c.*, m.* from chats c join (select chat_id, max(timestamp) max_timestamp from messages group by chat_id) max_timestamps on c.id = max_timestamps.chat_id join messages m on max_timestamps.chat_id = m.chat_id and max_timestamps.max_timestamp = m.timestamp order by m.timestamp desc")
+ pub struct ChatWithMessageAndUsers {
+ chat: Chat,
+ chat_user: User,
+ message: Message,
+ message_user: User,
+ }
+
+ let chats: Vec<ChatWithMessageAndUsersRow> = sqlx::query_as("select c.id as chat_id, c.correspondent as chat_correspondent, c.have_chatted as chat_have_chatted, m.id as message_id, m.body as message_body, m.delivery as message_delivery, m.timestamp as message_timestamp, m.from_jid as message_from_jid, cu.jid as chat_user_jid, cu.nick as chat_user_nick, cu.avatar as chat_user_avatar, mu.jid as message_user_jid, mu.nick as message_user_nick, mu.avatar as message_user_avatar from chats c join (select chat_id, max(timestamp) max_timestamp from messages group by chat_id) max_timestamps on c.id = max_timestamps.chat_id join messages m on max_timestamps.chat_id = m.chat_id and max_timestamps.max_timestamp = m.timestamp join users as cu on cu.jid = c.correspondent join users as mu on mu.jid = m.from_jid order by m.timestamp desc")
.fetch_all(&self.db)
.await?;
let chats = chats
.into_iter()
- .map(|chat_with_message| (chat_with_message.chat, chat_with_message.message))
+ .map(|chat_with_message_and_users_row| {
+ let chat_with_message_and_users: ChatWithMessageAndUsers =
+ chat_with_message_and_users_row.into();
+ (
+ (
+ chat_with_message_and_users.chat,
+ chat_with_message_and_users.chat_user,
+ ),
+ (
+ chat_with_message_and_users.message,
+ chat_with_message_and_users.message_user,
+ ),
+ )
+ })
.collect();
Ok(chats)
@@ -441,12 +743,14 @@ impl Db {
.execute(&self.db)
.await?;
let id = Uuid::new_v4();
- let chat: Chat = sqlx::query_as("insert into chats (id, correspondent, have_chatted) values (?, ?, ?) on conflict do nothing returning *")
+ let chat: Chat = sqlx::query_as("insert into chats (id, correspondent, have_chatted) values (?, ?, ?) on conflict do nothing; select * from chats where correspondent = ?")
.bind(id)
- .bind(bare_chat)
+ .bind(bare_chat.clone())
.bind(false)
+ .bind(bare_chat)
.fetch_one(&self.db)
.await?;
+ tracing::debug!("CHECKING chat: {:?}", chat);
Ok(chat.have_chatted)
}
@@ -472,7 +776,7 @@ impl Db {
Ok(())
}
- // create direct message from incoming
+ /// create direct message from incoming. MUST upsert chat and user
pub(crate) async fn create_message_with_user_resource(
&self,
message: Message,
@@ -482,20 +786,20 @@ impl Db {
) -> Result<(), Error> {
let bare_chat = chat.as_bare();
let resource = &chat.resourcepart;
- sqlx::query!(
- "insert into users (jid) values (?) on conflict do nothing",
- bare_chat
- )
- .execute(&self.db)
- .await?;
- let id = Uuid::new_v4();
- sqlx::query!(
- "insert into chats (id, correspondent) values (?, ?) on conflict do nothing",
- id,
- bare_chat
- )
- .execute(&self.db)
- .await?;
+ // sqlx::query!(
+ // "insert into users (jid) values (?) on conflict do nothing",
+ // bare_chat
+ // )
+ // .execute(&self.db)
+ // .await?;
+ // let id = Uuid::new_v4();
+ // sqlx::query!(
+ // "insert into chats (id, correspondent) values (?, ?) on conflict do nothing",
+ // id,
+ // bare_chat
+ // )
+ // .execute(&self.db)
+ // .await?;
if let Some(resource) = resource {
sqlx::query!(
"insert into resources (bare_jid, resource) values (?, ?) on conflict do nothing",
@@ -537,6 +841,30 @@ impl Db {
Ok(messages)
}
+ pub(crate) async fn read_message_history_with_users(
+ &self,
+ chat: JID,
+ ) -> Result<Vec<(Message, User)>, Error> {
+ let chat_id = self.read_chat_id(chat).await?;
+ #[derive(sqlx::FromRow)]
+ pub struct Row {
+ #[sqlx(flatten)]
+ user: User,
+ #[sqlx(flatten)]
+ message: Message,
+ }
+ let messages: Vec<Row> =
+ sqlx::query_as("select * from messages join users on jid = from_jid where chat_id = ? order by timestamp asc")
+ .bind(chat_id)
+ .fetch_all(&self.db)
+ .await?;
+ let messages = messages
+ .into_iter()
+ .map(|row| (row.message, row.user))
+ .collect();
+ Ok(messages)
+ }
+
pub(crate) async fn read_cached_status(&self) -> Result<Online, Error> {
let online: Online = sqlx::query_as("select * from cached_status where id = 0")
.fetch_one(&self.db)
diff --git a/filamento/src/error.rs b/filamento/src/error.rs
index 5111413..f2bf6ef 100644
--- a/filamento/src/error.rs
+++ b/filamento/src/error.rs
@@ -1,15 +1,19 @@
-use std::{string::FromUtf8Error, sync::Arc};
+use std::{num::TryFromIntError, string::FromUtf8Error, sync::Arc};
+use base64::DecodeError;
+use image::ImageError;
use jid::JID;
-use lampada::error::{ConnectionError, ReadError, WriteError};
+use lampada::error::{ActorError, ConnectionError, ReadError, WriteError};
use stanza::client::{Stanza, iq::Query};
use thiserror::Error;
pub use lampada::error::CommandError;
+use crate::files::FileStore;
+
// for the client logic impl
#[derive(Debug, Error, Clone)]
-pub enum Error {
+pub enum Error<Fs: FileStore> {
#[error("core error: {0}")]
Connection(#[from] ConnectionError),
#[error("received unrecognized/unsupported content")]
@@ -17,7 +21,7 @@ pub enum Error {
// TODO: include content
// UnrecognizedContent(peanuts::element::Content),
#[error("iq receive error: {0}")]
- Iq(#[from] IqError),
+ Iq(#[from] IqProcessError),
// TODO: change to Connecting(ConnectingError)
#[error("connecting: {0}")]
Connecting(#[from] ConnectionJobError),
@@ -33,11 +37,11 @@ pub enum Error {
#[error("message send error: {0}")]
MessageSend(#[from] MessageSendError),
#[error("message receive error: {0}")]
- MessageRecv(#[from] MessageRecvError),
+ MessageRecv(#[from] MessageRecvError<Fs>),
#[error("subscripbe error: {0}")]
Subscribe(#[from] SubscribeError),
#[error("publish error: {0}")]
- Publish(#[from] PublishError),
+ Publish(#[from] PEPError),
}
#[derive(Debug, Error, Clone)]
@@ -53,13 +57,29 @@ pub enum MessageSendError {
}
#[derive(Debug, Error, Clone)]
-pub enum MessageRecvError {
+pub enum MessageRecvError<Fs: FileStore> {
#[error("could not add to message history: {0}")]
MessageHistory(#[from] DatabaseError),
#[error("missing from")]
MissingFrom,
#[error("could not update user nick: {0}")]
NickUpdate(DatabaseError),
+ #[error("could not update user avatar: {0}")]
+ AvatarUpdate(#[from] AvatarUpdateError<Fs>),
+}
+
+#[derive(Debug, Error, Clone)]
+pub enum AvatarUpdateError<Fs: FileStore> {
+ #[error("could not save to disk: {0}")]
+ FileStore(Fs::Err),
+ #[error("could not fetch avatar data: {0}")]
+ PEPError(#[from] CommandError<PEPError>),
+ #[error("base64 decode: {0}")]
+ Base64(#[from] DecodeError),
+ #[error("pep node missing avatar data")]
+ MissingData,
+ #[error("database: {0}")]
+ Database(#[from] DatabaseError),
}
#[derive(Debug, Error, Clone)]
@@ -97,6 +117,17 @@ pub enum RosterError {
StanzaError(#[from] stanza::client::error::Error),
#[error("could not reply to roster push: {0}")]
PushReply(WriteError),
+ #[error("actor error: {0}")]
+ Actor(ActorError),
+}
+
+impl From<CommandError<RosterError>> for RosterError {
+ fn from(value: CommandError<RosterError>) -> Self {
+ match value {
+ CommandError::Actor(actor_error) => Self::Actor(actor_error),
+ CommandError::Error(e) => e,
+ }
+ }
}
#[derive(Debug, Error, Clone)]
@@ -161,6 +192,14 @@ pub enum IqError {
}
#[derive(Debug, Error, Clone)]
+pub enum IqProcessError {
+ #[error("iq error")]
+ Iq(#[from] IqError),
+ #[error("roster push")]
+ Roster(#[from] RosterError),
+}
+
+#[derive(Debug, Error, Clone)]
pub enum DatabaseOpenError {
#[error("error: {0}")]
Error(Arc<sqlx::Error>),
@@ -203,7 +242,7 @@ pub enum PresenceError {
}
#[derive(Debug, Error, Clone)]
-pub enum PublishError {
+pub enum PEPError {
#[error("received mismatched query")]
MismatchedQuery(Query),
#[error("missing query")]
@@ -216,12 +255,19 @@ pub enum PublishError {
UnexpectedStanza(Stanza),
#[error("iq response: {0}")]
IqResponse(#[from] IqRequestError),
+ #[error("missing pep item")]
+ MissingItem,
+ #[error("incorrect item id: expected {0}, got {1}")]
+ IncorrectItemID(String, String),
+ #[error("unsupported pep item")]
+ UnsupportedItem,
+ // TODO: should the item be in the error?
}
#[derive(Debug, Error, Clone)]
pub enum NickError {
#[error("publishing nick: {0}")]
- Publish(#[from] CommandError<PublishError>),
+ Publish(#[from] CommandError<PEPError>),
#[error("updating database: {0}")]
Database(#[from] DatabaseError),
#[error("disconnected")]
@@ -267,5 +313,31 @@ pub enum CapsNodeConversionError {
#[error("missing hashtag")]
MissingHashtag,
}
-// #[derive(Debug, Error, Clone)]
-// pub enum CapsError {}
+
+#[derive(Debug, Error, Clone)]
+pub enum AvatarPublishError<Fs: FileStore> {
+ #[error("disconnected")]
+ Disconnected,
+ #[error("image read: {0}")]
+ Read(Arc<std::io::Error>),
+ #[error("image: {0}")]
+ Image(Arc<ImageError>),
+ #[error("pep publish: {0}")]
+ Publish(#[from] CommandError<PEPError>),
+ #[error("bytes number conversion: {0}")]
+ FromInt(#[from] TryFromIntError),
+ #[error("could not save to disk")]
+ FileStore(Fs::Err),
+}
+
+impl<Fs: FileStore> From<std::io::Error> for AvatarPublishError<Fs> {
+ fn from(value: std::io::Error) -> Self {
+ Self::Read(Arc::new(value))
+ }
+}
+
+impl<Fs: FileStore> From<ImageError> for AvatarPublishError<Fs> {
+ fn from(value: ImageError) -> Self {
+ Self::Image(Arc::new(value))
+ }
+}
diff --git a/filamento/src/files.rs b/filamento/src/files.rs
new file mode 100644
index 0000000..3acc871
--- /dev/null
+++ b/filamento/src/files.rs
@@ -0,0 +1,77 @@
+use std::{
+ error::Error,
+ path::{Path, PathBuf},
+ sync::Arc,
+};
+
+use tokio::io;
+
+pub trait FileStore {
+ type Err: Clone + Send + Error;
+
+ fn is_stored(
+ &self,
+ name: &str,
+ ) -> impl std::future::Future<Output = Result<bool, Self::Err>> + std::marker::Send;
+ fn store(
+ &self,
+ name: &str,
+ data: &[u8],
+ ) -> impl std::future::Future<Output = Result<(), Self::Err>> + std::marker::Send;
+ fn delete(
+ &self,
+ name: &str,
+ ) -> impl std::future::Future<Output = Result<(), Self::Err>> + std::marker::Send;
+}
+
+#[derive(Clone, Debug)]
+pub struct Files {
+ root: PathBuf,
+}
+
+impl Files {
+ pub fn new(root: impl AsRef<Path>) -> Self {
+ let root = root.as_ref();
+ let root = root.into();
+ Self { root }
+ }
+
+ pub fn root(&self) -> &Path {
+ &self.root
+ }
+}
+
+impl FileStore for Files {
+ type Err = Arc<io::Error>;
+
+ async fn is_stored(&self, name: &str) -> Result<bool, Self::Err> {
+ tracing::debug!("checking if {} is stored", name);
+ // TODO: is this secure ;-;
+ let name = name.replace("/", "").replace(".", "");
+ let res = tokio::fs::try_exists(self.root.join(name))
+ .await
+ .map_err(|err| Arc::new(err));
+ tracing::debug!("file check res: {:?}", res);
+ res
+ }
+
+ async fn store(&self, name: &str, data: &[u8]) -> Result<(), Self::Err> {
+ tracing::debug!("storing {} is stored", name);
+ let name = name.replace("/", "").replace(".", "");
+ let res = tokio::fs::write(self.root.join(name), data)
+ .await
+ .map_err(|err| Arc::new(err));
+ tracing::debug!("file store res: {:?}", res);
+ res
+ }
+
+ async fn delete(&self, name: &str) -> Result<(), Self::Err> {
+ tracing::debug!("deleting {}", name);
+ let name = name.replace("/", "").replace(".", "");
+ let res = tokio::fs::remove_file(self.root.join(name))
+ .await
+ .map_err(|err| Arc::new(err));
+ tracing::debug!("file delete res: {:?}", res);
+ res
+ }
+}
diff --git a/filamento/src/lib.rs b/filamento/src/lib.rs
index c44edca..14b0cae 100644
--- a/filamento/src/lib.rs
+++ b/filamento/src/lib.rs
@@ -11,9 +11,10 @@ use chrono::Utc;
use db::Db;
use disco::{Info, Items};
use error::{
- ConnectionJobError, DatabaseError, DiscoError, Error, IqError, MessageRecvError, NickError,
- PresenceError, PublishError, RosterError, StatusError, SubscribeError,
+ AvatarPublishError, ConnectionJobError, DatabaseError, DiscoError, Error, IqError,
+ MessageRecvError, NickError, PEPError, PresenceError, RosterError, StatusError, SubscribeError,
};
+use files::FileStore;
use futures::FutureExt;
use jid::JID;
use lampada::{
@@ -35,20 +36,24 @@ use tracing::{debug, info};
use user::User;
use uuid::Uuid;
+pub mod avatar;
pub mod caps;
pub mod chat;
pub mod db;
pub mod disco;
pub mod error;
+pub mod files;
mod logic;
pub mod pep;
pub mod presence;
pub mod roster;
pub mod user;
-pub enum Command {
+pub enum Command<Fs: FileStore> {
/// get the roster. if offline, retreive cached version from database. should be stored in application memory
GetRoster(oneshot::Sender<Result<Vec<Contact>, RosterError>>),
+ /// get the roster. if offline, retreive cached version from database. should be stored in application memory. includes user associated with each contact
+ GetRosterWithUsers(oneshot::Sender<Result<Vec<(Contact, User)>, RosterError>>),
/// get all chats. chat will include 10 messages in their message Vec (enough for chat previews)
// TODO: paging and filtering
GetChats(oneshot::Sender<Result<Vec<Chat>, DatabaseError>>),
@@ -56,11 +61,21 @@ pub enum Command {
GetChatsOrdered(oneshot::Sender<Result<Vec<Chat>, DatabaseError>>),
// TODO: paging and filtering
GetChatsOrderedWithLatestMessages(oneshot::Sender<Result<Vec<(Chat, Message)>, DatabaseError>>),
+ // TODO: paging and filtering, nullabillity for latest message
+ GetChatsOrderedWithLatestMessagesAndUsers(
+ oneshot::Sender<Result<Vec<((Chat, User), (Message, User))>, DatabaseError>>,
+ ),
/// get a specific chat by jid
GetChat(JID, oneshot::Sender<Result<Chat, DatabaseError>>),
/// get message history for chat (does appropriate mam things)
// TODO: paging and filtering
GetMessages(JID, oneshot::Sender<Result<Vec<Message>, DatabaseError>>),
+ /// get message history for chat (does appropriate mam things)
+ // TODO: paging and filtering
+ GetMessagesWithUsers(
+ JID,
+ oneshot::Sender<Result<Vec<(Message, User)>, DatabaseError>>,
+ ),
/// delete a chat from your chat history, along with all the corresponding messages
DeleteChat(JID, oneshot::Sender<Result<(), DatabaseError>>),
/// delete a message from your chat history
@@ -115,27 +130,42 @@ pub enum Command {
Option<String>,
oneshot::Sender<Result<disco::Items, DiscoError>>,
),
- /// publish item to a pep node, specified or default according to item.
- Publish {
+ /// publish item to a pep node specified.
+ PublishPEPItem {
item: pep::Item,
node: String,
- sender: oneshot::Sender<Result<(), PublishError>>,
+ sender: oneshot::Sender<Result<(), PEPError>>,
},
- /// change user nickname
- ChangeNick(String, oneshot::Sender<Result<(), NickError>>),
+ DeletePEPNode {
+ node: String,
+ sender: oneshot::Sender<Result<(), PEPError>>,
+ },
+ GetPEPItem {
+ jid: Option<JID>,
+ node: String,
+ id: String,
+ sender: oneshot::Sender<Result<pep::Item, PEPError>>,
+ },
+ /// change client user nickname
+ ChangeNick(Option<String>, oneshot::Sender<Result<(), NickError>>),
+ // // TODO
+ // GetNick(...),
+ // GetAvatar(...)
// /// get capability node
// GetCaps(String, oneshot::Sender<Result<Info, CapsError>>),
+ /// change client user avatar
+ ChangeAvatar(
+ Option<Vec<u8>>,
+ oneshot::Sender<Result<(), AvatarPublishError<Fs>>>,
+ ),
}
#[derive(Debug, Clone)]
pub enum UpdateMessage {
- Online(Online, Vec<Contact>),
+ Online(Online, Vec<(Contact, User)>),
Offline(Offline),
- /// received roster from jabber server (replace full app roster state with this)
- /// is this needed?
- FullRoster(Vec<Contact>),
/// (only update app roster state, don't replace)
- RosterUpdate(Contact),
+ RosterUpdate(Contact, User),
RosterDelete(JID),
/// presences should be stored with users in the ui, not contacts, as presences can be received from anyone
Presence {
@@ -146,27 +176,42 @@ pub enum UpdateMessage {
// MessageDispatched(Uuid),
Message {
to: JID,
+ from: User,
message: Message,
},
MessageDelivery {
id: Uuid,
+ chat: JID,
delivery: Delivery,
},
SubscriptionRequest(jid::JID),
NickChanged {
jid: JID,
- nick: String,
+ nick: Option<String>,
+ },
+ AvatarChanged {
+ jid: JID,
+ id: Option<String>,
},
}
/// an xmpp client that is suited for a chat client use case
#[derive(Debug)]
-pub struct Client {
- sender: mpsc::Sender<CoreClientCommand<Command>>,
+pub struct Client<Fs: FileStore> {
+ sender: mpsc::Sender<CoreClientCommand<Command<Fs>>>,
timeout: Duration,
}
-impl Clone for Client {
+impl<Fs: FileStore> Client<Fs> {
+ pub fn with_timeout(&self, timeout: Duration) -> Self {
+ Self {
+ sender: self.sender.clone(),
+ timeout,
+ }
+ }
+}
+
+impl<Fs: FileStore> Clone for Client<Fs> {
fn clone(&self) -> Self {
Self {
sender: self.sender.clone(),
@@ -175,32 +220,27 @@ impl Clone for Client {
}
}
-impl Deref for Client {
- type Target = mpsc::Sender<CoreClientCommand<Command>>;
+impl<Fs: FileStore> Deref for Client<Fs> {
+ type Target = mpsc::Sender<CoreClientCommand<Command<Fs>>>;
fn deref(&self) -> &Self::Target {
&self.sender
}
}
-impl DerefMut for Client {
+impl<Fs: FileStore> DerefMut for Client<Fs> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.sender
}
}
-impl Client {
- pub async fn connect(&self) -> Result<(), ActorError> {
- self.send(CoreClientCommand::Connect).await?;
- Ok(())
- }
-
- pub async fn disconnect(&self, offline: Offline) -> Result<(), ActorError> {
- self.send(CoreClientCommand::Disconnect).await?;
- Ok(())
- }
-
- pub fn new(jid: JID, password: String, db: Db) -> (Self, mpsc::Receiver<UpdateMessage>) {
+impl<Fs: FileStore + Clone + Send + Sync + 'static> Client<Fs> {
+ pub fn new(
+ jid: JID,
+ password: String,
+ db: Db,
+ file_store: Fs,
+ ) -> (Self, mpsc::Receiver<UpdateMessage>) {
let (command_sender, command_receiver) = mpsc::channel(20);
let (update_send, update_recv) = mpsc::channel(20);
@@ -214,14 +254,26 @@ impl Client {
timeout: Duration::from_secs(10),
};
- let logic = ClientLogic::new(client.clone(), jid.as_bare(), db, update_send);
+ let logic = ClientLogic::new(client.clone(), jid.as_bare(), db, update_send, file_store);
- let actor: CoreClient<ClientLogic> =
+ let actor: CoreClient<ClientLogic<Fs>> =
CoreClient::new(jid, password, command_receiver, None, sup_recv, logic);
tokio::spawn(async move { actor.run().await });
(client, update_recv)
}
+}
+
+impl<Fs: FileStore> Client<Fs> {
+ pub async fn connect(&self) -> Result<(), ActorError> {
+ self.send(CoreClientCommand::Connect).await?;
+ Ok(())
+ }
+
+ pub async fn disconnect(&self, offline: Offline) -> Result<(), ActorError> {
+ self.send(CoreClientCommand::Disconnect).await?;
+ Ok(())
+ }
pub async fn get_roster(&self) -> Result<Vec<Contact>, CommandError<RosterError>> {
let (send, recv) = oneshot::channel();
@@ -235,6 +287,22 @@ impl Client {
Ok(roster)
}
+ pub async fn get_roster_with_users(
+ &self,
+ ) -> Result<Vec<(Contact, User)>, CommandError<RosterError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(CoreClientCommand::Command(Command::GetRosterWithUsers(
+ send,
+ )))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let roster = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(roster)
+ }
+
pub async fn get_chats(&self) -> Result<Vec<Chat>, CommandError<DatabaseError>> {
let (send, recv) = oneshot::channel();
self.send(CoreClientCommand::Command(Command::GetChats(send)))
@@ -275,6 +343,22 @@ impl Client {
Ok(chats)
}
+ pub async fn get_chats_ordered_with_latest_messages_and_users(
+ &self,
+ ) -> Result<Vec<((Chat, User), (Message, User))>, CommandError<DatabaseError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(CoreClientCommand::Command(
+ Command::GetChatsOrderedWithLatestMessagesAndUsers(send),
+ ))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let chats = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(chats)
+ }
+
pub async fn get_chat(&self, jid: JID) -> Result<Chat, CommandError<DatabaseError>> {
let (send, recv) = oneshot::channel();
self.send(CoreClientCommand::Command(Command::GetChat(jid, send)))
@@ -302,6 +386,23 @@ impl Client {
Ok(messages)
}
+ pub async fn get_messages_with_users(
+ &self,
+ jid: JID,
+ ) -> Result<Vec<(Message, User)>, CommandError<DatabaseError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(CoreClientCommand::Command(Command::GetMessagesWithUsers(
+ jid, send,
+ )))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let messages = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(messages)
+ }
+
pub async fn delete_chat(&self, jid: JID) -> Result<(), CommandError<DatabaseError>> {
let (send, recv) = oneshot::channel();
self.send(CoreClientCommand::Command(Command::DeleteChat(jid, send)))
@@ -539,9 +640,9 @@ impl Client {
&self,
item: pep::Item,
node: String,
- ) -> Result<(), CommandError<PublishError>> {
+ ) -> Result<(), CommandError<PEPError>> {
let (send, recv) = oneshot::channel();
- self.send(CoreClientCommand::Command(Command::Publish {
+ self.send(CoreClientCommand::Command(Command::PublishPEPItem {
item,
node,
sender: send,
@@ -555,7 +656,44 @@ impl Client {
Ok(result)
}
- pub async fn change_nick(&self, nick: String) -> Result<(), CommandError<NickError>> {
+ pub async fn delete_pep_node(&self, node: String) -> Result<(), CommandError<PEPError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(CoreClientCommand::Command(Command::DeletePEPNode {
+ node,
+ sender: send,
+ }))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let result = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(result)
+ }
+
+ pub async fn get_pep_item(
+ &self,
+ jid: Option<JID>,
+ node: String,
+ id: String,
+ ) -> Result<pep::Item, CommandError<PEPError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(CoreClientCommand::Command(Command::GetPEPItem {
+ jid,
+ node,
+ id,
+ sender: send,
+ }))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let result = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(result)
+ }
+
+ pub async fn change_nick(&self, nick: Option<String>) -> Result<(), CommandError<NickError>> {
let (send, recv) = oneshot::channel();
self.send(CoreClientCommand::Command(Command::ChangeNick(nick, send)))
.await
@@ -566,10 +704,27 @@ impl Client {
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
Ok(result)
}
+
+ pub async fn change_avatar(
+ &self,
+ avatar: Option<Vec<u8>>,
+ ) -> Result<(), CommandError<AvatarPublishError<Fs>>> {
+ let (send, recv) = oneshot::channel();
+ self.send(CoreClientCommand::Command(Command::ChangeAvatar(
+ avatar, send,
+ )))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let result = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(result)
+ }
}
-impl From<Command> for CoreClientCommand<Command> {
- fn from(value: Command) -> Self {
+impl<Fs: FileStore> From<Command<Fs>> for CoreClientCommand<Command<Fs>> {
+ fn from(value: Command<Fs>) -> Self {
CoreClientCommand::Command(value)
}
}
diff --git a/filamento/src/logic/abort.rs b/filamento/src/logic/abort.rs
index df82655..3588b13 100644
--- a/filamento/src/logic/abort.rs
+++ b/filamento/src/logic/abort.rs
@@ -1,7 +1,9 @@
use lampada::error::ReadError;
+use crate::files::FileStore;
+
use super::ClientLogic;
-pub async fn on_abort(logic: ClientLogic) {
+pub async fn on_abort<Fs: FileStore + Clone>(logic: ClientLogic<Fs>) {
logic.pending().drain().await;
}
diff --git a/filamento/src/logic/connect.rs b/filamento/src/logic/connect.rs
index dc05448..9d61ca4 100644
--- a/filamento/src/logic/connect.rs
+++ b/filamento/src/logic/connect.rs
@@ -5,17 +5,21 @@ use tracing::debug;
use crate::{
Command, UpdateMessage,
error::{ConnectionJobError, Error, RosterError},
+ files::FileStore,
presence::{Online, PresenceType},
};
use super::ClientLogic;
-pub async fn handle_connect(logic: ClientLogic, connection: Connected) {
+pub async fn handle_connect<Fs: FileStore + Clone + Send + Sync>(
+ logic: ClientLogic<Fs>,
+ connection: Connected,
+) {
let (send, recv) = oneshot::channel();
debug!("getting roster");
logic
.clone()
- .handle_online(Command::GetRoster(send), connection.clone())
+ .handle_online(Command::GetRosterWithUsers(send), connection.clone())
.await;
debug!("sent roster req");
let roster = recv.await;
diff --git a/filamento/src/logic/connection_error.rs b/filamento/src/logic/connection_error.rs
index 081900b..36c1cef 100644
--- a/filamento/src/logic/connection_error.rs
+++ b/filamento/src/logic/connection_error.rs
@@ -1,7 +1,12 @@
use lampada::error::ConnectionError;
+use crate::files::FileStore;
+
use super::ClientLogic;
-pub async fn handle_connection_error(logic: ClientLogic, error: ConnectionError) {
+pub async fn handle_connection_error<Fs: FileStore + Clone>(
+ logic: ClientLogic<Fs>,
+ error: ConnectionError,
+) {
logic.handle_error(error.into()).await;
}
diff --git a/filamento/src/logic/disconnect.rs b/filamento/src/logic/disconnect.rs
index 241c3e6..ebcfd4f 100644
--- a/filamento/src/logic/disconnect.rs
+++ b/filamento/src/logic/disconnect.rs
@@ -1,11 +1,14 @@
use lampada::Connected;
use stanza::client::Stanza;
-use crate::{UpdateMessage, presence::Offline};
+use crate::{UpdateMessage, files::FileStore, presence::Offline};
use super::ClientLogic;
-pub async fn handle_disconnect(logic: ClientLogic, connection: Connected) {
+pub async fn handle_disconnect<Fs: FileStore + Clone>(
+ logic: ClientLogic<Fs>,
+ connection: Connected,
+) {
// TODO: be able to set offline status message
let offline_presence: stanza::client::presence::Presence = Offline::default().into_stanza(None);
let stanza = Stanza::Presence(offline_presence);
diff --git a/filamento/src/logic/local_only.rs b/filamento/src/logic/local_only.rs
index 3f6fe8d..dc94d2c 100644
--- a/filamento/src/logic/local_only.rs
+++ b/filamento/src/logic/local_only.rs
@@ -4,44 +4,77 @@ use uuid::Uuid;
use crate::{
chat::{Chat, Message},
error::DatabaseError,
+ files::FileStore,
user::User,
};
use super::ClientLogic;
-pub async fn handle_get_chats(logic: &ClientLogic) -> Result<Vec<Chat>, DatabaseError> {
+pub async fn handle_get_chats<Fs: FileStore + Clone>(
+ logic: &ClientLogic<Fs>,
+) -> Result<Vec<Chat>, DatabaseError> {
Ok(logic.db().read_chats().await?)
}
-pub async fn handle_get_chats_ordered(logic: &ClientLogic) -> Result<Vec<Chat>, DatabaseError> {
+pub async fn handle_get_chats_ordered<Fs: FileStore + Clone>(
+ logic: &ClientLogic<Fs>,
+) -> Result<Vec<Chat>, DatabaseError> {
Ok(logic.db().read_chats_ordered().await?)
}
-pub async fn handle_get_chats_ordered_with_latest_messages(
- logic: &ClientLogic,
+pub async fn handle_get_chats_ordered_with_latest_messages<Fs: FileStore + Clone>(
+ logic: &ClientLogic<Fs>,
) -> Result<Vec<(Chat, Message)>, DatabaseError> {
Ok(logic.db().read_chats_ordered_with_latest_messages().await?)
}
-pub async fn handle_get_chat(logic: &ClientLogic, jid: JID) -> Result<Chat, DatabaseError> {
+pub async fn handle_get_chats_ordered_with_latest_messages_and_users<Fs: FileStore + Clone>(
+ logic: &ClientLogic<Fs>,
+) -> Result<Vec<((Chat, User), (Message, User))>, DatabaseError> {
+ Ok(logic
+ .db()
+ .read_chats_ordered_with_latest_messages_and_users()
+ .await?)
+}
+
+pub async fn handle_get_chat<Fs: FileStore + Clone>(
+ logic: &ClientLogic<Fs>,
+ jid: JID,
+) -> Result<Chat, DatabaseError> {
Ok(logic.db().read_chat(jid).await?)
}
-pub async fn handle_get_messages(
- logic: &ClientLogic,
+pub async fn handle_get_messages<Fs: FileStore + Clone>(
+ logic: &ClientLogic<Fs>,
jid: JID,
) -> Result<Vec<Message>, DatabaseError> {
Ok(logic.db().read_message_history(jid).await?)
}
-pub async fn handle_delete_chat(logic: &ClientLogic, jid: JID) -> Result<(), DatabaseError> {
+pub async fn handle_get_messages_with_users<Fs: FileStore + Clone>(
+ logic: &ClientLogic<Fs>,
+ jid: JID,
+) -> Result<Vec<(Message, User)>, DatabaseError> {
+ Ok(logic.db().read_message_history_with_users(jid).await?)
+}
+
+pub async fn handle_delete_chat<Fs: FileStore + Clone>(
+ logic: &ClientLogic<Fs>,
+ jid: JID,
+) -> Result<(), DatabaseError> {
Ok(logic.db().delete_chat(jid).await?)
}
-pub async fn handle_delete_messaage(logic: &ClientLogic, uuid: Uuid) -> Result<(), DatabaseError> {
+pub async fn handle_delete_messaage<Fs: FileStore + Clone>(
+ logic: &ClientLogic<Fs>,
+ uuid: Uuid,
+) -> Result<(), DatabaseError> {
Ok(logic.db().delete_message(uuid).await?)
}
-pub async fn handle_get_user(logic: &ClientLogic, jid: JID) -> Result<User, DatabaseError> {
+pub async fn handle_get_user<Fs: FileStore + Clone>(
+ logic: &ClientLogic<Fs>,
+ jid: JID,
+) -> Result<User, DatabaseError> {
Ok(logic.db().read_user(jid).await?)
}
diff --git a/filamento/src/logic/mod.rs b/filamento/src/logic/mod.rs
index 1ddd7d3..5e05dac 100644
--- a/filamento/src/logic/mod.rs
+++ b/filamento/src/logic/mod.rs
@@ -4,12 +4,13 @@ use jid::JID;
use lampada::{Connected, Logic, error::ReadError};
use stanza::client::Stanza;
use tokio::sync::{Mutex, mpsc, oneshot};
-use tracing::{error, info, warn};
+use tracing::{error, info};
use crate::{
Client, Command, UpdateMessage,
db::Db,
error::{Error, IqRequestError, ResponseError},
+ files::FileStore,
};
mod abort;
@@ -22,12 +23,13 @@ mod online;
mod process_stanza;
#[derive(Clone)]
-pub struct ClientLogic {
- client: Client,
+pub struct ClientLogic<Fs: FileStore> {
+ client: Client<Fs>,
bare_jid: JID,
db: Db,
pending: Pending,
update_sender: mpsc::Sender<UpdateMessage>,
+ file_store: Fs,
}
#[derive(Clone)]
@@ -75,12 +77,13 @@ impl Pending {
}
}
-impl ClientLogic {
+impl<Fs: FileStore> ClientLogic<Fs> {
pub fn new(
- client: Client,
+ client: Client<Fs>,
bare_jid: JID,
db: Db,
update_sender: mpsc::Sender<UpdateMessage>,
+ file_store: Fs,
) -> Self {
Self {
db,
@@ -88,10 +91,11 @@ impl ClientLogic {
update_sender,
client,
bare_jid,
+ file_store,
}
}
- pub fn client(&self) -> &Client {
+ pub fn client(&self) -> &Client<Fs> {
&self.client
}
@@ -103,6 +107,10 @@ impl ClientLogic {
&self.pending
}
+ pub fn file_store(&self) -> &Fs {
+ &self.file_store
+ }
+
pub fn update_sender(&self) -> &mpsc::Sender<UpdateMessage> {
&self.update_sender
}
@@ -113,13 +121,14 @@ impl ClientLogic {
self.update_sender().send(update).await;
}
- pub async fn handle_error(&self, e: Error) {
+ // TODO: delete this
+ pub async fn handle_error(&self, e: Error<Fs>) {
error!("{}", e);
}
}
-impl Logic for ClientLogic {
- type Cmd = Command;
+impl<Fs: FileStore + Clone + Send + Sync> Logic for ClientLogic<Fs> {
+ type Cmd = Command<Fs>;
// pub async fn handle_stream_error(self, error) {}
// stanza errors (recoverable)
diff --git a/filamento/src/logic/offline.rs b/filamento/src/logic/offline.rs
index 6399cf7..b87484c 100644
--- a/filamento/src/logic/offline.rs
+++ b/filamento/src/logic/offline.rs
@@ -1,16 +1,21 @@
+use std::process::id;
+
use chrono::Utc;
use lampada::error::WriteError;
+use tracing::error;
use uuid::Uuid;
use crate::{
Command,
chat::{Delivery, Message},
error::{
- DatabaseError, DiscoError, Error, IqRequestError, MessageSendError, NickError, RosterError,
- StatusError,
+ AvatarPublishError, DatabaseError, DiscoError, Error, IqRequestError, MessageSendError,
+ NickError, PEPError, RosterError, StatusError,
},
+ files::FileStore,
presence::Online,
roster::Contact,
+ user::User,
};
use super::{
@@ -18,11 +23,12 @@ use super::{
local_only::{
handle_delete_chat, handle_delete_messaage, handle_get_chat, handle_get_chats,
handle_get_chats_ordered, handle_get_chats_ordered_with_latest_messages,
- handle_get_messages, handle_get_user,
+ handle_get_chats_ordered_with_latest_messages_and_users, handle_get_messages,
+ handle_get_messages_with_users, handle_get_user,
},
};
-pub async fn handle_offline(logic: ClientLogic, command: Command) {
+pub async fn handle_offline<Fs: FileStore + Clone>(logic: ClientLogic<Fs>, command: Command<Fs>) {
let result = handle_offline_result(&logic, command).await;
match result {
Ok(_) => {}
@@ -30,21 +36,39 @@ pub async fn handle_offline(logic: ClientLogic, command: Command) {
}
}
-pub async fn handle_set_status(logic: &ClientLogic, online: Online) -> Result<(), StatusError> {
+pub async fn handle_set_status<Fs: FileStore + Clone>(
+ logic: &ClientLogic<Fs>,
+ online: Online,
+) -> Result<(), StatusError> {
logic.db().upsert_cached_status(online).await?;
Ok(())
}
-pub async fn handle_get_roster(logic: &ClientLogic) -> Result<Vec<Contact>, RosterError> {
+pub async fn handle_get_roster<Fs: FileStore + Clone>(
+ logic: &ClientLogic<Fs>,
+) -> Result<Vec<Contact>, RosterError> {
Ok(logic.db().read_cached_roster().await?)
}
-pub async fn handle_offline_result(logic: &ClientLogic, command: Command) -> Result<(), Error> {
+pub async fn handle_get_roster_with_users<Fs: FileStore + Clone>(
+ logic: &ClientLogic<Fs>,
+) -> Result<Vec<(Contact, User)>, RosterError> {
+ Ok(logic.db().read_cached_roster_with_users().await?)
+}
+
+pub async fn handle_offline_result<Fs: FileStore + Clone>(
+ logic: &ClientLogic<Fs>,
+ command: Command<Fs>,
+) -> Result<(), Error<Fs>> {
match command {
Command::GetRoster(sender) => {
let roster = handle_get_roster(logic).await;
sender.send(roster);
}
+ Command::GetRosterWithUsers(sender) => {
+ let roster = handle_get_roster_with_users(logic).await;
+ sender.send(roster);
+ }
Command::GetChats(sender) => {
let chats = handle_get_chats(logic).await;
sender.send(chats);
@@ -57,6 +81,10 @@ pub async fn handle_offline_result(logic: &ClientLogic, command: Command) -> Res
let chats = handle_get_chats_ordered_with_latest_messages(logic).await;
sender.send(chats);
}
+ Command::GetChatsOrderedWithLatestMessagesAndUsers(sender) => {
+ let chats = handle_get_chats_ordered_with_latest_messages_and_users(logic).await;
+ sender.send(chats);
+ }
Command::GetChat(jid, sender) => {
let chats = handle_get_chat(logic, jid).await;
sender.send(chats);
@@ -65,6 +93,10 @@ pub async fn handle_offline_result(logic: &ClientLogic, command: Command) -> Res
let messages = handle_get_messages(logic, jid).await;
sender.send(messages);
}
+ Command::GetMessagesWithUsers(jid, sender) => {
+ let messages = handle_get_messages_with_users(logic, jid).await;
+ sender.send(messages);
+ }
Command::DeleteChat(jid, sender) => {
let result = handle_delete_chat(logic, jid).await;
sender.send(result);
@@ -77,7 +109,6 @@ pub async fn handle_offline_result(logic: &ClientLogic, command: Command) -> Res
let user = handle_get_user(logic, jid).await;
sender.send(user);
}
- // TODO: offline queue to modify roster
Command::AddContact(_jid, sender) => {
sender.send(Err(RosterError::Write(WriteError::Disconnected)));
}
@@ -112,7 +143,6 @@ pub async fn handle_offline_result(logic: &ClientLogic, command: Command) -> Res
let result = handle_set_status(logic, online).await;
sender.send(result);
}
- // TODO: offline message queue
Command::SendMessage(jid, body) => {
let id = Uuid::new_v4();
let timestamp = Utc::now();
@@ -142,11 +172,25 @@ pub async fn handle_offline_result(logic: &ClientLogic, command: Command) -> Res
.handle_error(MessageSendError::MessageHistory(e.into()).into())
.await;
}
+
+ let from = match logic.db().read_user(logic.bare_jid.clone()).await {
+ Ok(u) => u,
+ Err(e) => {
+ error!("{}", e);
+ User {
+ jid: logic.bare_jid.clone(),
+ nick: None,
+ avatar: None,
+ }
+ }
+ };
+
logic
.update_sender()
.send(crate::UpdateMessage::Message {
to: jid.as_bare(),
message,
+ from,
})
.await;
}
@@ -159,7 +203,7 @@ pub async fn handle_offline_result(logic: &ClientLogic, command: Command) -> Res
Command::DiscoItems(_jid, _node, sender) => {
sender.send(Err(DiscoError::Write(WriteError::Disconnected)));
}
- Command::Publish {
+ Command::PublishPEPItem {
item: _,
node: _,
sender,
@@ -169,6 +213,24 @@ pub async fn handle_offline_result(logic: &ClientLogic, command: Command) -> Res
Command::ChangeNick(_, sender) => {
sender.send(Err(NickError::Disconnected));
}
+ Command::ChangeAvatar(_items, sender) => {
+ sender.send(Err(AvatarPublishError::Disconnected));
+ }
+ Command::DeletePEPNode { node: _, sender } => {
+ sender.send(Err(PEPError::IqResponse(IqRequestError::Write(
+ WriteError::Disconnected,
+ ))));
+ }
+ Command::GetPEPItem {
+ node: _,
+ sender,
+ jid: _,
+ id: _,
+ } => {
+ sender.send(Err(PEPError::IqResponse(IqRequestError::Write(
+ WriteError::Disconnected,
+ ))));
+ }
}
Ok(())
}
diff --git a/filamento/src/logic/online.rs b/filamento/src/logic/online.rs
index b069f59..767f923 100644
--- a/filamento/src/logic/online.rs
+++ b/filamento/src/logic/online.rs
@@ -1,35 +1,33 @@
+use std::{io::Cursor, time::Duration};
+
+use base64::{prelude::BASE64_STANDARD, Engine};
use chrono::Utc;
+use image::ImageReader;
use jid::JID;
use lampada::{Connected, WriteMessage, error::WriteError};
+use sha1::{Digest, Sha1};
use stanza::{
client::{
iq::{self, Iq, IqType, Query}, Stanza
- },
- xep_0030::{info, items},
- xep_0060::pubsub::{self, Pubsub},
- xep_0172::{self, Nick},
- xep_0203::Delay,
+ }, xep_0030::{info, items}, xep_0060::{self, owner, pubsub::{self, Pubsub}}, xep_0084, xep_0172::{self, Nick}, xep_0203::Delay
};
use tokio::sync::oneshot;
use tracing::{debug, error, info};
use uuid::Uuid;
use crate::{
- chat::{Body, Chat, Delivery, Message}, disco::{Info, Items}, error::{
- DatabaseError, DiscoError, Error, IqRequestError, MessageSendError, NickError, PublishError, RosterError, StatusError, SubscribeError
- }, pep, presence::{Online, Presence, PresenceType}, roster::{Contact, ContactUpdate}, Command, UpdateMessage
+ avatar, chat::{Body, Chat, Delivery, Message}, disco::{Info, Items}, error::{
+ AvatarPublishError, DatabaseError, DiscoError, Error, IqRequestError, MessageSendError, NickError, PEPError, RosterError, StatusError, SubscribeError
+ }, files::FileStore, pep, presence::{Online, Presence, PresenceType}, roster::{Contact, ContactUpdate}, user::User, Command, UpdateMessage
};
use super::{
- ClientLogic,
local_only::{
- handle_delete_chat, handle_delete_messaage, handle_get_chat, handle_get_chats,
- handle_get_chats_ordered, handle_get_chats_ordered_with_latest_messages,
- handle_get_messages, handle_get_user,
- },
+ handle_delete_chat, handle_delete_messaage, handle_get_chat, handle_get_chats, handle_get_chats_ordered, handle_get_chats_ordered_with_latest_messages, handle_get_chats_ordered_with_latest_messages_and_users, handle_get_messages, handle_get_messages_with_users, handle_get_user
+ }, ClientLogic
};
-pub async fn handle_online(logic: ClientLogic, command: Command, connection: Connected) {
+pub async fn handle_online<Fs: FileStore + Clone>(logic: ClientLogic<Fs>, command: Command<Fs>, connection: Connected) {
let result = handle_online_result(&logic, command, connection).await;
match result {
Ok(_) => {}
@@ -37,8 +35,8 @@ pub async fn handle_online(logic: ClientLogic, command: Command, connection: Con
}
}
-pub async fn handle_get_roster(
- logic: &ClientLogic,
+pub async fn handle_get_roster<Fs: FileStore + Clone>(
+ logic: &ClientLogic<Fs>,
connection: Connected,
) -> Result<Vec<Contact>, RosterError> {
let iq_id = Uuid::new_v4().to_string();
@@ -96,8 +94,73 @@ pub async fn handle_get_roster(
}
}
-pub async fn handle_add_contact(
- logic: &ClientLogic,
+// this can't query the client... otherwise there is a hold-up and the connection can't complete
+pub async fn handle_get_roster_with_users<Fs: FileStore + Clone>(
+ logic: &ClientLogic<Fs>,
+ connection: Connected,
+) -> Result<Vec<(Contact, User)>, RosterError> {
+ let iq_id = Uuid::new_v4().to_string();
+ let stanza = Stanza::Iq(Iq {
+ from: Some(connection.jid().clone()),
+ id: iq_id.to_string(),
+ to: None,
+ r#type: IqType::Get,
+ lang: None,
+ query: Some(iq::Query::Roster(stanza::roster::Query {
+ ver: None,
+ items: Vec::new(),
+ })),
+ errors: Vec::new(),
+ });
+ let response = logic
+ .pending()
+ .request(&connection, stanza, iq_id.clone())
+ .await?;
+ // TODO: timeout
+ match response {
+ Stanza::Iq(Iq {
+ from: _,
+ id,
+ to: _,
+ r#type,
+ lang: _,
+ query: Some(iq::Query::Roster(stanza::roster::Query { ver: _, items })),
+ errors: _,
+ }) if id == iq_id && r#type == IqType::Result => {
+ let contacts: Vec<Contact> = items.into_iter().map(|item| item.into()).collect();
+ if let Err(e) = logic.db().replace_cached_roster(contacts.clone()).await {
+ logic
+ .handle_error(Error::Roster(RosterError::Cache(e.into())))
+ .await;
+ };
+ let mut users = Vec::new();
+ for contact in &contacts {
+ let user = logic.db().read_user(contact.user_jid.clone()).await?;
+ users.push(user);
+ }
+ Ok(contacts.into_iter().zip(users).collect())
+ }
+ ref s @ Stanza::Iq(Iq {
+ from: _,
+ ref id,
+ to: _,
+ r#type,
+ lang: _,
+ query: _,
+ ref errors,
+ }) if *id == iq_id && r#type == IqType::Error => {
+ if let Some(error) = errors.first() {
+ Err(RosterError::StanzaError(error.clone()))
+ } else {
+ Err(RosterError::UnexpectedStanza(s.clone()))
+ }
+ }
+ s => Err(RosterError::UnexpectedStanza(s)),
+ }
+}
+
+pub async fn handle_add_contact<Fs: FileStore + Clone>(
+ logic: &ClientLogic<Fs>,
connection: Connected,
jid: JID,
) -> Result<(), RosterError> {
@@ -154,8 +217,8 @@ pub async fn handle_add_contact(
}
}
-pub async fn handle_buddy_request(
- logic: &ClientLogic,
+pub async fn handle_buddy_request<Fs: FileStore + Clone>(
+ logic: &ClientLogic<Fs>,
connection: Connected,
jid: JID,
) -> Result<(), SubscribeError> {
@@ -177,8 +240,8 @@ pub async fn handle_buddy_request(
Ok(())
}
-pub async fn handle_subscription_request(
- logic: &ClientLogic,
+pub async fn handle_subscription_request<Fs: FileStore + Clone>(
+ logic: &ClientLogic<Fs>,
connection: Connected,
jid: JID,
) -> Result<(), SubscribeError> {
@@ -195,8 +258,8 @@ pub async fn handle_subscription_request(
Ok(())
}
-pub async fn handle_accept_buddy_request(
- logic: &ClientLogic,
+pub async fn handle_accept_buddy_request<Fs: FileStore + Clone>(
+ logic: &ClientLogic<Fs>,
connection: Connected,
jid: JID,
) -> Result<(), SubscribeError> {
@@ -218,8 +281,8 @@ pub async fn handle_accept_buddy_request(
Ok(())
}
-pub async fn handle_accept_subscription_request(
- logic: &ClientLogic,
+pub async fn handle_accept_subscription_request<Fs: FileStore + Clone>(
+ logic: &ClientLogic<Fs>,
connection: Connected,
jid: JID,
) -> Result<(), SubscribeError> {
@@ -274,8 +337,8 @@ pub async fn handle_unfriend_contact(connection: Connected, jid: JID) -> Result<
Ok(())
}
-pub async fn handle_delete_contact(
- logic: &ClientLogic,
+pub async fn handle_delete_contact<Fs: FileStore + Clone>(
+ logic: &ClientLogic<Fs>,
connection: Connected,
jid: JID,
) -> Result<(), RosterError> {
@@ -333,8 +396,8 @@ pub async fn handle_delete_contact(
}
}
-pub async fn handle_update_contact(
- logic: &ClientLogic,
+pub async fn handle_update_contact<Fs: FileStore + Clone>(
+ logic: &ClientLogic<Fs>,
connection: Connected,
jid: JID,
contact_update: ContactUpdate,
@@ -398,8 +461,8 @@ pub async fn handle_update_contact(
}
}
-pub async fn handle_set_status(
- logic: &ClientLogic,
+pub async fn handle_set_status<Fs: FileStore + Clone>(
+ logic: &ClientLogic<Fs>,
connection: Connected,
online: Online,
) -> Result<(), StatusError> {
@@ -411,9 +474,18 @@ pub async fn handle_set_status(
Ok(())
}
-pub async fn handle_send_message(logic: &ClientLogic, connection: Connected, jid: JID, body: Body) {
+pub async fn handle_send_message<Fs: FileStore + Clone>(logic: &ClientLogic<Fs>, connection: Connected, jid: JID, body: Body) {
// upsert the chat and user the message will be delivered to. if there is a conflict, it will return whatever was there, otherwise it will return false by default.
- let have_chatted = logic.db().upsert_chat_and_user(&jid).await.unwrap_or(false);
+ // let have_chatted = logic.db().upsert_chat_and_user(&jid).await.unwrap_or(false);
+ let have_chatted = match logic.db().upsert_chat_and_user(&jid).await {
+ Ok(have_chatted) => {
+ have_chatted
+ },
+ Err(e) => {
+ error!("{}", e);
+ false
+ },
+ };
let nick;
let mark_chat_as_chatted;
@@ -460,12 +532,25 @@ pub async fn handle_send_message(logic: &ClientLogic, connection: Connected, jid
.await;
}
+ let from = match logic.db().read_user(logic.bare_jid.clone()).await {
+ Ok(u) => u,
+ Err(e) => {
+ error!("{}", e);
+ User {
+ jid: logic.bare_jid.clone(),
+ nick: None,
+ avatar: None,
+ }
+ },
+ };
+
// tell the client a message is being sent
logic
.update_sender()
.send(UpdateMessage::Message {
to: jid.as_bare(),
message,
+ from,
})
.await;
@@ -503,9 +588,11 @@ pub async fn handle_send_message(logic: &ClientLogic, connection: Connected, jid
.send(UpdateMessage::MessageDelivery {
id,
delivery: Delivery::Written,
+ chat: jid.clone(),
})
.await;
if mark_chat_as_chatted {
+ debug!("marking chat as chatted");
if let Err(e) = logic.db.mark_chat_as_chatted(jid).await {
logic
.handle_error(MessageSendError::MarkChatAsChatted(e.into()).into())
@@ -519,6 +606,7 @@ pub async fn handle_send_message(logic: &ClientLogic, connection: Connected, jid
.send(UpdateMessage::MessageDelivery {
id,
delivery: Delivery::Failed,
+ chat: jid,
})
.await;
logic.handle_error(MessageSendError::Write(e).into()).await;
@@ -540,8 +628,8 @@ pub async fn handle_send_presence(
Ok(())
}
-pub async fn handle_disco_info(
- logic: &ClientLogic,
+pub async fn handle_disco_info<Fs: FileStore + Clone>(
+ logic: &ClientLogic<Fs>,
connection: Connected,
jid: Option<JID>,
node: Option<String>,
@@ -611,8 +699,8 @@ pub async fn handle_disco_info(
}
}
-pub async fn handle_disco_items(
- logic: &ClientLogic,
+pub async fn handle_disco_items<Fs: FileStore + Clone>(
+ logic: &ClientLogic<Fs>,
connection: Connected,
jid: Option<JID>,
node: Option<String>,
@@ -680,20 +768,60 @@ pub async fn handle_disco_items(
}
}
-pub async fn handle_publish(
- logic: &ClientLogic,
+pub async fn handle_publish_pep_item<Fs: FileStore + Clone>(
+ logic: &ClientLogic<Fs>,
connection: Connected,
item: pep::Item,
node: String,
-) -> Result<(), PublishError> {
+) -> Result<(), PEPError> {
let id = Uuid::new_v4().to_string();
let publish = match item {
- pep::Item::Nick(n) => pubsub::Publish {
- node,
- items: vec![pubsub::Item {
- item: Some(pubsub::Content::Nick(Nick(n))),
- ..Default::default()
- }],
+ pep::Item::Nick(n) => {
+ if let Some(n) = n {
+ pubsub::Publish {
+ node,
+ items: vec![pubsub::Item {
+ item: Some(pubsub::Content::Nick(Nick(n))),
+ ..Default::default()
+ }],
+ }
+ } else {
+ pubsub::Publish {
+ node,
+ items: vec![pubsub::Item {
+ item: Some(pubsub::Content::Nick(Nick("".to_string()))),
+ ..Default::default()
+ }]
+ }
+ }
+ },
+ pep::Item::AvatarMetadata(metadata) => {
+ if let Some(metadata) = metadata {
+ pubsub::Publish { node, items: vec![pubsub::Item {
+ item: Some(pubsub::Content::AvatarMetadata(xep_0084::Metadata { info: vec![xep_0084::Info { bytes: metadata.bytes, height: None, id: metadata.hash.clone(), r#type: metadata.r#type, url: None, width: None }], pointers: Vec::new() })),
+ id: Some(metadata.hash),
+ ..Default::default()
+ }]}
+ } else {
+ pubsub::Publish { node, items: vec![pubsub::Item {
+ item: Some(pubsub::Content::AvatarMetadata(xep_0084::Metadata { info: Vec::new(), pointers: Vec::new() })),
+ ..Default::default()
+ }]}
+ }
+ },
+ pep::Item::AvatarData(data) => {
+ if let Some(data) = data {
+ pubsub::Publish { node, items: vec![pubsub::Item {
+ item: Some(pubsub::Content::AvatarData(xep_0084::Data(data.data_b64))),
+ id: Some(data.hash),
+ ..Default::default()
+ }] }
+ } else {
+ pubsub::Publish { node, items: vec![pubsub::Item {
+ item: Some(pubsub::Content::AvatarData(xep_0084::Data("".to_string()))),
+ ..Default::default()
+ }]}
+ }
},
};
let request = Iq {
@@ -726,43 +854,239 @@ pub async fn handle_publish(
if let Some(query) = query {
match query {
Query::Pubsub(_) => Ok(()),
- q => Err(PublishError::MismatchedQuery(q)),
+ q => Err(PEPError::MismatchedQuery(q)),
+ }
+ } else {
+ Err(PEPError::MissingQuery)
+ }
+ }
+ IqType::Error => {
+ Err(PEPError::StanzaErrors(errors))
+ }
+ _ => unreachable!(),
+ }
+ } else {
+ Err(PEPError::IncorrectEntity(
+ from.unwrap_or_else(|| connection.jid().as_bare()),
+ ))
+ }
+ }
+ s => Err(PEPError::UnexpectedStanza(s)),
+ }
+}
+
+pub async fn handle_get_pep_item<Fs: FileStore + Clone>(logic: &ClientLogic<Fs>, connection: Connected, jid: Option<JID>, node: String, id: String) -> Result<pep::Item, PEPError> {
+ let stanza_id = Uuid::new_v4().to_string();
+ let request = Iq {
+ from: Some(connection.jid().clone()),
+ id: stanza_id.clone(),
+ to: jid.clone(),
+ r#type: IqType::Get,
+ lang: None,
+ query: Some(Query::Pubsub(Pubsub::Items(pubsub::Items {
+ max_items: None,
+ node,
+ subid: None,
+ items: vec![pubsub::Item { id: Some(id.clone()), publisher: None, item: None }],
+ }))),
+ errors: Vec::new(),
+ };
+ match logic
+ .pending()
+ .request(&connection, Stanza::Iq(request), stanza_id)
+ .await? {
+
+ Stanza::Iq(Iq {
+ from,
+ r#type,
+ query,
+ errors,
+ ..
+ // TODO: maybe abstract a bunch of these different errors related to iqs into an iq error thing? as in like call iq.result(), get the query from inside, error otherwise.
+ }) if r#type == IqType::Result || r#type == IqType::Error => {
+ if from == jid || {
+ if jid == None {
+ from == Some(connection.jid().as_bare())
+ } else {
+ false
+ }
+ } {
+ match r#type {
+ IqType::Result => {
+ if let Some(query) = query {
+ match query {
+ Query::Pubsub(Pubsub::Items(mut items)) => {
+ if let Some(item) = items.items.pop() {
+ if item.id == Some(id.clone()) {
+ match item.item.ok_or(PEPError::MissingItem)? {
+ pubsub::Content::Nick(nick) => {
+ if nick.0.is_empty() {
+ Ok(pep::Item::Nick(None))
+ } else {
+ Ok(pep::Item::Nick(Some(nick.0)))
+
+ }
+ },
+ pubsub::Content::AvatarData(data) => Ok(pep::Item::AvatarData(Some(avatar::Data { hash: id, data_b64: data.0 }))),
+ pubsub::Content::AvatarMetadata(metadata) => Ok(pep::Item::AvatarMetadata(metadata.info.into_iter().find(|info| info.url.is_none()).map(|info| info.into()))),
+ pubsub::Content::Unknown(_element) => Err(PEPError::UnsupportedItem),
+ }
+ } else {
+ Err(PEPError::IncorrectItemID(id, item.id.unwrap_or_else(|| "missing id".to_string())))
+ }
+ } else {
+ Err(PEPError::MissingItem)
+ }
+ },
+ q => Err(PEPError::MismatchedQuery(q)),
}
} else {
- Err(PublishError::MissingQuery)
+ Err(PEPError::MissingQuery)
}
}
IqType::Error => {
- Err(PublishError::StanzaErrors(errors))
+ Err(PEPError::StanzaErrors(errors))
}
_ => unreachable!(),
}
} else {
- Err(PublishError::IncorrectEntity(
+ // TODO: include expected entity
+ Err(PEPError::IncorrectEntity(
from.unwrap_or_else(|| connection.jid().as_bare()),
))
}
}
- s => Err(PublishError::UnexpectedStanza(s)),
+ s => Err(PEPError::UnexpectedStanza(s)),
}
}
-pub async fn handle_change_nick(logic: &ClientLogic, nick: String) -> Result<(), NickError> {
+pub async fn handle_change_nick<Fs: FileStore + Clone>(logic: &ClientLogic<Fs>, nick: Option<String>) -> Result<(), NickError> {
logic.client().publish(pep::Item::Nick(nick), xep_0172::XMLNS.to_string()).await?;
Ok(())
}
+pub async fn handle_change_avatar<Fs: FileStore + Clone>(logic: &ClientLogic<Fs>, img_data: Option<Vec<u8>>) -> Result<(), AvatarPublishError<Fs>> {
+ match img_data {
+ // set avatar
+ Some(data) => {
+ // load the image data and guess the format
+ let image = ImageReader::new(Cursor::new(data)).with_guessed_format()?.decode()?;
+
+ // convert the image to png;
+ let mut data_png = Vec::new();
+ let image = image.resize(192, 192, image::imageops::FilterType::Nearest);
+ image.write_to(&mut Cursor::new(&mut data_png), image::ImageFormat::Jpeg)?;
+
+ // calculate the length of the data in bytes.
+ let bytes = data_png.len().try_into()?;
+
+ // calculate sha1 hash of the data
+ let mut sha1 = Sha1::new();
+ sha1.update(&data_png);
+ let sha1_result = sha1.finalize();
+ let hash = hex::encode(sha1_result);
+
+ // encode the image data as base64
+ let data_b64 = BASE64_STANDARD.encode(data_png.clone());
+
+ // publish the data to the data node
+ logic.client().publish(pep::Item::AvatarData(Some(avatar::Data { hash: hash.clone(), data_b64 })), "urn:xmpp:avatar:data".to_string()).await?;
+
+ // publish the metadata to the metadata node
+ logic.client().publish(pep::Item::AvatarMetadata(Some(avatar::Metadata { bytes, hash: hash.clone(), r#type: "image/jpeg".to_string() })), "urn:xmpp:avatar:metadata".to_string()).await?;
+
+ // if everything went well, save the data to the disk.
+
+ if !logic.file_store().is_stored(&hash).await.map_err(|err| AvatarPublishError::FileStore(err))? {
+ logic.file_store().store(&hash, &data_png).await.map_err(|err| AvatarPublishError::FileStore(err))?
+ }
+ // when the client receives the updated metadata notification from the pep node, it will already have it saved on the disk so will not require a retrieval.
+ // TODO: should the node be purged?
+
+ Ok(())
+ },
+ // remove avatar
+ None => {
+ logic.client().delete_pep_node("urn:xmpp:avatar:data".to_string()).await?;
+ logic.client().publish(pep::Item::AvatarMetadata(None), "urn:xmpp:avatar:metadata".to_string(), ).await?;
+ Ok(())
+ },
+ }
+}
+
+pub async fn handle_delete_pep_node<Fs: FileStore + Clone>(
+ logic: &ClientLogic<Fs>,
+ connection: Connected,
+ node: String,
+) -> Result<(), PEPError> {
+ let id = Uuid::new_v4().to_string();
+ let request = Iq {
+ from: Some(connection.jid().clone()),
+ id: id.clone(),
+ to: None,
+ r#type: IqType::Set,
+ lang: None,
+ query: Some(Query::PubsubOwner(xep_0060::owner::Pubsub::Delete(owner::Delete{ node, redirect: None }))),
+ errors: Vec::new(),
+ };
+ match logic
+ .pending()
+ .request(&connection, Stanza::Iq(request), id)
+ .await? {
+
+ Stanza::Iq(Iq {
+ from,
+ r#type,
+ query,
+ errors,
+ ..
+ // TODO: maybe abstract a bunch of these different errors related to iqs into an iq error thing? as in like call iq.result(), get the query from inside, error otherwise.
+ }) if r#type == IqType::Result || r#type == IqType::Error => {
+ if from == None ||
+ from == Some(connection.jid().as_bare())
+ {
+ match r#type {
+ IqType::Result => {
+ if let Some(query) = query {
+ match query {
+ Query::PubsubOwner(_) => Ok(()),
+ q => Err(PEPError::MismatchedQuery(q)),
+ }
+ } else {
+ // Err(PEPError::MissingQuery)
+ Ok(())
+ }
+ }
+ IqType::Error => {
+ Err(PEPError::StanzaErrors(errors))
+ }
+ _ => unreachable!(),
+ }
+ } else {
+ Err(PEPError::IncorrectEntity(
+ from.unwrap_or_else(|| connection.jid().as_bare()),
+ ))
+ }
+ }
+ s => Err(PEPError::UnexpectedStanza(s)),
+ }
+}
+
// TODO: could probably macro-ise?
-pub async fn handle_online_result(
- logic: &ClientLogic,
- command: Command,
+pub async fn handle_online_result<Fs: FileStore + Clone>(
+ logic: &ClientLogic<Fs>,
+ command: Command<Fs>,
connection: Connected,
-) -> Result<(), Error> {
+) -> Result<(), Error<Fs>> {
match command {
Command::GetRoster(result_sender) => {
let roster = handle_get_roster(logic, connection).await;
let _ = result_sender.send(roster);
}
+ Command::GetRosterWithUsers(result_sender) => {
+ let roster = handle_get_roster_with_users(logic, connection).await;
+ let _ = result_sender.send(roster);
+ }
Command::GetChats(sender) => {
let chats = handle_get_chats(logic).await;
let _ = sender.send(chats);
@@ -775,6 +1099,10 @@ pub async fn handle_online_result(
let chats = handle_get_chats_ordered_with_latest_messages(logic).await;
let _ = sender.send(chats);
}
+ Command::GetChatsOrderedWithLatestMessagesAndUsers(sender) => {
+ let chats = handle_get_chats_ordered_with_latest_messages_and_users(logic).await;
+ sender.send(chats);
+ }
Command::GetChat(jid, sender) => {
let chat = handle_get_chat(logic, jid).await;
let _ = sender.send(chat);
@@ -783,6 +1111,10 @@ pub async fn handle_online_result(
let messages = handle_get_messages(logic, jid).await;
let _ = sender.send(messages);
}
+ Command::GetMessagesWithUsers(jid, sender) => {
+ let messages = handle_get_messages_with_users(logic, jid).await;
+ sender.send(messages);
+ }
Command::DeleteChat(jid, sender) => {
let result = handle_delete_chat(logic, jid).await;
let _ = sender.send(result);
@@ -854,14 +1186,26 @@ pub async fn handle_online_result(
let result = handle_disco_items(logic, connection, jid, node).await;
let _ = sender.send(result);
}
- Command::Publish { item, node, sender } => {
- let result = handle_publish(logic, connection, item, node).await;
+ Command::PublishPEPItem { item, node, sender } => {
+ let result = handle_publish_pep_item(logic, connection, item, node).await;
let _ = sender.send(result);
}
Command::ChangeNick(nick, sender) => {
let result = handle_change_nick(logic, nick).await;
let _ = sender.send(result);
}
+ Command::ChangeAvatar(img_data, sender) => {
+ let result = handle_change_avatar(logic, img_data).await;
+ let _ = sender.send(result);
+ },
+ Command::DeletePEPNode { node, sender } => {
+ let result = handle_delete_pep_node(logic, connection, node).await;
+ let _ = sender.send(result);
+ },
+ Command::GetPEPItem { node, sender, jid, id } => {
+ let result = handle_get_pep_item(logic, connection, jid, node, id).await;
+ let _ = sender.send(result);
+ },
}
Ok(())
}
diff --git a/filamento/src/logic/process_stanza.rs b/filamento/src/logic/process_stanza.rs
index 11d7588..cdaff97 100644
--- a/filamento/src/logic/process_stanza.rs
+++ b/filamento/src/logic/process_stanza.rs
@@ -1,7 +1,9 @@
use std::str::FromStr;
+use base64::{Engine, prelude::BASE64_STANDARD};
use chrono::Utc;
use lampada::{Connected, SupervisorSender};
+use sha1::{Digest, Sha1};
use stanza::{
client::{
Stanza,
@@ -17,14 +19,23 @@ use uuid::Uuid;
use crate::{
UpdateMessage, caps,
chat::{Body, Message},
- error::{DatabaseError, Error, IqError, MessageRecvError, PresenceError, RosterError},
+ error::{
+ AvatarUpdateError, DatabaseError, Error, IqError, IqProcessError, MessageRecvError,
+ PresenceError, RosterError,
+ },
+ files::FileStore,
presence::{Offline, Online, Presence, PresenceType, Show},
roster::Contact,
+ user::User,
};
use super::ClientLogic;
-pub async fn handle_stanza(logic: ClientLogic, stanza: Stanza, connection: Connected) {
+pub async fn handle_stanza<Fs: FileStore + Clone>(
+ logic: ClientLogic<Fs>,
+ stanza: Stanza,
+ connection: Connected,
+) {
let result = process_stanza(logic.clone(), stanza, connection).await;
match result {
Ok(u) => match u {
@@ -38,10 +49,10 @@ pub async fn handle_stanza(logic: ClientLogic, stanza: Stanza, connection: Conne
}
}
-pub async fn recv_message(
- logic: ClientLogic,
+pub async fn recv_message<Fs: FileStore + Clone>(
+ logic: ClientLogic<Fs>,
stanza_message: stanza::client::message::Message,
-) -> Result<Option<UpdateMessage>, MessageRecvError> {
+) -> Result<Option<UpdateMessage>, MessageRecvError<Fs>> {
if let Some(from) = stanza_message.from {
// TODO: don't ignore delay from. xep says SHOULD send error if incorrect.
let timestamp = stanza_message
@@ -50,6 +61,7 @@ pub async fn recv_message(
.unwrap_or_else(|| Utc::now());
// TODO: group chat messages
+ // body MUST be before user changes in order to avoid race condition where you e.g. get a nick update before the user is in the client state.
// if there is a body, should create chat message
if let Some(body) = stanza_message.body {
let message = Message {
@@ -67,92 +79,391 @@ pub async fn recv_message(
};
// save the message to the database
- logic.db().upsert_chat_and_user(&from).await?;
- if let Err(e) = logic
- .db()
- .create_message_with_user_resource(message.clone(), from.clone(), from.clone())
- .await
- {
- logic
- .handle_error(Error::MessageRecv(MessageRecvError::MessageHistory(e)))
- .await;
- }
+ match logic.db().upsert_chat_and_user(&from).await {
+ Ok(_) => {
+ if let Err(e) = logic
+ .db()
+ .create_message_with_user_resource(
+ message.clone(),
+ from.clone(),
+ from.clone(),
+ )
+ .await
+ {
+ logic
+ .handle_error(Error::MessageRecv(MessageRecvError::MessageHistory(e)))
+ .await;
+ error!("failed to upsert chat and user")
+ }
+ }
+ Err(e) => {
+ logic
+ .handle_error(Error::MessageRecv(MessageRecvError::MessageHistory(e)))
+ .await;
+ error!("failed to upsert chat and user")
+ }
+ };
+
+ let from_user = match logic.db().read_user(from.as_bare()).await {
+ Ok(u) => u,
+ Err(e) => {
+ error!("{}", e);
+ User {
+ jid: from.as_bare(),
+ nick: None,
+ avatar: None,
+ }
+ }
+ };
// update the client with the new message
logic
.update_sender()
.send(UpdateMessage::Message {
to: from.as_bare(),
+ from: from_user,
message,
})
.await;
}
if let Some(nick) = stanza_message.nick {
- if let Err(e) = logic
- .db()
- .upsert_user_nick(from.as_bare(), nick.0.clone())
- .await
- {
- logic
- .handle_error(Error::MessageRecv(MessageRecvError::NickUpdate(e)))
- .await;
+ let nick = nick.0;
+ if nick.is_empty() {
+ match logic.db().delete_user_nick(from.as_bare()).await {
+ Ok(changed) => {
+ if changed {
+ logic
+ .update_sender()
+ .send(UpdateMessage::NickChanged {
+ jid: from.as_bare(),
+ nick: None,
+ })
+ .await;
+ }
+ }
+ Err(e) => {
+ logic
+ .handle_error(Error::MessageRecv(MessageRecvError::NickUpdate(e)))
+ .await;
+ // if failed, send user update anyway
+ logic
+ .update_sender()
+ .send(UpdateMessage::NickChanged {
+ jid: from.as_bare(),
+ nick: None,
+ })
+ .await;
+ }
+ }
+ } else {
+ match logic
+ .db()
+ .upsert_user_nick(from.as_bare(), nick.clone())
+ .await
+ {
+ Ok(changed) => {
+ if changed {
+ logic
+ .update_sender()
+ .send(UpdateMessage::NickChanged {
+ jid: from.as_bare(),
+ nick: Some(nick),
+ })
+ .await;
+ }
+ }
+ Err(e) => {
+ logic
+ .handle_error(Error::MessageRecv(MessageRecvError::NickUpdate(e)))
+ .await;
+ // if failed, send user update anyway
+ logic
+ .update_sender()
+ .send(UpdateMessage::NickChanged {
+ jid: from.as_bare(),
+ nick: Some(nick),
+ })
+ .await;
+ }
+ }
}
-
- logic
- .update_sender()
- .send(UpdateMessage::NickChanged {
- jid: from.as_bare(),
- nick: nick.0,
- })
- .await;
}
if let Some(event) = stanza_message.event {
match event {
- Event::Items(items) => match items.node.as_str() {
- "http://jabber.org/protocol/nick" => match items.items {
- ItemsType::Item(items) => {
- if let Some(item) = items.first() {
- match &item.item {
- Some(c) => match c {
- Content::Nick(nick) => {
- if let Err(e) = logic
- .db()
- .upsert_user_nick(from.as_bare(), nick.0.clone())
- .await
- {
- logic
- .handle_error(Error::MessageRecv(
- MessageRecvError::NickUpdate(e),
- ))
- .await;
+ Event::Items(items) => {
+ match items.node.as_str() {
+ "http://jabber.org/protocol/nick" => match items.items {
+ ItemsType::Item(items) => {
+ if let Some(item) = items.first() {
+ match &item.item {
+ Some(c) => match c {
+ Content::Nick(nick) => {
+ let nick = nick.0.clone();
+ if nick.is_empty() {
+ match logic
+ .db()
+ .delete_user_nick(from.as_bare())
+ .await
+ {
+ Ok(changed) => {
+ if changed {
+ logic
+ .update_sender()
+ .send(UpdateMessage::NickChanged {
+ jid: from.as_bare(),
+ nick: None,
+ })
+ .await;
+ }
+ }
+ Err(e) => {
+ logic
+ .handle_error(Error::MessageRecv(
+ MessageRecvError::NickUpdate(e),
+ ))
+ .await;
+ // if failed, send user update anyway
+ logic
+ .update_sender()
+ .send(UpdateMessage::NickChanged {
+ jid: from.as_bare(),
+ nick: None,
+ })
+ .await;
+ }
+ }
+ } else {
+ match logic
+ .db()
+ .upsert_user_nick(
+ from.as_bare(),
+ nick.clone(),
+ )
+ .await
+ {
+ Ok(changed) => {
+ if changed {
+ logic
+ .update_sender()
+ .send(UpdateMessage::NickChanged {
+ jid: from.as_bare(),
+ nick: Some(nick),
+ })
+ .await;
+ }
+ }
+ Err(e) => {
+ logic
+ .handle_error(Error::MessageRecv(
+ MessageRecvError::NickUpdate(e),
+ ))
+ .await;
+ // if failed, send user update anyway
+ logic
+ .update_sender()
+ .send(UpdateMessage::NickChanged {
+ jid: from.as_bare(),
+ nick: Some(nick),
+ })
+ .await;
+ }
+ }
+ }
}
+ _ => {}
+ },
+ None => {}
+ }
+ }
+ }
+ _ => {}
+ },
+ "urn:xmpp:avatar:metadata" => {
+ match items.items {
+ ItemsType::Item(items) => {
+ if let Some(item) = items.first() {
+ debug!("found item");
+ match &item.item {
+ Some(Content::AvatarMetadata(metadata)) => {
+ debug!("found metadata");
+ // check if user avatar has been deleted
+ if let Some(metadata) = metadata
+ .info
+ .iter()
+ .find(|info| info.url.is_none())
+ {
+ debug!("checking if user avatar has changed");
+ // check if user avatar has changed
+ match logic
+ .db()
+ .upsert_user_avatar(
+ from.as_bare(),
+ metadata.id.clone(),
+ )
+ .await
+ {
+ Ok((changed, old_avatar)) => {
+ if changed {
+ if let Some(old_avatar) = old_avatar
+ {
+ if let Err(e) = logic
+ .file_store()
+ .delete(&old_avatar)
+ .await.map_err(|err| AvatarUpdateError::FileStore(err)) {
+ logic.handle_error(MessageRecvError::AvatarUpdate(e).into()).await;
+ }
+ }
+ }
- logic
- .update_sender()
- .send(UpdateMessage::NickChanged {
- jid: from.as_bare(),
- nick: nick.0.clone(),
- })
- .await;
+ match logic
+ .file_store()
+ .is_stored(&metadata.id)
+ .await
+ .map_err(|err| {
+ AvatarUpdateError::<Fs>::FileStore(
+ err,
+ )
+ }) {
+ Ok(false) => {
+ // get data
+ let pep_item = logic.client().get_pep_item(Some(from.as_bare()), "urn:xmpp:avatar:data".to_string(), metadata.id.clone()).await.map_err(|err| Into::<AvatarUpdateError<Fs>>::into(err))?;
+ match pep_item {
+ crate::pep::Item::AvatarData(data) => {
+ let data = data.map(|data| data.data_b64).unwrap_or_default().replace("\n", "");
+ // TODO: these should all be in a separate avatarupdate function
+ debug!("got avatar data");
+ match BASE64_STANDARD.decode(data) {
+ Ok(data) => {
+ let mut hasher = Sha1::new();
+ hasher.update(&data);
+ let received_data_hash = hex::encode(hasher.finalize());
+ debug!("received_data_hash: {}, metadata_id: {}", received_data_hash, metadata.id);
+ if received_data_hash.to_lowercase() == metadata.id.to_lowercase() {
+ if let Err(e) = logic.file_store().store(&received_data_hash, &data).await {
+ logic.handle_error(Error::MessageRecv(MessageRecvError::AvatarUpdate(AvatarUpdateError::FileStore(e)))).await;
+ }
+ if changed {
+ logic
+ .update_sender()
+ .send(
+ UpdateMessage::AvatarChanged {
+ jid: from.as_bare(),
+ id: Some(
+ metadata.id.clone(),
+ ),
+ },
+ )
+ .await;
+ }
+ }
+ },
+ Err(e) => {
+ logic.handle_error(Error::MessageRecv(MessageRecvError::AvatarUpdate(AvatarUpdateError::Base64(e)))).await;
+ },
+ }
+ },
+ _ => {
+ logic.handle_error(Error::MessageRecv(MessageRecvError::AvatarUpdate(AvatarUpdateError::MissingData))).await;
+ }
+ }
+ }
+ Ok(true) => {
+ // just send the update
+ if changed {
+ logic
+ .update_sender()
+ .send(
+ UpdateMessage::AvatarChanged {
+ jid: from.as_bare(),
+ id: Some(
+ metadata.id.clone(),
+ ),
+ },
+ )
+ .await;
+ }
+ }
+ Err(e) => {
+ logic.handle_error(Error::MessageRecv(MessageRecvError::AvatarUpdate(e))).await;
+ }
+ }
+ }
+ Err(e) => {
+ logic
+ .handle_error(Error::MessageRecv(
+ MessageRecvError::AvatarUpdate(
+ AvatarUpdateError::Database(
+ e,
+ ),
+ ),
+ ))
+ .await;
+ }
+ }
+ } else {
+ // delete avatar
+ match logic
+ .db()
+ .delete_user_avatar(from.as_bare())
+ .await
+ {
+ Ok((changed, old_avatar)) => {
+ if changed {
+ if let Some(old_avatar) = old_avatar
+ {
+ if let Err(e) = logic
+ .file_store()
+ .delete(&old_avatar)
+ .await.map_err(|err| AvatarUpdateError::FileStore(err)) {
+ logic.handle_error(MessageRecvError::AvatarUpdate(e).into()).await;
+ }
+ }
+ logic
+ .update_sender()
+ .send(
+ UpdateMessage::AvatarChanged {
+ jid: from.as_bare(),
+ id: None,
+ },
+ )
+ .await;
+ }
+ }
+ Err(e) => {
+ logic
+ .handle_error(Error::MessageRecv(
+ MessageRecvError::AvatarUpdate(
+ AvatarUpdateError::Database(
+ e,
+ ),
+ ),
+ ))
+ .await;
+ }
+ }
+ }
+ // check if the new file is in the file store
+ // if not, retrieve from server and save in the file store (remember to check if the hash matches)
+ // send the avatar update
+ }
+ _ => {}
}
- Content::Unknown(element) => {}
- },
- None => {}
+ }
}
+ _ => {}
}
}
- ItemsType::Retract(retracts) => {}
- },
- _ => {}
- },
+ _ => {}
+ }
+ }
// Event::Collection(collection) => todo!(),
// Event::Configuration(configuration) => todo!(),
// Event::Delete(delete) => todo!(),
// Event::Purge(purge) => todo!(),
// Event::Subscription(subscription) => todo!(),
- _ => {}
+ _ => {} // TODO: catch these catch-alls in some way
}
}
@@ -240,15 +551,15 @@ pub async fn recv_presence(
}
}
-pub async fn recv_iq(
- logic: ClientLogic,
+pub async fn recv_iq<Fs: FileStore + Clone>(
+ logic: ClientLogic<Fs>,
connection: Connected,
iq: Iq,
-) -> Result<Option<UpdateMessage>, IqError> {
+) -> Result<Option<UpdateMessage>, IqProcessError> {
if let Some(to) = &iq.to {
if *to == *connection.jid() {
} else {
- return Err(IqError::IncorrectAddressee(to.clone()));
+ return Err(IqProcessError::Iq(IqError::IncorrectAddressee(to.clone())));
}
}
match iq.r#type {
@@ -259,7 +570,11 @@ pub async fn recv_iq(
.unwrap_or_else(|| connection.server().clone());
let id = iq.id.clone();
debug!("received iq result with id `{}` from {}", id, from);
- logic.pending().respond(Stanza::Iq(iq), id).await?;
+ logic
+ .pending()
+ .respond(Stanza::Iq(iq), id)
+ .await
+ .map_err(|e| Into::<IqError>::into(e))?;
Ok(None)
}
stanza::client::iq::IqType::Get => {
@@ -299,7 +614,11 @@ pub async fn recv_iq(
errors: vec![StanzaError::ItemNotFound.into()],
};
// TODO: log error
- connection.write_handle().write(Stanza::Iq(iq)).await?;
+ connection
+ .write_handle()
+ .write(Stanza::Iq(iq))
+ .await
+ .map_err(|e| Into::<IqError>::into(e))?;
info!("replied to disco#info request from {}", from);
return Ok(None);
}
@@ -315,7 +634,11 @@ pub async fn recv_iq(
errors: vec![StanzaError::ItemNotFound.into()],
};
// TODO: log error
- connection.write_handle().write(Stanza::Iq(iq)).await?;
+ connection
+ .write_handle()
+ .write(Stanza::Iq(iq))
+ .await
+ .map_err(|e| Into::<IqError>::into(e))?;
info!("replied to disco#info request from {}", from);
return Ok(None);
}
@@ -330,7 +653,11 @@ pub async fn recv_iq(
query: Some(iq::Query::DiscoInfo(disco)),
errors: vec![],
};
- connection.write_handle().write(Stanza::Iq(iq)).await?;
+ connection
+ .write_handle()
+ .write(Stanza::Iq(iq))
+ .await
+ .map_err(|e| Into::<IqError>::into(e))?;
info!("replied to disco#info request from {}", from);
Ok(None)
}
@@ -345,7 +672,11 @@ pub async fn recv_iq(
query: None,
errors: vec![StanzaError::ServiceUnavailable.into()],
};
- connection.write_handle().write(Stanza::Iq(iq)).await?;
+ connection
+ .write_handle()
+ .write(Stanza::Iq(iq))
+ .await
+ .map_err(|e| Into::<IqError>::into(e))?;
warn!("replied to unsupported iq get from {}", from);
Ok(None)
} // stanza::client::iq::Query::Bind(bind) => todo!(),
@@ -365,7 +696,11 @@ pub async fn recv_iq(
query: None,
errors: vec![StanzaError::BadRequest.into()],
};
- connection.write_handle().write(Stanza::Iq(iq)).await?;
+ connection
+ .write_handle()
+ .write(Stanza::Iq(iq))
+ .await
+ .map_err(|e| Into::<IqError>::into(e))?;
info!("replied to malformed iq query from {}", from);
Ok(None)
}
@@ -416,7 +751,12 @@ pub async fn recv_iq(
.handle_error(RosterError::PushReply(e.into()).into())
.await;
}
- Ok(Some(UpdateMessage::RosterUpdate(contact)))
+ let user = logic
+ .db()
+ .read_user(contact.user_jid.clone())
+ .await
+ .map_err(|e| Into::<RosterError>::into(e))?;
+ Ok(Some(UpdateMessage::RosterUpdate(contact, user)))
}
}
} else {
@@ -430,7 +770,11 @@ pub async fn recv_iq(
query: None,
errors: vec![StanzaError::NotAcceptable.into()],
};
- connection.write_handle().write(Stanza::Iq(iq)).await?;
+ connection
+ .write_handle()
+ .write(Stanza::Iq(iq))
+ .await
+ .map_err(|e| Into::<IqError>::into(e))?;
Ok(None)
}
}
@@ -446,7 +790,11 @@ pub async fn recv_iq(
query: None,
errors: vec![StanzaError::ServiceUnavailable.into()],
};
- connection.write_handle().write(Stanza::Iq(iq)).await?;
+ connection
+ .write_handle()
+ .write(Stanza::Iq(iq))
+ .await
+ .map_err(|e| Into::<IqError>::into(e))?;
warn!("replied to unsupported iq set from {}", from);
Ok(None)
}
@@ -462,18 +810,22 @@ pub async fn recv_iq(
query: None,
errors: vec![StanzaError::NotAcceptable.into()],
};
- connection.write_handle().write(Stanza::Iq(iq)).await?;
+ connection
+ .write_handle()
+ .write(Stanza::Iq(iq))
+ .await
+ .map_err(|e| Into::<IqError>::into(e))?;
Ok(None)
}
}
}
}
-pub async fn process_stanza(
- logic: ClientLogic,
+pub async fn process_stanza<Fs: FileStore + Clone>(
+ logic: ClientLogic<Fs>,
stanza: Stanza,
connection: Connected,
-) -> Result<Option<UpdateMessage>, Error> {
+) -> Result<Option<UpdateMessage>, Error<Fs>> {
let update = match stanza {
Stanza::Message(stanza_message) => Ok(recv_message(logic, stanza_message).await?),
Stanza::Presence(presence) => Ok(recv_presence(presence).await?),
diff --git a/filamento/src/pep.rs b/filamento/src/pep.rs
index c71d843..3cd243f 100644
--- a/filamento/src/pep.rs
+++ b/filamento/src/pep.rs
@@ -1,17 +1,8 @@
-// in commandmessage
-// pub struct Publish {
-// item: Item,
-// node: Option<String>,
-// // no need for node, as item has the node
-// }
-//
-// in updatemessage
-// pub struct Event {
-// from: JID,
-// item: Item,
-// }
+use crate::avatar::{Data as AvatarData, Metadata as AvatarMetadata};
#[derive(Clone, Debug)]
pub enum Item {
- Nick(String),
+ Nick(Option<String>),
+ AvatarMetadata(Option<AvatarMetadata>),
+ AvatarData(Option<AvatarData>),
}
diff --git a/filamento/src/user.rs b/filamento/src/user.rs
index 85471d5..8669fc3 100644
--- a/filamento/src/user.rs
+++ b/filamento/src/user.rs
@@ -4,5 +4,6 @@ use jid::JID;
pub struct User {
pub jid: JID,
pub nick: Option<String>,
- pub cached_status_message: Option<String>,
+ pub avatar: Option<String>,
+ // pub cached_status_message: Option<String>,
}