aboutsummaryrefslogtreecommitdiffstats
path: root/filamento
diff options
context:
space:
mode:
authorLibravatar cel 🌸 <cel@bunny.garden>2025-03-26 14:29:40 +0000
committerLibravatar cel 🌸 <cel@bunny.garden>2025-03-26 14:29:40 +0000
commit2211f324782cdc617b4b5ecd071178e372539fe4 (patch)
treea5ea5ce11d748424447dee23173d3cb8aec648ea /filamento
parent2f8671978e18c1e1e7834056ae674f32fbde3868 (diff)
downloadluz-2211f324782cdc617b4b5ecd071178e372539fe4.tar.gz
luz-2211f324782cdc617b4b5ecd071178e372539fe4.tar.bz2
luz-2211f324782cdc617b4b5ecd071178e372539fe4.zip
refactor: rename crates and move client logic to separate crate `filament`
Diffstat (limited to 'filamento')
-rw-r--r--filamento/Cargo.toml17
-rw-r--r--filamento/README.md3
-rw-r--r--filamento/filamento.dbbin0 -> 90112 bytes
-rw-r--r--filamento/migrations/20240113011930_luz.sql119
-rw-r--r--filamento/src/chat.rs57
-rw-r--r--filamento/src/db.rs521
-rw-r--r--filamento/src/error.rs142
-rw-r--r--filamento/src/lib.rs1598
-rw-r--r--filamento/src/presence.rs151
-rw-r--r--filamento/src/roster.rs127
-rw-r--r--filamento/src/user.rs7
11 files changed, 2742 insertions, 0 deletions
diff --git a/filamento/Cargo.toml b/filamento/Cargo.toml
new file mode 100644
index 0000000..e25024a
--- /dev/null
+++ b/filamento/Cargo.toml
@@ -0,0 +1,17 @@
+[package]
+name = "filamento"
+version = "0.1.0"
+edition = "2024"
+
+[dependencies]
+futures = "0.3.31"
+lampada = { version = "0.1.0", path = "../lampada" }
+tokio = "1.42.0"
+thiserror = "2.0.11"
+stanza = { version = "0.1.0", path = "../stanza", features = ["xep_0203"] }
+sqlx = { version = "0.8.3", features = ["sqlite", "runtime-tokio", "uuid", "chrono"] }
+# TODO: re-export jid?
+jid = { version = "0.1.0", path = "../jid", features = ["sqlx"] }
+uuid = { version = "1.13.1", features = ["v4"] }
+tracing = "0.1.41"
+chrono = "0.4.40"
diff --git a/filamento/README.md b/filamento/README.md
new file mode 100644
index 0000000..57b4135
--- /dev/null
+++ b/filamento/README.md
@@ -0,0 +1,3 @@
+# filament
+
+a high-level xmpp chat client using luz
diff --git a/filamento/filamento.db b/filamento/filamento.db
new file mode 100644
index 0000000..5c3c720
--- /dev/null
+++ b/filamento/filamento.db
Binary files differ
diff --git a/filamento/migrations/20240113011930_luz.sql b/filamento/migrations/20240113011930_luz.sql
new file mode 100644
index 0000000..148598b
--- /dev/null
+++ b/filamento/migrations/20240113011930_luz.sql
@@ -0,0 +1,119 @@
+PRAGMA foreign_keys = on;
+
+-- a user jid will never change, only a chat user will change
+-- TODO: avatar, nick, etc.
+create table users(
+ -- TODO: enforce bare jid
+ jid text primary key not null,
+ -- can receive presence status from non-contacts
+ cached_status_message text
+ -- TODO: last_seen
+);
+
+-- -- links to messages, jabber users, stores jid history, etc.
+-- create table identities(
+-- id text primary key not null
+-- );
+
+-- create table identities_users(
+-- id text not null,
+-- jid text not null,
+-- -- whichever has the newest timestamp is the active one.
+-- -- what to do when somebody moves, but then the old jid is used again without having explicitly moved back? create new identity to assign ownership to?
+-- -- merging of identities?
+-- activated_timestamp not null,
+-- foreign key(id) references identities(id),
+-- foreign key(jid) references users(jid),
+-- primary key(activated timestamp, id, jid)
+-- );
+
+create table resources(
+ bare_jid text not null,
+ resource text not null,
+ foreign key(bare_jid) references users(jid),
+ primary key(bare_jid, resource)
+);
+
+-- enum for subscription state
+create table subscription(
+ state text primary key not null
+);
+
+insert into subscription ( state ) values ('none'), ('pending-out'), ('pending-in'), ('pending-in-pending-out'), ('only-out'), ('only-in'), ('out-pending-in'), ('in-pending-out'), ('buddy');
+
+-- a roster contains users, with client-set nickname
+CREATE TABLE roster(
+ user_jid text primary key not null,
+ name TEXT,
+ subscription text not null,
+ foreign key(subscription) references subscription(state),
+ foreign key(user_jid) references users(jid)
+);
+
+create table groups(
+ group_name text primary key not null
+);
+
+create table groups_roster(
+ group_name text not null,
+ contact_jid text not null,
+ foreign key(group_name) references groups(group_name),
+ foreign key(contact_jid) references roster(user_jid) on delete cascade,
+ primary key(group_name, contact_jid)
+);
+
+-- chat includes reference to user jid chat is with
+-- specifically for dms, groups should be different
+-- can send chat message to user (creating a new chat if not already exists)
+create table chats (
+ id text primary key not null,
+ correspondent text not null unique,
+ foreign key(correspondent) references users(jid)
+);
+
+-- messages include reference to chat they are in, and who sent them.
+create table messages (
+ id text primary key not null,
+ body text,
+ chat_id text not null,
+ -- TODO: channel stuff
+ -- channel_id uuid,
+ -- check ((chat_id == null) <> (channel_id == null)),
+ -- check ((chat_id == null) or (channel_id == null)),
+ -- user is the current "owner" of the message
+ -- TODO: queued messages offline
+ -- TODO: timestamp
+ timestamp text not null,
+
+ -- TODO: icky
+ -- the user to show it coming from (not necessarily the original sender)
+ -- from_identity text not null,
+ -- original sender details (only from jabber supported for now)
+ from_jid text not null,
+ -- resource can be null
+ from_resource text,
+ -- check (from_jid != original_sender),
+
+ -- TODO: from can be either a jid, a moved jid (for when a contact moves, save original sender jid/user but link to new user), or imported (from another service (save details), linked to new user)
+ -- TODO: read bool not null,
+ foreign key(chat_id) references chats(id) on delete cascade,
+ -- foreign key(from_identity) references identities(id),
+ foreign key(from_jid) references users(jid),
+ foreign key(from_jid, from_resource) references resources(bare_jid, resource)
+);
+
+-- enum for subscription state
+create table show (
+ state text primary key not null
+);
+
+insert into show ( state ) values ('away'), ('chat'), ('do-not-disturb'), ('extended-away');
+
+create table cached_status (
+ id integer primary key not null,
+ show text,
+ message text,
+ foreign key(show) references show(state)
+);
+
+insert into cached_status (id) values (0);
diff --git a/filamento/src/chat.rs b/filamento/src/chat.rs
new file mode 100644
index 0000000..c1194ea
--- /dev/null
+++ b/filamento/src/chat.rs
@@ -0,0 +1,57 @@
+use chrono::{DateTime, Utc};
+use jid::JID;
+use uuid::Uuid;
+
+#[derive(Debug, sqlx::FromRow, Clone)]
+pub struct Message {
+ pub id: Uuid,
+ // does not contain full user information
+ #[sqlx(rename = "from_jid")]
+ pub from: JID,
+ pub timestamp: DateTime<Utc>,
+ // TODO: originally_from
+ // TODO: message edits
+ // TODO: message timestamp
+ #[sqlx(flatten)]
+ pub body: Body,
+}
+
+// TODO: user migrations
+// pub enum Migrated {
+// Jabber(User),
+// Outside,
+// }
+
+#[derive(Debug, sqlx::FromRow, Clone)]
+pub struct Body {
+ // TODO: rich text, other contents, threads
+ pub body: String,
+}
+
+#[derive(sqlx::FromRow, Debug, Clone)]
+pub struct Chat {
+ pub correspondent: JID,
+ // pub unread_messages: i32,
+ // pub latest_message: Message,
+ // when a new message is received, the chat should be updated, and the new message should be delivered too.
+ // message history is not stored in chat, retreived separately.
+ // pub message_history: Vec<Message>,
+}
+
+pub enum ChatUpdate {}
+
+impl Chat {
+ pub fn new(correspondent: JID) -> Self {
+ Self { correspondent }
+ }
+ pub fn correspondent(&self) -> &JID {
+ &self.correspondent
+ }
+}
+
+// TODO: group chats
+// pub enum Chat {
+// Direct(DirectChat),
+// Channel(Channel),
+// }
+// pub struct Channel {}
diff --git a/filamento/src/db.rs b/filamento/src/db.rs
new file mode 100644
index 0000000..aea40ac
--- /dev/null
+++ b/filamento/src/db.rs
@@ -0,0 +1,521 @@
+use std::{collections::HashSet, path::Path};
+
+use jid::JID;
+use sqlx::{migrate, Error, SqlitePool};
+use uuid::Uuid;
+
+use crate::{
+ chat::{Chat, Message},
+ error::{DatabaseError, DatabaseOpenError},
+ presence::Online,
+ roster::Contact,
+ user::User,
+};
+
+#[derive(Clone)]
+pub struct Db {
+ db: SqlitePool,
+}
+
+// TODO: turn into trait
+impl Db {
+ pub async fn create_connect_and_migrate(
+ path: impl AsRef<Path>,
+ ) -> Result<Self, DatabaseOpenError> {
+ if let Some(dir) = path.as_ref().parent() {
+ if dir.is_dir() {
+ } else {
+ tokio::fs::create_dir_all(dir).await?;
+ }
+ let _file = tokio::fs::OpenOptions::new()
+ .append(true)
+ .create(true)
+ .open(path.as_ref())
+ .await?;
+ }
+ let url = format!(
+ "sqlite://{}",
+ path.as_ref()
+ .to_str()
+ .ok_or(DatabaseOpenError::InvalidPath)?
+ );
+ let db = SqlitePool::connect(&url).await?;
+ migrate!().run(&db).await?;
+ Ok(Self { db })
+ }
+
+ pub(crate) fn new(db: SqlitePool) -> Self {
+ Self { db }
+ }
+
+ pub(crate) async fn create_user(&self, user: User) -> Result<(), Error> {
+ sqlx::query!(
+ "insert into users ( jid, cached_status_message ) values ( ?, ? )",
+ user.jid,
+ user.cached_status_message
+ )
+ .execute(&self.db)
+ .await?;
+ Ok(())
+ }
+
+ pub(crate) async fn read_user(&self, user: JID) -> Result<User, Error> {
+ let user: User = sqlx::query_as("select * from users where jid = ?")
+ .bind(user)
+ .fetch_one(&self.db)
+ .await?;
+ Ok(user)
+ }
+
+ pub(crate) async fn update_user(&self, user: User) -> Result<(), Error> {
+ sqlx::query!(
+ "update users set cached_status_message = ? where jid = ?",
+ user.cached_status_message,
+ user.jid
+ )
+ .execute(&self.db)
+ .await?;
+ Ok(())
+ }
+
+ // TODO: should this be allowed? messages need to reference users. should probably only allow delete if every other thing referencing it has been deleted, or if you make clear to the user deleting a user will delete all messages associated with them.
+ // pub(crate) async fn delete_user(&self, user: JID) -> Result<(), Error> {}
+
+ /// does not create the underlying user, if underlying user does not exist, create_user() must be called separately
+ pub(crate) async fn create_contact(&self, contact: Contact) -> Result<(), Error> {
+ sqlx::query!(
+ "insert into roster ( user_jid, name, subscription ) values ( ?, ?, ? )",
+ contact.user_jid,
+ contact.name,
+ contact.subscription
+ )
+ .execute(&self.db)
+ .await?;
+ // TODO: abstract this out in to add_to_group() function ?
+ for group in contact.groups {
+ sqlx::query!(
+ "insert into groups (group_name) values (?) on conflict do nothing",
+ group
+ )
+ .execute(&self.db)
+ .await?;
+ sqlx::query!(
+ "insert into groups_roster (group_name, contact_jid) values (?, ?)",
+ group,
+ contact.user_jid
+ )
+ .execute(&self.db)
+ .await?;
+ }
+ Ok(())
+ }
+
+ pub(crate) async fn read_contact(&self, contact: JID) -> Result<Contact, Error> {
+ let mut contact: Contact = sqlx::query_as("select * from roster where user_jid = ?")
+ .bind(contact)
+ .fetch_one(&self.db)
+ .await?;
+ #[derive(sqlx::FromRow)]
+ struct Row {
+ group_name: String,
+ }
+ let groups: Vec<Row> =
+ sqlx::query_as("select group_name from groups_roster where contact_jid = ?")
+ .bind(&contact.user_jid)
+ .fetch_all(&self.db)
+ .await?;
+ contact.groups = HashSet::from_iter(groups.into_iter().map(|row| row.group_name));
+ Ok(contact)
+ }
+
+ pub(crate) async fn read_contact_opt(&self, contact: &JID) -> Result<Option<Contact>, Error> {
+ let contact: Option<Contact> =
+ sqlx::query_as("select * from roster join users on jid = user_jid where jid = ?")
+ .bind(contact)
+ .fetch_optional(&self.db)
+ .await?;
+ if let Some(mut contact) = contact {
+ #[derive(sqlx::FromRow)]
+ struct Row {
+ group_name: String,
+ }
+ let groups: Vec<Row> =
+ sqlx::query_as("select group_name from groups_roster where contact_jid = ?")
+ .bind(&contact.user_jid)
+ .fetch_all(&self.db)
+ .await?;
+ contact.groups = HashSet::from_iter(groups.into_iter().map(|row| row.group_name));
+ Ok(Some(contact))
+ } else {
+ Ok(None)
+ }
+ }
+
+ /// does not update the underlying user, to update user, update_user() must be called separately
+ pub(crate) async fn update_contact(&self, contact: Contact) -> Result<(), Error> {
+ sqlx::query!(
+ "update roster set name = ?, subscription = ? where user_jid = ?",
+ contact.name,
+ contact.subscription,
+ contact.user_jid
+ )
+ .execute(&self.db)
+ .await?;
+ sqlx::query!(
+ "delete from groups_roster where contact_jid = ?",
+ contact.user_jid
+ )
+ .execute(&self.db)
+ .await?;
+ // TODO: delete orphaned groups from groups table
+ for group in contact.groups {
+ sqlx::query!(
+ "insert into groups (group_name) values (?) on conflict do nothing",
+ group
+ )
+ .execute(&self.db)
+ .await?;
+ sqlx::query!(
+ "insert into groups_roster (group_name, contact_jid) values (?, ?)",
+ group,
+ contact.user_jid
+ )
+ .execute(&self.db)
+ .await?;
+ }
+ Ok(())
+ }
+
+ pub(crate) async fn upsert_contact(&self, contact: Contact) -> Result<(), Error> {
+ sqlx::query!(
+ "insert into users ( jid ) values ( ? ) on conflict do nothing",
+ contact.user_jid,
+ )
+ .execute(&self.db)
+ .await?;
+ sqlx::query!(
+ "insert into roster ( user_jid, name, subscription ) values ( ?, ?, ? ) on conflict do update set name = ?, subscription = ?",
+ contact.user_jid,
+ contact.name,
+ contact.subscription,
+ contact.name,
+ contact.subscription
+ )
+ .execute(&self.db)
+ .await?;
+ sqlx::query!(
+ "delete from groups_roster where contact_jid = ?",
+ contact.user_jid
+ )
+ .execute(&self.db)
+ .await?;
+ // TODO: delete orphaned groups from groups table
+ for group in contact.groups {
+ sqlx::query!(
+ "insert into groups (group_name) values (?) on conflict do nothing",
+ group
+ )
+ .execute(&self.db)
+ .await?;
+ sqlx::query!(
+ "insert into groups_roster (group_name, contact_jid) values (?, ?)",
+ group,
+ contact.user_jid
+ )
+ .execute(&self.db)
+ .await?;
+ }
+ Ok(())
+ }
+
+ pub(crate) async fn delete_contact(&self, contact: JID) -> Result<(), Error> {
+ sqlx::query!("delete from roster where user_jid = ?", contact)
+ .execute(&self.db)
+ .await?;
+ // TODO: delete orphaned groups from groups table
+ Ok(())
+ }
+
+ pub(crate) async fn replace_cached_roster(&self, roster: Vec<Contact>) -> Result<(), Error> {
+ sqlx::query!("delete from roster").execute(&self.db).await?;
+ for contact in roster {
+ self.upsert_contact(contact).await?;
+ }
+ Ok(())
+ }
+
+ pub(crate) async fn read_cached_roster(&self) -> Result<Vec<Contact>, Error> {
+ let mut roster: Vec<Contact> =
+ sqlx::query_as("select * from roster join users on jid = user_jid")
+ .fetch_all(&self.db)
+ .await?;
+ for contact in &mut roster {
+ #[derive(sqlx::FromRow)]
+ struct Row {
+ group_name: String,
+ }
+ let groups: Vec<Row> =
+ sqlx::query_as("select group_name from groups_roster where contact_jid = ?")
+ .bind(&contact.user_jid)
+ .fetch_all(&self.db)
+ .await?;
+ contact.groups = HashSet::from_iter(groups.into_iter().map(|row| row.group_name));
+ }
+ Ok(roster)
+ }
+
+ pub(crate) async fn create_chat(&self, chat: Chat) -> Result<(), Error> {
+ let id = Uuid::new_v4();
+ let jid = chat.correspondent();
+ sqlx::query!(
+ "insert into chats (id, correspondent) values (?, ?)",
+ id,
+ jid
+ )
+ .execute(&self.db)
+ .await?;
+ Ok(())
+ }
+
+ // TODO: what happens if a correspondent changes from a user to a contact? maybe just have correspondent be a user, then have the client make the user show up as a contact in ui if they are in the loaded roster.
+
+ pub(crate) async fn read_chat(&self, chat: JID) -> Result<Chat, Error> {
+ // check if the chat correponding with the jid exists
+ let chat: Chat = sqlx::query_as("select correspondent from chats where correspondent = ?")
+ .bind(chat)
+ .fetch_one(&self.db)
+ .await?;
+ Ok(chat)
+ }
+
+ pub(crate) async fn update_chat_correspondent(
+ &self,
+ old_chat: Chat,
+ new_correspondent: JID,
+ ) -> Result<Chat, Error> {
+ // TODO: update other chat data if it differs (for now there is only correspondent so doesn't matter)
+ let new_jid = &new_correspondent;
+ let old_jid = old_chat.correspondent();
+ sqlx::query!(
+ "update chats set correspondent = ? where correspondent = ?",
+ new_jid,
+ old_jid,
+ )
+ .execute(&self.db)
+ .await?;
+ let chat = self.read_chat(new_correspondent).await?;
+ Ok(chat)
+ }
+
+ // pub(crate) async fn update_chat
+
+ pub(crate) async fn delete_chat(&self, chat: JID) -> Result<(), Error> {
+ sqlx::query!("delete from chats where correspondent = ?", chat)
+ .execute(&self.db)
+ .await?;
+ Ok(())
+ }
+
+ /// TODO: sorting and filtering (for now there is no sorting)
+ pub(crate) async fn read_chats(&self) -> Result<Vec<Chat>, Error> {
+ let chats: Vec<Chat> = sqlx::query_as("select * from chats")
+ .fetch_all(&self.db)
+ .await?;
+ Ok(chats)
+ }
+
+ /// chats ordered by date of last message
+ // greatest-n-per-group
+ pub(crate) async fn read_chats_ordered(&self) -> Result<Vec<Chat>, Error> {
+ let chats = sqlx::query_as("select c.*, m.* from chats c join (select chat_id, max(timestamp) max_timestamp from messages group by chat_id) max_timestamps on c.id = max_timestamps.chat_id join messages m on max_timestamps.chat_id = m.chat_id and max_timestamps.max_timestamp = m.timestamp order by m.timestamp desc")
+ .fetch_all(&self.db)
+ .await?;
+ Ok(chats)
+ }
+
+ /// chats ordered by date of last message
+ // greatest-n-per-group
+ pub(crate) async fn read_chats_ordered_with_latest_messages(
+ &self,
+ ) -> Result<Vec<(Chat, Message)>, Error> {
+ #[derive(sqlx::FromRow)]
+ pub struct ChatWithMessage {
+ #[sqlx(flatten)]
+ pub chat: Chat,
+ #[sqlx(flatten)]
+ pub message: Message,
+ }
+
+ // TODO: i don't know if this will assign the right uuid to the latest message or the chat's id. should probably check but i don't think it matters as nothing ever gets called with the id of the latest message in the chats list
+ let chats: Vec<ChatWithMessage> = sqlx::query_as("select c.*, m.* from chats c join (select chat_id, max(timestamp) max_timestamp from messages group by chat_id) max_timestamps on c.id = max_timestamps.chat_id join messages m on max_timestamps.chat_id = m.chat_id and max_timestamps.max_timestamp = m.timestamp order by m.timestamp desc")
+ .fetch_all(&self.db)
+ .await?;
+
+ let chats = chats
+ .into_iter()
+ .map(|chat_with_message| (chat_with_message.chat, chat_with_message.message))
+ .collect();
+
+ Ok(chats)
+ }
+
+ async fn read_chat_id(&self, chat: JID) -> Result<Uuid, Error> {
+ #[derive(sqlx::FromRow)]
+ struct Row {
+ id: Uuid,
+ }
+ let chat = chat.as_bare();
+ let chat_id: Row = sqlx::query_as("select id from chats where correspondent = ?")
+ .bind(chat)
+ .fetch_one(&self.db)
+ .await?;
+ let chat_id = chat_id.id;
+ Ok(chat_id)
+ }
+
+ async fn read_chat_id_opt(&self, chat: JID) -> Result<Option<Uuid>, Error> {
+ #[derive(sqlx::FromRow)]
+ struct Row {
+ id: Uuid,
+ }
+ let chat_id: Option<Row> = sqlx::query_as("select id from chats where correspondent = ?")
+ .bind(chat)
+ .fetch_optional(&self.db)
+ .await?;
+ let chat_id = chat_id.map(|row| row.id);
+ Ok(chat_id)
+ }
+
+ /// if the chat doesn't already exist, it must be created by calling create_chat() before running this function.
+ pub(crate) async fn create_message(&self, message: Message, chat: JID) -> Result<(), Error> {
+ // TODO: one query
+ let bare_jid = message.from.as_bare();
+ let resource = message.from.resourcepart;
+ let chat_id = self.read_chat_id(chat).await?;
+ sqlx::query!("insert into messages (id, body, chat_id, from_jid, from_resource, timestamp) values (?, ?, ?, ?, ?, ?)", message.id, message.body.body, chat_id, bare_jid, resource, message.timestamp).execute(&self.db).await?;
+ Ok(())
+ }
+
+ pub(crate) async fn create_message_with_self_resource_and_chat(
+ &self,
+ message: Message,
+ chat: JID,
+ ) -> Result<(), Error> {
+ let from_jid = message.from.as_bare();
+ let resource = &message.from.resourcepart;
+ let bare_chat = chat.as_bare();
+ sqlx::query!(
+ "insert into users (jid) values (?) on conflict do nothing",
+ from_jid
+ )
+ .execute(&self.db)
+ .await?;
+ let id = Uuid::new_v4();
+ sqlx::query!(
+ "insert into chats (id, correspondent) values (?, ?) on conflict do nothing",
+ id,
+ bare_chat
+ )
+ .execute(&self.db)
+ .await?;
+ if let Some(resource) = resource {
+ sqlx::query!(
+ "insert into resources (bare_jid, resource) values (?, ?) on conflict do nothing",
+ from_jid,
+ resource
+ )
+ .execute(&self.db)
+ .await?;
+ }
+ self.create_message(message, chat).await?;
+ Ok(())
+ }
+
+ // create direct message from incoming
+ pub(crate) async fn create_message_with_user_resource_and_chat(
+ &self,
+ message: Message,
+ chat: JID,
+ ) -> Result<(), Error> {
+ let bare_chat = chat.as_bare();
+ let resource = &chat.resourcepart;
+ sqlx::query!(
+ "insert into users (jid) values (?) on conflict do nothing",
+ bare_chat
+ )
+ .execute(&self.db)
+ .await?;
+ let id = Uuid::new_v4();
+ sqlx::query!(
+ "insert into chats (id, correspondent) values (?, ?) on conflict do nothing",
+ id,
+ bare_chat
+ )
+ .execute(&self.db)
+ .await?;
+ if let Some(resource) = resource {
+ sqlx::query!(
+ "insert into resources (bare_jid, resource) values (?, ?) on conflict do nothing",
+ bare_chat,
+ resource
+ )
+ .execute(&self.db)
+ .await?;
+ }
+ self.create_message(message, chat).await?;
+ Ok(())
+ }
+
+ pub(crate) async fn read_message(&self, message: Uuid) -> Result<Message, Error> {
+ let message: Message = sqlx::query_as("select * from messages where id = ?")
+ .bind(message)
+ .fetch_one(&self.db)
+ .await?;
+ Ok(message)
+ }
+
+ // TODO: message updates/edits pub(crate) async fn update_message(&self, message: Message) -> Result<(), Error> {}
+
+ pub(crate) async fn delete_message(&self, message: Uuid) -> Result<(), Error> {
+ sqlx::query!("delete from messages where id = ?", message)
+ .execute(&self.db)
+ .await?;
+ Ok(())
+ }
+
+ // TODO: paging
+ pub(crate) async fn read_message_history(&self, chat: JID) -> Result<Vec<Message>, Error> {
+ let chat_id = self.read_chat_id(chat).await?;
+ let messages: Vec<Message> =
+ sqlx::query_as("select * from messages where chat_id = ? order by timestamp asc")
+ .bind(chat_id)
+ .fetch_all(&self.db)
+ .await?;
+ Ok(messages)
+ }
+
+ pub(crate) async fn read_cached_status(&self) -> Result<Online, Error> {
+ let online: Online = sqlx::query_as("select * from cached_status where id = 0")
+ .fetch_one(&self.db)
+ .await?;
+ Ok(online)
+ }
+
+ pub(crate) async fn upsert_cached_status(&self, status: Online) -> Result<(), Error> {
+ sqlx::query!(
+ "insert into cached_status (id, show, message) values (0, ?, ?) on conflict do update set show = ?, message = ?",
+ status.show,
+ status.status,
+ status.show,
+ status.status
+ ).execute(&self.db).await?;
+ Ok(())
+ }
+
+ pub(crate) async fn delete_cached_status(&self) -> Result<(), Error> {
+ sqlx::query!("update cached_status set show = null, message = null where id = 0")
+ .execute(&self.db)
+ .await?;
+ Ok(())
+ }
+}
diff --git a/filamento/src/error.rs b/filamento/src/error.rs
new file mode 100644
index 0000000..996a503
--- /dev/null
+++ b/filamento/src/error.rs
@@ -0,0 +1,142 @@
+use std::sync::Arc;
+
+use lampada::error::{ConnectionError, ReadError, WriteError};
+use stanza::client::Stanza;
+use thiserror::Error;
+
+pub use lampada::error::CommandError;
+
+// for the client logic impl
+#[derive(Debug, Error, Clone)]
+pub enum Error {
+ #[error("core error: {0}")]
+ Connection(#[from] ConnectionError),
+ #[error("received unrecognized/unsupported content")]
+ UnrecognizedContent,
+ // TODO: include content
+ // UnrecognizedContent(peanuts::element::Content),
+ #[error("iq receive error: {0}")]
+ Iq(IqError),
+ // TODO: change to Connecting(ConnectingError)
+ #[error("connecting: {0}")]
+ Connecting(#[from] ConnectionJobError),
+ #[error("presence: {0}")]
+ Presence(#[from] PresenceError),
+ #[error("set status: {0}")]
+ SetStatus(#[from] StatusError),
+ // TODO: have different ones for get/update/set
+ #[error("roster: {0}")]
+ Roster(RosterError),
+ #[error("stream error: {0}")]
+ Stream(#[from] stanza::stream::Error),
+ #[error("message send error: {0}")]
+ MessageSend(MessageSendError),
+ #[error("message receive error: {0}")]
+ MessageRecv(MessageRecvError),
+}
+
+#[derive(Debug, Error, Clone)]
+pub enum MessageSendError {
+ #[error("could not add to message history: {0}")]
+ MessageHistory(#[from] DatabaseError),
+}
+
+#[derive(Debug, Error, Clone)]
+pub enum MessageRecvError {
+ #[error("could not add to message history: {0}")]
+ MessageHistory(#[from] DatabaseError),
+ #[error("missing from")]
+ MissingFrom,
+}
+
+#[derive(Debug, Error, Clone)]
+pub enum StatusError {
+ #[error("cache: {0}")]
+ Cache(#[from] DatabaseError),
+ #[error("stream write: {0}")]
+ Write(#[from] WriteError),
+}
+
+#[derive(Debug, Clone, Error)]
+pub enum ConnectionJobError {
+ // #[error("connection failed: {0}")]
+ // ConnectionFailed(#[from] luz::Error),
+ #[error("failed roster retreival: {0}")]
+ RosterRetreival(#[from] RosterError),
+ #[error("failed to send available presence: {0}")]
+ SendPresence(#[from] WriteError),
+ #[error("cached status: {0}")]
+ StatusCacheError(#[from] DatabaseError),
+}
+
+#[derive(Debug, Error, Clone)]
+pub enum RosterError {
+ #[error("cache: {0}")]
+ Cache(#[from] DatabaseError),
+ #[error("stream write: {0}")]
+ Write(#[from] WriteError),
+ // TODO: display for stanza, to show as xml, same for read error types.
+ #[error("unexpected reply: {0:?}")]
+ UnexpectedStanza(Stanza),
+ #[error("stream read: {0}")]
+ Read(#[from] ReadError),
+ #[error("stanza error: {0}")]
+ StanzaError(#[from] stanza::client::error::Error),
+}
+
+#[derive(Debug, Error, Clone)]
+#[error("database error: {0}")]
+pub struct DatabaseError(Arc<sqlx::Error>);
+
+impl From<sqlx::Error> for DatabaseError {
+ fn from(e: sqlx::Error) -> Self {
+ Self(Arc::new(e))
+ }
+}
+
+impl From<sqlx::Error> for DatabaseOpenError {
+ fn from(e: sqlx::Error) -> Self {
+ Self::Error(Arc::new(e))
+ }
+}
+
+#[derive(Debug, Error, Clone)]
+// TODO: should probably have all iq query related errors here, including read, write, stanza error, etc.
+pub enum IqError {
+ #[error("no iq with id matching `{0}`")]
+ NoMatchingId(String),
+}
+
+#[derive(Debug, Error, Clone)]
+pub enum DatabaseOpenError {
+ #[error("error: {0}")]
+ Error(Arc<sqlx::Error>),
+ #[error("migration: {0}")]
+ Migration(Arc<sqlx::migrate::MigrateError>),
+ #[error("io: {0}")]
+ Io(Arc<tokio::io::Error>),
+ #[error("invalid path")]
+ InvalidPath,
+}
+
+impl From<sqlx::migrate::MigrateError> for DatabaseOpenError {
+ fn from(e: sqlx::migrate::MigrateError) -> Self {
+ Self::Migration(Arc::new(e))
+ }
+}
+
+impl From<tokio::io::Error> for DatabaseOpenError {
+ fn from(e: tokio::io::Error) -> Self {
+ Self::Io(Arc::new(e))
+ }
+}
+
+#[derive(Debug, Error, Clone)]
+pub enum PresenceError {
+ #[error("unsupported")]
+ Unsupported,
+ #[error("missing from")]
+ MissingFrom,
+ #[error("stanza error: {0}")]
+ StanzaError(#[from] stanza::client::error::Error),
+}
diff --git a/filamento/src/lib.rs b/filamento/src/lib.rs
new file mode 100644
index 0000000..db59a67
--- /dev/null
+++ b/filamento/src/lib.rs
@@ -0,0 +1,1598 @@
+use std::{
+ collections::HashMap,
+ ops::{Deref, DerefMut},
+ str::FromStr,
+ sync::Arc,
+ time::Duration,
+};
+
+use chat::{Body, Chat, Message};
+use chrono::Utc;
+use db::Db;
+use error::{
+ ConnectionJobError, DatabaseError, Error, IqError, MessageRecvError, PresenceError,
+ RosterError, StatusError,
+};
+use futures::FutureExt;
+use jid::JID;
+use lampada::{
+ Connected, CoreClient, CoreClientCommand, Logic, SupervisorSender, WriteMessage,
+ error::{ActorError, CommandError, ConnectionError, ReadError, WriteError},
+};
+use presence::{Offline, Online, Presence, PresenceType, Show};
+use roster::{Contact, ContactUpdate};
+use stanza::client::{
+ Stanza,
+ iq::{self, Iq, IqType},
+};
+use tokio::{
+ sync::{Mutex, mpsc, oneshot},
+ time::timeout,
+};
+use tracing::{debug, info};
+use user::User;
+use uuid::Uuid;
+
+pub mod chat;
+pub mod db;
+pub mod error;
+pub mod presence;
+pub mod roster;
+pub mod user;
+
+pub enum Command {
+ /// get the roster. if offline, retreive cached version from database. should be stored in application memory
+ GetRoster(oneshot::Sender<Result<Vec<Contact>, RosterError>>),
+ /// get all chats. chat will include 10 messages in their message Vec (enough for chat previews)
+ // TODO: paging and filtering
+ GetChats(oneshot::Sender<Result<Vec<Chat>, DatabaseError>>),
+ // TODO: paging and filtering
+ GetChatsOrdered(oneshot::Sender<Result<Vec<Chat>, DatabaseError>>),
+ // TODO: paging and filtering
+ GetChatsOrderedWithLatestMessages(oneshot::Sender<Result<Vec<(Chat, Message)>, DatabaseError>>),
+ /// get a specific chat by jid
+ GetChat(JID, oneshot::Sender<Result<Chat, DatabaseError>>),
+ /// get message history for chat (does appropriate mam things)
+ // TODO: paging and filtering
+ GetMessages(JID, oneshot::Sender<Result<Vec<Message>, DatabaseError>>),
+ /// delete a chat from your chat history, along with all the corresponding messages
+ DeleteChat(JID, oneshot::Sender<Result<(), DatabaseError>>),
+ /// delete a message from your chat history
+ DeleteMessage(Uuid, oneshot::Sender<Result<(), DatabaseError>>),
+ /// get a user from your users database
+ GetUser(JID, oneshot::Sender<Result<User, DatabaseError>>),
+ /// add a contact to your roster, with a status of none, no subscriptions.
+ AddContact(JID, oneshot::Sender<Result<(), RosterError>>),
+ /// send a friend request i.e. a subscription request with a subscription pre-approval. if not already added to roster server adds to roster.
+ BuddyRequest(JID, oneshot::Sender<Result<(), WriteError>>),
+ /// send a subscription request, without pre-approval. if not already added to roster server adds to roster.
+ SubscriptionRequest(JID, oneshot::Sender<Result<(), WriteError>>),
+ /// accept a friend request by accepting a pending subscription and sending a subscription request back. if not already added to roster adds to roster.
+ AcceptBuddyRequest(JID, oneshot::Sender<Result<(), WriteError>>),
+ /// accept a pending subscription and doesn't send a subscription request back. if not already added to roster adds to roster.
+ AcceptSubscriptionRequest(JID, oneshot::Sender<Result<(), WriteError>>),
+ /// unsubscribe to a contact, but don't remove their subscription.
+ UnsubscribeFromContact(JID, oneshot::Sender<Result<(), WriteError>>),
+ /// stop a contact from being subscribed, but stay subscribed to the contact.
+ UnsubscribeContact(JID, oneshot::Sender<Result<(), WriteError>>),
+ /// remove subscriptions to and from contact, but keep in roster.
+ UnfriendContact(JID, oneshot::Sender<Result<(), WriteError>>),
+ /// remove a contact from the contact list. will remove subscriptions if not already done then delete contact from roster.
+ DeleteContact(JID, oneshot::Sender<Result<(), RosterError>>),
+ /// update contact. contact details will be overwritten with the contents of the contactupdate struct.
+ UpdateContact(JID, ContactUpdate, oneshot::Sender<Result<(), RosterError>>),
+ /// set online status. if disconnected, will be cached so when client connects, will be sent as the initial presence.
+ SetStatus(Online, oneshot::Sender<Result<(), StatusError>>),
+ /// send presence stanza
+ // TODO: cache presence stanza
+ SendPresence(
+ Option<JID>,
+ PresenceType,
+ oneshot::Sender<Result<(), WriteError>>,
+ ),
+ /// send a directed presence (usually to a non-contact).
+ // TODO: should probably make it so people can add non-contact auto presence sharing in the client (most likely through setting an internal setting)
+ /// send a message to a jid (any kind of jid that can receive a message, e.g. a user or a
+ /// chatroom). if disconnected, will be cached so when client connects, message will be sent.
+ SendMessage(JID, Body, oneshot::Sender<Result<(), WriteError>>),
+}
+/// an xmpp client that is suited for a chat client use case
+#[derive(Debug)]
+pub struct Client {
+ sender: mpsc::Sender<CoreClientCommand<Command>>,
+ timeout: Duration,
+}
+
+impl Clone for Client {
+ fn clone(&self) -> Self {
+ Self {
+ sender: self.sender.clone(),
+ timeout: self.timeout,
+ }
+ }
+}
+
+impl Deref for Client {
+ type Target = mpsc::Sender<CoreClientCommand<Command>>;
+
+ fn deref(&self) -> &Self::Target {
+ &self.sender
+ }
+}
+
+impl DerefMut for Client {
+ fn deref_mut(&mut self) -> &mut Self::Target {
+ &mut self.sender
+ }
+}
+
+impl Client {
+ pub async fn connect(&self) -> Result<(), ActorError> {
+ self.send(CoreClientCommand::Connect).await?;
+ Ok(())
+ }
+
+ pub async fn disconnect(&self, offline: Offline) -> Result<(), ActorError> {
+ self.send(CoreClientCommand::Disconnect).await?;
+ Ok(())
+ }
+
+ pub fn new(jid: JID, password: String, db: Db) -> (Self, mpsc::Receiver<UpdateMessage>) {
+ let (command_sender, command_receiver) = mpsc::channel(20);
+ let (update_send, update_recv) = mpsc::channel(20);
+
+ // might be bad, first supervisor shutdown notification oneshot is never used (disgusting)
+ let (_sup_send, sup_recv) = oneshot::channel();
+ let sup_recv = sup_recv.fuse();
+
+ let logic = ClientLogic {
+ db,
+ pending: Arc::new(Mutex::new(HashMap::new())),
+ update_sender: update_send,
+ };
+
+ let actor: CoreClient<ClientLogic> =
+ CoreClient::new(jid, password, command_receiver, None, sup_recv, logic);
+ tokio::spawn(async move { actor.run().await });
+
+ (
+ Self {
+ sender: command_sender,
+ // TODO: configure timeout
+ timeout: Duration::from_secs(10),
+ },
+ update_recv,
+ )
+ }
+
+ pub async fn get_roster(&self) -> Result<Vec<Contact>, CommandError<RosterError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(CoreClientCommand::Command(Command::GetRoster(send)))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let roster = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(roster)
+ }
+
+ pub async fn get_chats(&self) -> Result<Vec<Chat>, CommandError<DatabaseError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(CoreClientCommand::Command(Command::GetChats(send)))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let chats = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(chats)
+ }
+
+ pub async fn get_chats_ordered(&self) -> Result<Vec<Chat>, CommandError<DatabaseError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(CoreClientCommand::Command(Command::GetChatsOrdered(send)))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let chats = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(chats)
+ }
+
+ pub async fn get_chats_ordered_with_latest_messages(
+ &self,
+ ) -> Result<Vec<(Chat, Message)>, CommandError<DatabaseError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(CoreClientCommand::Command(
+ Command::GetChatsOrderedWithLatestMessages(send),
+ ))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let chats = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(chats)
+ }
+
+ pub async fn get_chat(&self, jid: JID) -> Result<Chat, CommandError<DatabaseError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(CoreClientCommand::Command(Command::GetChat(jid, send)))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let chat = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(chat)
+ }
+
+ pub async fn get_messages(
+ &self,
+ jid: JID,
+ ) -> Result<Vec<Message>, CommandError<DatabaseError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(CoreClientCommand::Command(Command::GetMessages(jid, send)))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let messages = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(messages)
+ }
+
+ pub async fn delete_chat(&self, jid: JID) -> Result<(), CommandError<DatabaseError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(CoreClientCommand::Command(Command::DeleteChat(jid, send)))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let result = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(result)
+ }
+
+ pub async fn delete_message(&self, id: Uuid) -> Result<(), CommandError<DatabaseError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(CoreClientCommand::Command(Command::DeleteMessage(id, send)))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let result = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(result)
+ }
+
+ pub async fn get_user(&self, jid: JID) -> Result<User, CommandError<DatabaseError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(CoreClientCommand::Command(Command::GetUser(jid, send)))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let result = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(result)
+ }
+
+ pub async fn add_contact(&self, jid: JID) -> Result<(), CommandError<RosterError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(CoreClientCommand::Command(Command::AddContact(jid, send)))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let result = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(result)
+ }
+
+ pub async fn buddy_request(&self, jid: JID) -> Result<(), CommandError<WriteError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(CoreClientCommand::Command(Command::BuddyRequest(jid, send)))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let result = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(result)
+ }
+
+ pub async fn subscription_request(&self, jid: JID) -> Result<(), CommandError<WriteError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(CoreClientCommand::Command(Command::SubscriptionRequest(
+ jid, send,
+ )))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let result = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(result)
+ }
+
+ pub async fn accept_buddy_request(&self, jid: JID) -> Result<(), CommandError<WriteError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(CoreClientCommand::Command(Command::AcceptBuddyRequest(
+ jid, send,
+ )))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let result = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(result)
+ }
+
+ pub async fn accept_subscription_request(
+ &self,
+ jid: JID,
+ ) -> Result<(), CommandError<WriteError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(CoreClientCommand::Command(
+ Command::AcceptSubscriptionRequest(jid, send),
+ ))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let result = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(result)
+ }
+
+ pub async fn unsubscribe_from_contact(&self, jid: JID) -> Result<(), CommandError<WriteError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(CoreClientCommand::Command(Command::UnsubscribeFromContact(
+ jid, send,
+ )))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let result = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(result)
+ }
+
+ pub async fn unsubscribe_contact(&self, jid: JID) -> Result<(), CommandError<WriteError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(CoreClientCommand::Command(Command::UnsubscribeContact(
+ jid, send,
+ )))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let result = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(result)
+ }
+
+ pub async fn unfriend_contact(&self, jid: JID) -> Result<(), CommandError<WriteError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(CoreClientCommand::Command(Command::UnfriendContact(
+ jid, send,
+ )))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let result = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(result)
+ }
+
+ pub async fn delete_contact(&self, jid: JID) -> Result<(), CommandError<RosterError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(CoreClientCommand::Command(Command::DeleteContact(
+ jid, send,
+ )))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let result = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(result)
+ }
+
+ pub async fn update_contact(
+ &self,
+ jid: JID,
+ update: ContactUpdate,
+ ) -> Result<(), CommandError<RosterError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(CoreClientCommand::Command(Command::UpdateContact(
+ jid, update, send,
+ )))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let result = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(result)
+ }
+
+ pub async fn set_status(&self, online: Online) -> Result<(), CommandError<StatusError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(CoreClientCommand::Command(Command::SetStatus(online, send)))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let result = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(result)
+ }
+
+ pub async fn send_message(&self, jid: JID, body: Body) -> Result<(), CommandError<WriteError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(CoreClientCommand::Command(Command::SendMessage(
+ jid, body, send,
+ )))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let result = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(result)
+ }
+}
+
+#[derive(Clone)]
+pub struct ClientLogic {
+ db: Db,
+ pending: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>,
+ update_sender: mpsc::Sender<UpdateMessage>,
+}
+
+impl Logic for ClientLogic {
+ type Cmd = Command;
+
+ async fn handle_connect(self, connection: Connected) {
+ let (send, recv) = oneshot::channel();
+ debug!("getting roster");
+ self.clone()
+ .handle_online(Command::GetRoster(send), connection.clone())
+ .await;
+ debug!("sent roster req");
+ let roster = recv.await;
+ debug!("got roster");
+ match roster {
+ Ok(r) => match r {
+ Ok(roster) => {
+ let online = self.db.read_cached_status().await;
+ let online = match online {
+ Ok(online) => online,
+ Err(e) => {
+ let _ = self
+ .update_sender
+ .send(UpdateMessage::Error(Error::Connecting(
+ ConnectionJobError::StatusCacheError(e.into()),
+ )))
+ .await;
+ Online::default()
+ }
+ };
+ let (send, recv) = oneshot::channel();
+ self.clone()
+ .handle_online(
+ Command::SendPresence(None, PresenceType::Online(online.clone()), send),
+ connection,
+ )
+ .await;
+ let set_status = recv.await;
+ match set_status {
+ Ok(s) => match s {
+ Ok(()) => {
+ let _ = self
+ .update_sender
+ .send(UpdateMessage::Online(online, roster))
+ .await;
+ }
+ Err(e) => {
+ let _ = self
+ .update_sender
+ .send(UpdateMessage::Error(Error::Connecting(e.into())))
+ .await;
+ }
+ },
+ Err(e) => {
+ let _ = self
+ .update_sender
+ .send(UpdateMessage::Error(Error::Connecting(
+ ConnectionJobError::SendPresence(WriteError::Actor(e.into())),
+ )))
+ .await;
+ }
+ }
+ }
+ Err(e) => {
+ let _ = self
+ .update_sender
+ .send(UpdateMessage::Error(Error::Connecting(e.into())))
+ .await;
+ }
+ },
+ Err(e) => {
+ let _ = self
+ .update_sender
+ .send(UpdateMessage::Error(Error::Connecting(
+ ConnectionJobError::RosterRetreival(RosterError::Write(WriteError::Actor(
+ e.into(),
+ ))),
+ )))
+ .await;
+ }
+ }
+ }
+
+ async fn handle_disconnect(self, connection: Connected) {
+ // TODO: be able to set offline status message
+ let offline_presence: stanza::client::presence::Presence =
+ Offline::default().into_stanza(None);
+ let stanza = Stanza::Presence(offline_presence);
+ // TODO: timeout and error check
+ connection.write_handle().write(stanza).await;
+ let _ = self
+ .update_sender
+ .send(UpdateMessage::Offline(Offline::default()))
+ .await;
+ }
+
+ async fn handle_stanza(
+ self,
+ stanza: Stanza,
+ connection: Connected,
+ supervisor: SupervisorSender,
+ ) {
+ match stanza {
+ Stanza::Message(stanza_message) => {
+ if let Some(mut from) = stanza_message.from {
+ // TODO: don't ignore delay from. xep says SHOULD send error if incorrect.
+ let timestamp = stanza_message
+ .delay
+ .map(|delay| delay.stamp)
+ .unwrap_or_else(|| Utc::now());
+ // TODO: group chat messages
+ let mut message = Message {
+ id: stanza_message
+ .id
+ // TODO: proper id storage
+ .map(|id| Uuid::from_str(&id).unwrap_or_else(|_| Uuid::new_v4()))
+ .unwrap_or_else(|| Uuid::new_v4()),
+ from: from.clone(),
+ timestamp,
+ body: Body {
+ // TODO: should this be an option?
+ body: stanza_message
+ .body
+ .map(|body| body.body)
+ .unwrap_or_default()
+ .unwrap_or_default(),
+ },
+ };
+ // TODO: can this be more efficient?
+ let result = self
+ .db
+ .create_message_with_user_resource_and_chat(message.clone(), from.clone())
+ .await;
+ if let Err(e) = result {
+ tracing::error!("messagecreate");
+ let _ = self
+ .update_sender
+ .send(UpdateMessage::Error(Error::MessageRecv(
+ MessageRecvError::MessageHistory(e.into()),
+ )))
+ .await;
+ }
+ message.from = message.from.as_bare();
+ from = from.as_bare();
+ let _ = self
+ .update_sender
+ .send(UpdateMessage::Message { to: from, message })
+ .await;
+ } else {
+ let _ = self
+ .update_sender
+ .send(UpdateMessage::Error(Error::MessageRecv(
+ MessageRecvError::MissingFrom,
+ )))
+ .await;
+ }
+ }
+ Stanza::Presence(presence) => {
+ if let Some(from) = presence.from {
+ match presence.r#type {
+ Some(r#type) => match r#type {
+ // error processing a presence from somebody
+ stanza::client::presence::PresenceType::Error => {
+ // TODO: is there any other information that should go with the error? also MUST have an error, otherwise it's a different error. maybe it shoulnd't be an option.
+ let _ = self
+ .update_sender
+ .send(UpdateMessage::Error(Error::Presence(
+ // TODO: ughhhhhhhhhhhhh these stanza errors should probably just have an option, and custom display
+ PresenceError::StanzaError(
+ presence
+ .errors
+ .first()
+ .cloned()
+ .expect("error MUST have error"),
+ ),
+ )))
+ .await;
+ }
+ // should not happen (error to server)
+ stanza::client::presence::PresenceType::Probe => {
+ // TODO: should probably write an error and restart stream
+ let _ = self
+ .update_sender
+ .send(UpdateMessage::Error(Error::Presence(
+ PresenceError::Unsupported,
+ )))
+ .await;
+ }
+ stanza::client::presence::PresenceType::Subscribe => {
+ // may get a subscription request from somebody who is not a contact!!! therefore should be its own kind of event
+ let _ = self
+ .update_sender
+ .send(UpdateMessage::SubscriptionRequest(from))
+ .await;
+ }
+ stanza::client::presence::PresenceType::Unavailable => {
+ let offline = Offline {
+ status: presence.status.map(|status| status.status.0),
+ };
+ let timestamp = presence
+ .delay
+ .map(|delay| delay.stamp)
+ .unwrap_or_else(|| Utc::now());
+ let _ = self
+ .update_sender
+ .send(UpdateMessage::Presence {
+ from,
+ presence: Presence {
+ timestamp,
+ presence: PresenceType::Offline(offline),
+ },
+ })
+ .await;
+ }
+ // for now, do nothing, as these are simply informational. will receive roster push from the server regarding the changes to do with them.
+ stanza::client::presence::PresenceType::Subscribed => {}
+ stanza::client::presence::PresenceType::Unsubscribe => {}
+ stanza::client::presence::PresenceType::Unsubscribed => {}
+ },
+ None => {
+ let online = Online {
+ show: presence.show.map(|show| match show {
+ stanza::client::presence::Show::Away => Show::Away,
+ stanza::client::presence::Show::Chat => Show::Chat,
+ stanza::client::presence::Show::Dnd => Show::DoNotDisturb,
+ stanza::client::presence::Show::Xa => Show::ExtendedAway,
+ }),
+ status: presence.status.map(|status| status.status.0),
+ priority: presence.priority.map(|priority| priority.0),
+ };
+ let timestamp = presence
+ .delay
+ .map(|delay| delay.stamp)
+ .unwrap_or_else(|| Utc::now());
+ let _ = self
+ .update_sender
+ .send(UpdateMessage::Presence {
+ from,
+ presence: Presence {
+ timestamp,
+ presence: PresenceType::Online(online),
+ },
+ })
+ .await;
+ }
+ }
+ } else {
+ let _ = self
+ .update_sender
+ .send(UpdateMessage::Error(Error::Presence(
+ PresenceError::MissingFrom,
+ )))
+ .await;
+ }
+ }
+ Stanza::Iq(iq) => match iq.r#type {
+ stanza::client::iq::IqType::Error | stanza::client::iq::IqType::Result => {
+ let send;
+ {
+ send = self.pending.lock().await.remove(&iq.id);
+ }
+ if let Some(send) = send {
+ send.send(Ok(Stanza::Iq(iq)));
+ } else {
+ let _ = self
+ .update_sender
+ .send(UpdateMessage::Error(Error::Iq(IqError::NoMatchingId(
+ iq.id,
+ ))))
+ .await;
+ }
+ }
+ // TODO: send unsupported to server
+ // TODO: proper errors i am so tired please
+ stanza::client::iq::IqType::Get => {}
+ stanza::client::iq::IqType::Set => {
+ if let Some(query) = iq.query {
+ match query {
+ stanza::client::iq::Query::Roster(mut query) => {
+ // TODO: there should only be one
+ if let Some(item) = query.items.pop() {
+ match item.subscription {
+ Some(stanza::roster::Subscription::Remove) => {
+ self.db.delete_contact(item.jid.clone()).await;
+ self.update_sender
+ .send(UpdateMessage::RosterDelete(item.jid))
+ .await;
+ // TODO: send result
+ }
+ _ => {
+ let contact: Contact = item.into();
+ if let Err(e) =
+ self.db.upsert_contact(contact.clone()).await
+ {
+ let _ = self
+ .update_sender
+ .send(UpdateMessage::Error(Error::Roster(
+ RosterError::Cache(e.into()),
+ )))
+ .await;
+ }
+ let _ = self
+ .update_sender
+ .send(UpdateMessage::RosterUpdate(contact))
+ .await;
+ // TODO: send result
+ // write_handle.write(Stanza::Iq(stanza::client::iq::Iq {
+ // from: ,
+ // id: todo!(),
+ // to: todo!(),
+ // r#type: todo!(),
+ // lang: todo!(),
+ // query: todo!(),
+ // errors: todo!(),
+ // }));
+ }
+ }
+ }
+ }
+ // TODO: send unsupported to server
+ _ => {}
+ }
+ } else {
+ // TODO: send error (unsupported) to server
+ }
+ }
+ },
+ Stanza::Error(error) => {
+ let _ = self
+ .update_sender
+ .send(UpdateMessage::Error(Error::Stream(error)))
+ .await;
+ // TODO: reconnect
+ }
+ Stanza::OtherContent(content) => {
+ let _ = self
+ .update_sender
+ .send(UpdateMessage::Error(Error::UnrecognizedContent));
+ // TODO: send error to write_thread
+ }
+ }
+ }
+
+ async fn handle_online(self, command: Command, connection: Connected) {
+ match command {
+ Command::GetRoster(result_sender) => {
+ // TODO: jid resource should probably be stored within the connection
+ debug!("before client_jid lock");
+ debug!("after client_jid lock");
+ let iq_id = Uuid::new_v4().to_string();
+ let (send, iq_recv) = oneshot::channel();
+ {
+ self.pending.lock().await.insert(iq_id.clone(), send);
+ }
+ let stanza = Stanza::Iq(Iq {
+ from: Some(connection.jid().clone()),
+ id: iq_id.to_string(),
+ to: None,
+ r#type: IqType::Get,
+ lang: None,
+ query: Some(iq::Query::Roster(stanza::roster::Query {
+ ver: None,
+ items: Vec::new(),
+ })),
+ errors: Vec::new(),
+ });
+ let (send, recv) = oneshot::channel();
+ let _ = connection
+ .write_handle()
+ .send(WriteMessage {
+ stanza,
+ respond_to: send,
+ })
+ .await;
+ // TODO: timeout
+ match recv.await {
+ Ok(Ok(())) => info!("roster request sent"),
+ Ok(Err(e)) => {
+ // TODO: log errors if fail to send
+ let _ = result_sender.send(Err(RosterError::Write(e.into())));
+ return;
+ }
+ Err(e) => {
+ let _ = result_sender
+ .send(Err(RosterError::Write(WriteError::Actor(e.into()))));
+ return;
+ }
+ };
+ // TODO: timeout
+ match iq_recv.await {
+ Ok(Ok(stanza)) => match stanza {
+ Stanza::Iq(Iq {
+ from: _,
+ id,
+ to: _,
+ r#type,
+ lang: _,
+ query: Some(iq::Query::Roster(stanza::roster::Query { ver: _, items })),
+ errors: _,
+ }) if id == iq_id && r#type == IqType::Result => {
+ let contacts: Vec<Contact> =
+ items.into_iter().map(|item| item.into()).collect();
+ if let Err(e) = self.db.replace_cached_roster(contacts.clone()).await {
+ self.update_sender
+ .send(UpdateMessage::Error(Error::Roster(RosterError::Cache(
+ e.into(),
+ ))))
+ .await;
+ };
+ result_sender.send(Ok(contacts));
+ return;
+ }
+ ref s @ Stanza::Iq(Iq {
+ from: _,
+ ref id,
+ to: _,
+ r#type,
+ lang: _,
+ query: _,
+ ref errors,
+ }) if *id == iq_id && r#type == IqType::Error => {
+ if let Some(error) = errors.first() {
+ result_sender.send(Err(RosterError::StanzaError(error.clone())));
+ } else {
+ result_sender.send(Err(RosterError::UnexpectedStanza(s.clone())));
+ }
+ return;
+ }
+ s => {
+ result_sender.send(Err(RosterError::UnexpectedStanza(s)));
+ return;
+ }
+ },
+ Ok(Err(e)) => {
+ result_sender.send(Err(RosterError::Read(e)));
+ return;
+ }
+ Err(e) => {
+ result_sender.send(Err(RosterError::Write(WriteError::Actor(e.into()))));
+ return;
+ }
+ }
+ }
+ Command::GetChats(sender) => {
+ let chats = self.db.read_chats().await.map_err(|e| e.into());
+ sender.send(chats);
+ }
+ Command::GetChatsOrdered(sender) => {
+ let chats = self.db.read_chats_ordered().await.map_err(|e| e.into());
+ sender.send(chats);
+ }
+ Command::GetChatsOrderedWithLatestMessages(sender) => {
+ let chats = self
+ .db
+ .read_chats_ordered_with_latest_messages()
+ .await
+ .map_err(|e| e.into());
+ sender.send(chats);
+ }
+ Command::GetChat(jid, sender) => {
+ let chats = self.db.read_chat(jid).await.map_err(|e| e.into());
+ sender.send(chats);
+ }
+ Command::GetMessages(jid, sender) => {
+ let messages = self
+ .db
+ .read_message_history(jid)
+ .await
+ .map_err(|e| e.into());
+ sender.send(messages);
+ }
+ Command::DeleteChat(jid, sender) => {
+ let result = self.db.delete_chat(jid).await.map_err(|e| e.into());
+ sender.send(result);
+ }
+ Command::DeleteMessage(uuid, sender) => {
+ let result = self.db.delete_message(uuid).await.map_err(|e| e.into());
+ sender.send(result);
+ }
+ Command::GetUser(jid, sender) => {
+ let user = self.db.read_user(jid).await.map_err(|e| e.into());
+ sender.send(user);
+ }
+ // TODO: offline queue to modify roster
+ Command::AddContact(jid, sender) => {
+ let iq_id = Uuid::new_v4().to_string();
+ let set_stanza = Stanza::Iq(Iq {
+ from: Some(connection.jid().clone()),
+ id: iq_id.clone(),
+ to: None,
+ r#type: IqType::Set,
+ lang: None,
+ query: Some(iq::Query::Roster(stanza::roster::Query {
+ ver: None,
+ items: vec![stanza::roster::Item {
+ approved: None,
+ ask: false,
+ jid,
+ name: None,
+ subscription: None,
+ groups: Vec::new(),
+ }],
+ })),
+ errors: Vec::new(),
+ });
+ let (send, recv) = oneshot::channel();
+ {
+ self.pending.lock().await.insert(iq_id.clone(), send);
+ }
+ // TODO: write_handle send helper function
+ let result = connection.write_handle().write(set_stanza).await;
+ if let Err(e) = result {
+ sender.send(Err(RosterError::Write(e)));
+ return;
+ }
+ let iq_result = recv.await;
+ match iq_result {
+ Ok(i) => match i {
+ Ok(iq_result) => match iq_result {
+ Stanza::Iq(Iq {
+ from: _,
+ id,
+ to: _,
+ r#type,
+ lang: _,
+ query: _,
+ errors: _,
+ }) if id == iq_id && r#type == IqType::Result => {
+ sender.send(Ok(()));
+ return;
+ }
+ ref s @ Stanza::Iq(Iq {
+ from: _,
+ ref id,
+ to: _,
+ r#type,
+ lang: _,
+ query: _,
+ ref errors,
+ }) if *id == iq_id && r#type == IqType::Error => {
+ if let Some(error) = errors.first() {
+ sender.send(Err(RosterError::StanzaError(error.clone())));
+ } else {
+ sender.send(Err(RosterError::UnexpectedStanza(s.clone())));
+ }
+ return;
+ }
+ s => {
+ sender.send(Err(RosterError::UnexpectedStanza(s)));
+ return;
+ }
+ },
+ Err(e) => {
+ sender.send(Err(e.into()));
+ return;
+ }
+ },
+ Err(e) => {
+ sender.send(Err(RosterError::Write(WriteError::Actor(e.into()))));
+ return;
+ }
+ }
+ }
+ Command::BuddyRequest(jid, sender) => {
+ let presence = Stanza::Presence(stanza::client::presence::Presence {
+ from: None,
+ id: None,
+ to: Some(jid.clone()),
+ r#type: Some(stanza::client::presence::PresenceType::Subscribe),
+ lang: None,
+ show: None,
+ status: None,
+ priority: None,
+ errors: Vec::new(),
+ delay: None,
+ });
+ let result = connection.write_handle().write(presence).await;
+ match result {
+ Err(_) => {
+ let _ = sender.send(result);
+ }
+ Ok(()) => {
+ let presence = Stanza::Presence(stanza::client::presence::Presence {
+ from: None,
+ id: None,
+ to: Some(jid),
+ r#type: Some(stanza::client::presence::PresenceType::Subscribed),
+ lang: None,
+ show: None,
+ status: None,
+ priority: None,
+ errors: Vec::new(),
+ delay: None,
+ });
+ let result = connection.write_handle().write(presence).await;
+ let _ = sender.send(result);
+ }
+ }
+ }
+ Command::SubscriptionRequest(jid, sender) => {
+ // TODO: i should probably have builders
+ let presence = Stanza::Presence(stanza::client::presence::Presence {
+ from: None,
+ id: None,
+ to: Some(jid),
+ r#type: Some(stanza::client::presence::PresenceType::Subscribe),
+ lang: None,
+ show: None,
+ status: None,
+ priority: None,
+ errors: Vec::new(),
+ delay: None,
+ });
+ let result = connection.write_handle().write(presence).await;
+ let _ = sender.send(result);
+ }
+ Command::AcceptBuddyRequest(jid, sender) => {
+ let presence = Stanza::Presence(stanza::client::presence::Presence {
+ from: None,
+ id: None,
+ to: Some(jid.clone()),
+ r#type: Some(stanza::client::presence::PresenceType::Subscribed),
+ lang: None,
+ show: None,
+ status: None,
+ priority: None,
+ errors: Vec::new(),
+ delay: None,
+ });
+ let result = connection.write_handle().write(presence).await;
+ match result {
+ Err(_) => {
+ let _ = sender.send(result);
+ }
+ Ok(()) => {
+ let presence = Stanza::Presence(stanza::client::presence::Presence {
+ from: None,
+ id: None,
+ to: Some(jid),
+ r#type: Some(stanza::client::presence::PresenceType::Subscribe),
+ lang: None,
+ show: None,
+ status: None,
+ priority: None,
+ errors: Vec::new(),
+ delay: None,
+ });
+ let result = connection.write_handle().write(presence).await;
+ let _ = sender.send(result);
+ }
+ }
+ }
+ Command::AcceptSubscriptionRequest(jid, sender) => {
+ let presence = Stanza::Presence(stanza::client::presence::Presence {
+ from: None,
+ id: None,
+ to: Some(jid),
+ r#type: Some(stanza::client::presence::PresenceType::Subscribe),
+ lang: None,
+ show: None,
+ status: None,
+ priority: None,
+ errors: Vec::new(),
+ delay: None,
+ });
+ let result = connection.write_handle().write(presence).await;
+ let _ = sender.send(result);
+ }
+ Command::UnsubscribeFromContact(jid, sender) => {
+ let presence = Stanza::Presence(stanza::client::presence::Presence {
+ from: None,
+ id: None,
+ to: Some(jid),
+ r#type: Some(stanza::client::presence::PresenceType::Unsubscribe),
+ lang: None,
+ show: None,
+ status: None,
+ priority: None,
+ errors: Vec::new(),
+ delay: None,
+ });
+ let result = connection.write_handle().write(presence).await;
+ let _ = sender.send(result);
+ }
+ Command::UnsubscribeContact(jid, sender) => {
+ let presence = Stanza::Presence(stanza::client::presence::Presence {
+ from: None,
+ id: None,
+ to: Some(jid),
+ r#type: Some(stanza::client::presence::PresenceType::Unsubscribed),
+ lang: None,
+ show: None,
+ status: None,
+ priority: None,
+ errors: Vec::new(),
+ delay: None,
+ });
+ let result = connection.write_handle().write(presence).await;
+ let _ = sender.send(result);
+ }
+ Command::UnfriendContact(jid, sender) => {
+ let presence = Stanza::Presence(stanza::client::presence::Presence {
+ from: None,
+ id: None,
+ to: Some(jid.clone()),
+ r#type: Some(stanza::client::presence::PresenceType::Unsubscribe),
+ lang: None,
+ show: None,
+ status: None,
+ priority: None,
+ errors: Vec::new(),
+ delay: None,
+ });
+ let result = connection.write_handle().write(presence).await;
+ match result {
+ Err(_) => {
+ let _ = sender.send(result);
+ }
+ Ok(()) => {
+ let presence = Stanza::Presence(stanza::client::presence::Presence {
+ from: None,
+ id: None,
+ to: Some(jid),
+ r#type: Some(stanza::client::presence::PresenceType::Unsubscribed),
+ lang: None,
+ show: None,
+ status: None,
+ priority: None,
+ errors: Vec::new(),
+ delay: None,
+ });
+ let result = connection.write_handle().write(presence).await;
+ let _ = sender.send(result);
+ }
+ }
+ }
+ Command::DeleteContact(jid, sender) => {
+ let iq_id = Uuid::new_v4().to_string();
+ let set_stanza = Stanza::Iq(Iq {
+ from: Some(connection.jid().clone()),
+ id: iq_id.clone(),
+ to: None,
+ r#type: IqType::Set,
+ lang: None,
+ query: Some(iq::Query::Roster(stanza::roster::Query {
+ ver: None,
+ items: vec![stanza::roster::Item {
+ approved: None,
+ ask: false,
+ jid,
+ name: None,
+ subscription: Some(stanza::roster::Subscription::Remove),
+ groups: Vec::new(),
+ }],
+ })),
+ errors: Vec::new(),
+ });
+ let (send, recv) = oneshot::channel();
+ {
+ self.pending.lock().await.insert(iq_id.clone(), send);
+ }
+ let result = connection.write_handle().write(set_stanza).await;
+ if let Err(e) = result {
+ sender.send(Err(RosterError::Write(e)));
+ return;
+ }
+ let iq_result = recv.await;
+ match iq_result {
+ Ok(i) => match i {
+ Ok(iq_result) => match iq_result {
+ Stanza::Iq(Iq {
+ from: _,
+ id,
+ to: _,
+ r#type,
+ lang: _,
+ query: _,
+ errors: _,
+ }) if id == iq_id && r#type == IqType::Result => {
+ sender.send(Ok(()));
+ return;
+ }
+ ref s @ Stanza::Iq(Iq {
+ from: _,
+ ref id,
+ to: _,
+ r#type,
+ lang: _,
+ query: _,
+ ref errors,
+ }) if *id == iq_id && r#type == IqType::Error => {
+ if let Some(error) = errors.first() {
+ sender.send(Err(RosterError::StanzaError(error.clone())));
+ } else {
+ sender.send(Err(RosterError::UnexpectedStanza(s.clone())));
+ }
+ return;
+ }
+ s => {
+ sender.send(Err(RosterError::UnexpectedStanza(s)));
+ return;
+ }
+ },
+ Err(e) => {
+ sender.send(Err(e.into()));
+ return;
+ }
+ },
+ Err(e) => {
+ sender.send(Err(RosterError::Write(WriteError::Actor(e.into()))));
+ return;
+ }
+ }
+ }
+ Command::UpdateContact(jid, contact_update, sender) => {
+ let iq_id = Uuid::new_v4().to_string();
+ let groups = Vec::from_iter(
+ contact_update
+ .groups
+ .into_iter()
+ .map(|group| stanza::roster::Group(Some(group))),
+ );
+ let set_stanza = Stanza::Iq(Iq {
+ from: Some(connection.jid().clone()),
+ id: iq_id.clone(),
+ to: None,
+ r#type: IqType::Set,
+ lang: None,
+ query: Some(iq::Query::Roster(stanza::roster::Query {
+ ver: None,
+ items: vec![stanza::roster::Item {
+ approved: None,
+ ask: false,
+ jid,
+ name: contact_update.name,
+ subscription: None,
+ groups,
+ }],
+ })),
+ errors: Vec::new(),
+ });
+ let (send, recv) = oneshot::channel();
+ {
+ self.pending.lock().await.insert(iq_id.clone(), send);
+ }
+ let result = connection.write_handle().write(set_stanza).await;
+ if let Err(e) = result {
+ sender.send(Err(RosterError::Write(e)));
+ return;
+ }
+ let iq_result = recv.await;
+ match iq_result {
+ Ok(i) => match i {
+ Ok(iq_result) => match iq_result {
+ Stanza::Iq(Iq {
+ from: _,
+ id,
+ to: _,
+ r#type,
+ lang: _,
+ query: _,
+ errors: _,
+ }) if id == iq_id && r#type == IqType::Result => {
+ sender.send(Ok(()));
+ return;
+ }
+ ref s @ Stanza::Iq(Iq {
+ from: _,
+ ref id,
+ to: _,
+ r#type,
+ lang: _,
+ query: _,
+ ref errors,
+ }) if *id == iq_id && r#type == IqType::Error => {
+ if let Some(error) = errors.first() {
+ sender.send(Err(RosterError::StanzaError(error.clone())));
+ } else {
+ sender.send(Err(RosterError::UnexpectedStanza(s.clone())));
+ }
+ return;
+ }
+ s => {
+ sender.send(Err(RosterError::UnexpectedStanza(s)));
+ return;
+ }
+ },
+ Err(e) => {
+ sender.send(Err(e.into()));
+ return;
+ }
+ },
+ Err(e) => {
+ sender.send(Err(RosterError::Write(WriteError::Actor(e.into()))));
+ return;
+ }
+ }
+ }
+ Command::SetStatus(online, sender) => {
+ let result = self.db.upsert_cached_status(online.clone()).await;
+ if let Err(e) = result {
+ let _ = self
+ .update_sender
+ .send(UpdateMessage::Error(Error::SetStatus(StatusError::Cache(
+ e.into(),
+ ))))
+ .await;
+ }
+ let result = connection
+ .write_handle()
+ .write(Stanza::Presence(online.into_stanza(None)))
+ .await
+ .map_err(|e| StatusError::Write(e));
+ // .map_err(|e| StatusError::Write(e));
+ let _ = sender.send(result);
+ }
+ // TODO: offline message queue
+ Command::SendMessage(jid, body, sender) => {
+ let id = Uuid::new_v4();
+ let message = Stanza::Message(stanza::client::message::Message {
+ from: Some(connection.jid().clone()),
+ id: Some(id.to_string()),
+ to: Some(jid.clone()),
+ // TODO: specify message type
+ r#type: stanza::client::message::MessageType::Chat,
+ // TODO: lang ?
+ lang: None,
+ subject: None,
+ body: Some(stanza::client::message::Body {
+ lang: None,
+ body: Some(body.body.clone()),
+ }),
+ thread: None,
+ delay: None,
+ });
+ let _ = sender.send(Ok(()));
+ // let _ = sender.send(Ok(message.clone()));
+ let result = connection.write_handle().write(message).await;
+ match result {
+ Ok(_) => {
+ let mut message = Message {
+ id,
+ from: connection.jid().clone(),
+ body,
+ timestamp: Utc::now(),
+ };
+ info!("send message {:?}", message);
+ if let Err(e) = self
+ .db
+ .create_message_with_self_resource_and_chat(
+ message.clone(),
+ jid.clone(),
+ )
+ .await
+ .map_err(|e| e.into())
+ {
+ tracing::error!("{}", e);
+ let _ =
+ self.update_sender
+ .send(UpdateMessage::Error(Error::MessageSend(
+ error::MessageSendError::MessageHistory(e),
+ )));
+ }
+ // TODO: don't do this, have separate from from details
+ message.from = message.from.as_bare();
+ let _ = self
+ .update_sender
+ .send(UpdateMessage::Message { to: jid, message })
+ .await;
+ }
+ Err(_) => {
+ // let _ = sender.send(result);
+ }
+ }
+ }
+ Command::SendPresence(jid, presence, sender) => {
+ let mut presence: stanza::client::presence::Presence = presence.into();
+ if let Some(jid) = jid {
+ presence.to = Some(jid);
+ };
+ let result = connection
+ .write_handle()
+ .write(Stanza::Presence(presence))
+ .await;
+ // .map_err(|e| StatusError::Write(e));
+ let _ = sender.send(result);
+ }
+ }
+ }
+
+ async fn handle_offline(self, command: Command) {
+ match command {
+ Command::GetRoster(sender) => {
+ let roster = self.db.read_cached_roster().await;
+ match roster {
+ Ok(roster) => {
+ let _ = sender.send(Ok(roster));
+ }
+ Err(e) => {
+ let _ = sender.send(Err(RosterError::Cache(e.into())));
+ }
+ }
+ }
+ Command::GetChats(sender) => {
+ let chats = self.db.read_chats().await.map_err(|e| e.into());
+ sender.send(chats);
+ }
+ Command::GetChatsOrdered(sender) => {
+ let chats = self.db.read_chats_ordered().await.map_err(|e| e.into());
+ sender.send(chats);
+ }
+ Command::GetChatsOrderedWithLatestMessages(sender) => {
+ let chats = self
+ .db
+ .read_chats_ordered_with_latest_messages()
+ .await
+ .map_err(|e| e.into());
+ sender.send(chats);
+ }
+ Command::GetChat(jid, sender) => {
+ let chats = self.db.read_chat(jid).await.map_err(|e| e.into());
+ sender.send(chats);
+ }
+ Command::GetMessages(jid, sender) => {
+ let messages = self
+ .db
+ .read_message_history(jid)
+ .await
+ .map_err(|e| e.into());
+ sender.send(messages);
+ }
+ Command::DeleteChat(jid, sender) => {
+ let result = self.db.delete_chat(jid).await.map_err(|e| e.into());
+ sender.send(result);
+ }
+ Command::DeleteMessage(uuid, sender) => {
+ let result = self.db.delete_message(uuid).await.map_err(|e| e.into());
+ sender.send(result);
+ }
+ Command::GetUser(jid, sender) => {
+ let user = self.db.read_user(jid).await.map_err(|e| e.into());
+ sender.send(user);
+ }
+ // TODO: offline queue to modify roster
+ Command::AddContact(_jid, sender) => {
+ sender.send(Err(RosterError::Write(WriteError::Disconnected)));
+ }
+ Command::BuddyRequest(_jid, sender) => {
+ sender.send(Err(WriteError::Disconnected));
+ }
+ Command::SubscriptionRequest(_jid, sender) => {
+ sender.send(Err(WriteError::Disconnected));
+ }
+ Command::AcceptBuddyRequest(_jid, sender) => {
+ sender.send(Err(WriteError::Disconnected));
+ }
+ Command::AcceptSubscriptionRequest(_jid, sender) => {
+ sender.send(Err(WriteError::Disconnected));
+ }
+ Command::UnsubscribeFromContact(_jid, sender) => {
+ sender.send(Err(WriteError::Disconnected));
+ }
+ Command::UnsubscribeContact(_jid, sender) => {
+ sender.send(Err(WriteError::Disconnected));
+ }
+ Command::UnfriendContact(_jid, sender) => {
+ sender.send(Err(WriteError::Disconnected));
+ }
+ Command::DeleteContact(_jid, sender) => {
+ sender.send(Err(RosterError::Write(WriteError::Disconnected)));
+ }
+ Command::UpdateContact(_jid, _contact_update, sender) => {
+ sender.send(Err(RosterError::Write(WriteError::Disconnected)));
+ }
+ Command::SetStatus(online, sender) => {
+ let result = self
+ .db
+ .upsert_cached_status(online)
+ .await
+ .map_err(|e| StatusError::Cache(e.into()));
+ sender.send(result);
+ }
+ // TODO: offline message queue
+ Command::SendMessage(_jid, _body, sender) => {
+ sender.send(Err(WriteError::Disconnected));
+ }
+ Command::SendPresence(_jid, _presence, sender) => {
+ sender.send(Err(WriteError::Disconnected));
+ }
+ }
+ }
+ // pub async fn handle_stream_error(self, error) {}
+ // stanza errors (recoverable)
+ // pub async fn handle_error(self, error: Error) {}
+ // when it aborts, must clear iq map no matter what
+ async fn on_abort(self) {
+ let mut iqs = self.pending.lock().await;
+ for (_id, sender) in iqs.drain() {
+ let _ = sender.send(Err(ReadError::LostConnection));
+ }
+ }
+
+ async fn handle_connection_error(self, error: ConnectionError) {
+ self.update_sender
+ .send(UpdateMessage::Error(
+ ConnectionError::AlreadyConnected.into(),
+ ))
+ .await;
+ }
+}
+
+impl From<Command> for CoreClientCommand<Command> {
+ fn from(value: Command) -> Self {
+ CoreClientCommand::Command(value)
+ }
+}
+
+#[derive(Debug, Clone)]
+pub enum UpdateMessage {
+ Error(Error),
+ Online(Online, Vec<Contact>),
+ Offline(Offline),
+ /// received roster from jabber server (replace full app roster state with this)
+ /// is this needed?
+ FullRoster(Vec<Contact>),
+ /// (only update app roster state, don't replace)
+ RosterUpdate(Contact),
+ RosterDelete(JID),
+ /// presences should be stored with users in the ui, not contacts, as presences can be received from anyone
+ Presence {
+ from: JID,
+ presence: Presence,
+ },
+ // TODO: receipts
+ // MessageDispatched(Uuid),
+ Message {
+ to: JID,
+ message: Message,
+ },
+ SubscriptionRequest(jid::JID),
+}
diff --git a/filamento/src/presence.rs b/filamento/src/presence.rs
new file mode 100644
index 0000000..e35761c
--- /dev/null
+++ b/filamento/src/presence.rs
@@ -0,0 +1,151 @@
+use chrono::{DateTime, Utc};
+use sqlx::Sqlite;
+use stanza::{client::presence::String1024, xep_0203::Delay};
+
+#[derive(Debug, Default, sqlx::FromRow, Clone)]
+pub struct Online {
+ pub show: Option<Show>,
+ #[sqlx(rename = "message")]
+ pub status: Option<String>,
+ #[sqlx(skip)]
+ pub priority: Option<i8>,
+}
+
+#[derive(Debug, Clone, Copy)]
+pub enum Show {
+ Away,
+ Chat,
+ DoNotDisturb,
+ ExtendedAway,
+}
+
+impl sqlx::Type<Sqlite> for Show {
+ fn type_info() -> <Sqlite as sqlx::Database>::TypeInfo {
+ <&str as sqlx::Type<Sqlite>>::type_info()
+ }
+}
+
+impl sqlx::Decode<'_, Sqlite> for Show {
+ fn decode(
+ value: <Sqlite as sqlx::Database>::ValueRef<'_>,
+ ) -> Result<Self, sqlx::error::BoxDynError> {
+ let value = <&str as sqlx::Decode<Sqlite>>::decode(value)?;
+ match value {
+ "away" => Ok(Self::Away),
+ "chat" => Ok(Self::Chat),
+ "do-not-disturb" => Ok(Self::DoNotDisturb),
+ "extended-away" => Ok(Self::ExtendedAway),
+ _ => unreachable!(),
+ }
+ }
+}
+
+impl sqlx::Encode<'_, Sqlite> for Show {
+ fn encode_by_ref(
+ &self,
+ buf: &mut <Sqlite as sqlx::Database>::ArgumentBuffer<'_>,
+ ) -> Result<sqlx::encode::IsNull, sqlx::error::BoxDynError> {
+ let value = match self {
+ Show::Away => "away",
+ Show::Chat => "chat",
+ Show::DoNotDisturb => "do-not-disturb",
+ Show::ExtendedAway => "extended-away",
+ };
+ <&str as sqlx::Encode<Sqlite>>::encode(value, buf)
+ }
+}
+
+#[derive(Debug, Default, Clone)]
+pub struct Offline {
+ pub status: Option<String>,
+}
+
+#[derive(Debug, Clone)]
+pub enum PresenceType {
+ Online(Online),
+ Offline(Offline),
+}
+
+#[derive(Debug, Clone)]
+pub struct Presence {
+ pub timestamp: DateTime<Utc>,
+ pub presence: PresenceType,
+}
+
+impl Online {
+ pub fn into_stanza(
+ self,
+ timestamp: Option<DateTime<Utc>>,
+ ) -> stanza::client::presence::Presence {
+ stanza::client::presence::Presence {
+ from: None,
+ id: None,
+ to: None,
+ r#type: None,
+ lang: None,
+ show: self.show.map(|show| match show {
+ Show::Away => stanza::client::presence::Show::Away,
+ Show::Chat => stanza::client::presence::Show::Chat,
+ Show::DoNotDisturb => stanza::client::presence::Show::Dnd,
+ Show::ExtendedAway => stanza::client::presence::Show::Xa,
+ }),
+ // TODO: enforce message length in status message
+ status: self.status.map(|status| stanza::client::presence::Status {
+ lang: None,
+ status: String1024(status),
+ }),
+ priority: self
+ .priority
+ .map(|priority| stanza::client::presence::Priority(priority)),
+ errors: Vec::new(),
+ delay: timestamp.map(|timestamp| Delay {
+ from: None,
+ stamp: timestamp,
+ }),
+ }
+ }
+}
+
+impl Offline {
+ pub fn into_stanza(
+ self,
+ timestamp: Option<DateTime<Utc>>,
+ ) -> stanza::client::presence::Presence {
+ stanza::client::presence::Presence {
+ from: None,
+ id: None,
+ to: None,
+ r#type: Some(stanza::client::presence::PresenceType::Unavailable),
+ lang: None,
+ show: None,
+ status: self.status.map(|status| stanza::client::presence::Status {
+ lang: None,
+ status: String1024(status),
+ }),
+ priority: None,
+ errors: Vec::new(),
+ delay: timestamp.map(|timestamp| Delay {
+ from: None,
+ stamp: timestamp,
+ }),
+ }
+ }
+}
+
+impl From<PresenceType> for stanza::client::presence::Presence {
+ fn from(value: PresenceType) -> Self {
+ match value {
+ PresenceType::Online(online) => online.into_stanza(None),
+ PresenceType::Offline(offline) => offline.into_stanza(None),
+ }
+ }
+}
+
+impl From<Presence> for stanza::client::presence::Presence {
+ fn from(value: Presence) -> Self {
+ match value.presence {
+ PresenceType::Online(online) => online.into_stanza(Some(value.timestamp)),
+ PresenceType::Offline(offline) => offline.into_stanza(Some(value.timestamp)),
+ }
+ }
+}
diff --git a/filamento/src/roster.rs b/filamento/src/roster.rs
new file mode 100644
index 0000000..43c32f5
--- /dev/null
+++ b/filamento/src/roster.rs
@@ -0,0 +1,127 @@
+use std::collections::HashSet;
+
+use jid::JID;
+use sqlx::Sqlite;
+
+pub struct ContactUpdate {
+ pub name: Option<String>,
+ pub groups: HashSet<String>,
+}
+
+#[derive(Debug, sqlx::FromRow, Clone)]
+pub struct Contact {
+ // jid is the id used to reference everything, but not the primary key
+ pub user_jid: JID,
+ pub subscription: Subscription,
+ /// client user defined name
+ pub name: Option<String>,
+ // TODO: avatar, nickname
+ /// nickname picked by contact
+ // nickname: Option<String>,
+ #[sqlx(skip)]
+ pub groups: HashSet<String>,
+}
+
+#[derive(Debug, Clone)]
+pub enum Subscription {
+ None,
+ PendingOut,
+ PendingIn,
+ PendingInPendingOut,
+ OnlyOut,
+ OnlyIn,
+ OutPendingIn,
+ InPendingOut,
+ Buddy,
+ // TODO: perhaps don't need, just emit event to remove contact
+ // Remove,
+}
+
+impl sqlx::Type<Sqlite> for Subscription {
+ fn type_info() -> <Sqlite as sqlx::Database>::TypeInfo {
+ <&str as sqlx::Type<Sqlite>>::type_info()
+ }
+}
+
+impl sqlx::Decode<'_, Sqlite> for Subscription {
+ fn decode(
+ value: <Sqlite as sqlx::Database>::ValueRef<'_>,
+ ) -> Result<Self, sqlx::error::BoxDynError> {
+ let value = <&str as sqlx::Decode<Sqlite>>::decode(value)?;
+ match value {
+ "none" => Ok(Self::None),
+ "pending-out" => Ok(Self::PendingOut),
+ "pending-in" => Ok(Self::PendingIn),
+ "pending-in-pending-out" => Ok(Self::PendingInPendingOut),
+ "only-out" => Ok(Self::OnlyOut),
+ "only-in" => Ok(Self::OnlyIn),
+ "out-pending-in" => Ok(Self::OutPendingIn),
+ "in-pending-out" => Ok(Self::InPendingOut),
+ "buddy" => Ok(Self::Buddy),
+ _ => panic!("unexpected subscription `{value}`"),
+ }
+ }
+}
+
+impl sqlx::Encode<'_, Sqlite> for Subscription {
+ fn encode_by_ref(
+ &self,
+ buf: &mut <Sqlite as sqlx::Database>::ArgumentBuffer<'_>,
+ ) -> Result<sqlx::encode::IsNull, sqlx::error::BoxDynError> {
+ let value = match self {
+ Subscription::None => "none",
+ Subscription::PendingOut => "pending-out",
+ Subscription::PendingIn => "pending-in",
+ Subscription::PendingInPendingOut => "pending-in-pending-out",
+ Subscription::OnlyOut => "only-out",
+ Subscription::OnlyIn => "only-in",
+ Subscription::OutPendingIn => "out-pending-in",
+ Subscription::InPendingOut => "in-pending-out",
+ Subscription::Buddy => "buddy",
+ };
+ <&str as sqlx::Encode<Sqlite>>::encode(value, buf)
+ }
+}
+
+// none
+// >
+// >>
+// <
+// <<
+// ><
+// >><
+// ><<
+// >><<
+
+impl From<stanza::roster::Item> for Contact {
+ fn from(value: stanza::roster::Item) -> Self {
+ let subscription = match value.ask {
+ true => match value.subscription {
+ Some(s) => match s {
+ stanza::roster::Subscription::Both => Subscription::Buddy,
+ stanza::roster::Subscription::From => Subscription::InPendingOut,
+ stanza::roster::Subscription::None => Subscription::PendingOut,
+ stanza::roster::Subscription::Remove => Subscription::PendingOut,
+ stanza::roster::Subscription::To => Subscription::OnlyOut,
+ },
+ None => Subscription::PendingOut,
+ },
+ false => match value.subscription {
+ Some(s) => match s {
+ stanza::roster::Subscription::Both => Subscription::Buddy,
+ stanza::roster::Subscription::From => Subscription::OnlyIn,
+ stanza::roster::Subscription::None => Subscription::None,
+ stanza::roster::Subscription::Remove => Subscription::None,
+ stanza::roster::Subscription::To => Subscription::OnlyOut,
+ },
+ None => Subscription::None,
+ },
+ };
+ Contact {
+ user_jid: value.jid,
+ subscription,
+ name: value.name,
+ groups: HashSet::from_iter(value.groups.into_iter().filter_map(|group| group.0)),
+ }
+ }
+}
diff --git a/filamento/src/user.rs b/filamento/src/user.rs
new file mode 100644
index 0000000..9914d14
--- /dev/null
+++ b/filamento/src/user.rs
@@ -0,0 +1,7 @@
+use jid::JID;
+
+#[derive(Debug, sqlx::FromRow)]
+pub struct User {
+ pub jid: JID,
+ pub cached_status_message: Option<String>,
+}