diff options
Diffstat (limited to '')
-rw-r--r-- | filamento/src/chat.rs | 64 | ||||
-rw-r--r-- | filamento/src/db.rs | 2419 | ||||
-rw-r--r-- | filamento/src/error.rs | 105 | ||||
-rw-r--r-- | filamento/src/files.rs | 25 | ||||
-rw-r--r-- | filamento/src/files/opfs.rs | 128 | ||||
-rw-r--r-- | filamento/src/lib.rs | 43 | ||||
-rw-r--r-- | filamento/src/logic/local_only.rs | 14 | ||||
-rw-r--r-- | filamento/src/logic/offline.rs | 18 | ||||
-rw-r--r-- | filamento/src/logic/online.rs | 17 | ||||
-rw-r--r-- | filamento/src/logic/process_stanza.rs | 23 | ||||
-rw-r--r-- | filamento/src/presence.rs | 30 | ||||
-rw-r--r-- | filamento/src/roster.rs | 86 | ||||
-rw-r--r-- | filamento/src/user.rs | 4 |
13 files changed, 1990 insertions, 986 deletions
diff --git a/filamento/src/chat.rs b/filamento/src/chat.rs index 557b42b..5f58866 100644 --- a/filamento/src/chat.rs +++ b/filamento/src/chat.rs @@ -1,8 +1,16 @@ +use std::fmt::{Display, Write}; + use chrono::{DateTime, Utc}; use jid::JID; +use rusqlite::{ + ToSql, + types::{FromSql, ToSqlOutput, Value}, +}; use uuid::Uuid; -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq, Eq)] +#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))] +#[cfg_attr(feature = "reactive_stores", derive(reactive_stores::Store))] pub struct Message { pub id: Uuid, // does not contain full user information @@ -16,7 +24,8 @@ pub struct Message { pub body: Body, } -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))] pub enum Delivery { Sending, Written, @@ -27,19 +36,66 @@ pub enum Delivery { Queued, } +impl Display for Delivery { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Delivery::Sending => f.write_str("sending"), + Delivery::Written => f.write_str("written"), + Delivery::Sent => f.write_str("sent"), + Delivery::Delivered => f.write_str("delivered"), + Delivery::Read => f.write_str("read"), + Delivery::Failed => f.write_str("failed"), + Delivery::Queued => f.write_str("queued"), + } + } +} + +impl ToSql for Delivery { + fn to_sql(&self) -> rusqlite::Result<rusqlite::types::ToSqlOutput<'_>> { + Ok(match self { + Delivery::Sending => ToSqlOutput::Owned(Value::Text("sending".to_string())), + Delivery::Written => ToSqlOutput::Owned(Value::Text("written".to_string())), + Delivery::Sent => ToSqlOutput::Owned(Value::Text("sent".to_string())), + Delivery::Delivered => ToSqlOutput::Owned(Value::Text("delivered".to_string())), + Delivery::Read => ToSqlOutput::Owned(Value::Text("read".to_string())), + Delivery::Failed => ToSqlOutput::Owned(Value::Text("failed".to_string())), + Delivery::Queued => ToSqlOutput::Owned(Value::Text("queued".to_string())), + }) + } +} + +impl FromSql for Delivery { + fn column_result(value: rusqlite::types::ValueRef<'_>) -> rusqlite::types::FromSqlResult<Self> { + Ok(match value.as_str()? { + "sending" => Self::Sending, + "written" => Self::Written, + "sent" => Self::Sent, + "delivered" => Self::Delivered, + "read" => Self::Read, + "failed" => Self::Failed, + "queued" => Self::Queued, + // TODO: don't have these lol + value => panic!("unexpected subscription `{value}`"), + }) + } +} + // TODO: user migrations // pub enum Migrated { // Jabber(User), // Outside, // } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq, Eq)] +#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))] pub struct Body { // TODO: rich text, other contents, threads pub body: String, } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq, Eq)] +#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))] +#[cfg_attr(feature = "reactive_stores", derive(reactive_stores::Store))] pub struct Chat { pub correspondent: JID, pub have_chatted: bool, diff --git a/filamento/src/db.rs b/filamento/src/db.rs index 467030d..1b5afe6 100644 --- a/filamento/src/db.rs +++ b/filamento/src/db.rs @@ -1,7 +1,15 @@ -use std::{collections::HashSet, path::Path}; +use core::fmt::Display; +use std::{collections::HashSet, ops::Deref, path::Path, sync::Arc}; use chrono::{DateTime, Utc}; use jid::JID; +use rusqlite::{Connection, OptionalExtension}; +use tokio::sync::{Mutex, MutexGuard}; +use tokio::sync::{mpsc, oneshot}; +use tokio::task::{spawn, spawn_blocking}; +#[cfg(target_arch = "wasm32")] +use tokio_with_wasm::alias as tokio; +use tracing::debug; use uuid::Uuid; use crate::{ @@ -12,75 +20,193 @@ use crate::{ user::User, }; -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct Db { - // db: SqlitePool, + sender: mpsc::UnboundedSender<DbCommand>, +} + +impl Deref for Db { + type Target = mpsc::UnboundedSender<DbCommand>; + + fn deref(&self) -> &Self::Target { + &self.sender + } +} + +#[derive(Debug)] +pub struct DbActor { + receiver: mpsc::UnboundedReceiver<DbCommand>, + db: Connection, +} + +macro_rules! impl_db_sends { + ($($command:ident => $name:ident($($arg:ident: $arg_t:ty),*) -> $ret:ty);*) => { + $( + pub(crate) async fn $name(&self, $($arg: $arg_t),*) -> Result<$ret, Error> { + let (result, recv) = oneshot::channel(); + let command = DbCommand::$command { $($arg,)* result }; + let _ = self.sender.send(command); + let result = recv.await?; + result + } + )* + } } -// TODO: turn into trait impl Db { #[cfg(not(target_arch = "wasm32"))] pub async fn create_connect_and_migrate( - path: impl AsRef<Path>, + path: impl AsRef<Path> + Send, ) -> Result<Self, DatabaseOpenError> { - if let Some(dir) = path.as_ref().parent() { - if dir.is_dir() { - } else { - tokio::fs::create_dir_all(dir).await?; + let (sender, receiver) = mpsc::channel(20); + let (result_send, result_recv) = oneshot::channel(); + spawn_blocking(move || { + let result = DbActor::new(path, receiver).await; + match result { + Ok(a) => { + result_send.send(Ok(())); + a.run() + } + Err(e) => { + result_send.send(Err(e)); + } } - let _file = tokio::fs::OpenOptions::new() - .append(true) - .create(true) - .open(path.as_ref()) - .await?; + }); + match result_recv.await { + Ok(r) => match r { + Ok(o) => Ok(Self { sender }), + Err(e) => return Err(e), + }, + Err(e) => return Err(e.into()), + } + } + + #[cfg(not(target_arch = "wasm32"))] + pub async fn create_connect_and_migrate_memory() -> Result<Self, DatabaseOpenError> { + let (sender, receiver) = mpsc::channel(20); + let (result_send, result_recv) = oneshot::channel(); + spawn_blocking(move || { + let result = DbActor::new_memory(receiver).await; + match result { + Ok(a) => { + result_send.send(Ok(())); + // TODO: async run when not wasm + a.run() + } + Err(e) => { + result_send.send(Err(e)); + } + } + }); + match result_recv.await { + Ok(r) => match r { + Ok(o) => Ok(Self { sender }), + Err(e) => return Err(e), + }, + Err(e) => return Err(e.into()), } - let url = format!( - "sqlite://{}", - path.as_ref() - .to_str() - .ok_or(DatabaseOpenError::InvalidPath)? - ); - // let db = SqlitePool::connect(&url).await?; - // migrate!().run(&db).await?; - // Ok(Self { db }) - Ok(Self {}) } + /// `file_name` should be a file not in a directory #[cfg(target_arch = "wasm32")] pub async fn create_connect_and_migrate( - path: impl AsRef<Path>, + file_name: impl AsRef<str> + Send + 'static, ) -> Result<Self, DatabaseOpenError> { - // let url = "mem.db"; - // let db = SqlitePool::connect(&url).await?; - // // migrate!().run(&db).await?; - Ok(Self {}) + use tokio_with_wasm::spawn_local; + + let (sender, receiver) = mpsc::unbounded_channel(); + let (result_send, result_recv) = oneshot::channel(); + spawn_blocking(move || { + spawn_local(async move { + debug!("installing opfs in spawn"); + rusqlite::ffi::install_opfs_sahpool( + Some(&rusqlite::ffi::OpfsSAHPoolCfg::default()), + false, + ) + .await + .unwrap(); + debug!("opfs installed"); + let file_name = format!("file:{}?vfs=opfs-sahpool", file_name.as_ref()); + let result = DbActor::new(file_name, receiver); + match result { + Ok(a) => { + result_send.send(Ok(())); + a.run().await + } + Err(e) => { + result_send.send(Err(e)); + } + } + }); + }); + match result_recv.await { + Ok(r) => match r { + Ok(o) => Ok(Self { sender }), + Err(e) => return Err(e), + }, + Err(e) => return Err(e.into()), + } } - // pub(crate) fn new(db: SqlitePool) -> Self { - // // Self { db } - // Self {} - // } + #[cfg(target_arch = "wasm32")] + pub async fn create_connect_and_migrate_memory() -> Result<Self, DatabaseOpenError> { + let (sender, receiver) = mpsc::unbounded_channel(); + let (result_send, result_recv) = oneshot::channel(); + spawn_blocking(move || { + let result = DbActor::new_memory(receiver); + match result { + Ok(a) => { + result_send.send(Ok(())); + tokio_with_wasm::spawn_local(async { a.run().await }); + // a.run() + } + Err(e) => { + result_send.send(Err(e)); + } + } + }); + match result_recv.await { + Ok(r) => match r { + Ok(o) => Ok(Self { sender }), + Err(e) => return Err(e), + }, + Err(e) => return Err(e.into()), + } + } pub(crate) async fn create_user(&self, user: User) -> Result<(), Error> { - Ok(()) + let (result, recv) = oneshot::channel(); + let command = DbCommand::CreateUser { user, result }; + self.sender.send(command); + let result = recv.await?; + result } + // TODO: this is not a 'read' user pub(crate) async fn read_user(&self, user: JID) -> Result<User, Error> { - Ok(User { - jid: user, - nick: None, - avatar: None, - }) + let (result, recv) = oneshot::channel(); + let command = DbCommand::ReadUser { user, result }; + self.sender.send(command); + let result = recv.await?; + result } /// returns whether or not the nickname was updated pub(crate) async fn delete_user_nick(&self, jid: JID) -> Result<bool, Error> { - Ok(true) + let (result, recv) = oneshot::channel(); + let command = DbCommand::DeleteUserNick { jid, result }; + self.sender.send(command); + let result = recv.await?; + result } /// returns whether or not the nickname was updated pub(crate) async fn upsert_user_nick(&self, jid: JID, nick: String) -> Result<bool, Error> { - Ok(true) + let (result, recv) = oneshot::channel(); + let command = DbCommand::UpsertUserNick { jid, nick, result }; + self.sender.send(command); + let result = recv.await?; + result } /// returns whether or not the avatar was updated, and the file to delete if there existed an old avatar @@ -88,7 +214,11 @@ impl Db { &self, jid: JID, ) -> Result<(bool, Option<String>), Error> { - Ok((true, None)) + let (result, recv) = oneshot::channel(); + let command = DbCommand::DeleteUserAvatar { jid, result }; + self.sender.send(command); + let result = recv.await?; + result } /// returns whether or not the avatar was updated, and the file to delete if there existed an old avatar @@ -97,11 +227,24 @@ impl Db { jid: JID, avatar: String, ) -> Result<(bool, Option<String>), Error> { - Ok((true, None)) + let (result, recv) = oneshot::channel(); + let command = DbCommand::UpsertUserAvatar { + jid, + avatar, + result, + }; + self.sender.send(command); + let result = recv.await?; + result } + // TODO: use references everywhere pub(crate) async fn update_user(&self, user: User) -> Result<(), Error> { - Ok(()) + let (result, recv) = oneshot::channel(); + let command = DbCommand::UpdateUser { user, result }; + self.sender.send(command); + let result = recv.await?; + result } // TODO: should this be allowed? messages need to reference users. should probably only allow delete if every other thing referencing it has been deleted, or if you make clear to the user deleting a user will delete all messages associated with them. @@ -109,64 +252,112 @@ impl Db { /// does not create the underlying user, if underlying user does not exist, create_user() must be called separately pub(crate) async fn create_contact(&self, contact: Contact) -> Result<(), Error> { - Ok(()) + let (result, recv) = oneshot::channel(); + let command = DbCommand::CreateContact { contact, result }; + self.sender.send(command); + let result = recv.await?; + result } pub(crate) async fn read_contact(&self, contact: JID) -> Result<Contact, Error> { - Ok(Contact { - user_jid: contact, - subscription: crate::roster::Subscription::None, - name: None, - groups: HashSet::new(), - }) + let (result, recv) = oneshot::channel(); + let command = DbCommand::ReadContact { contact, result }; + self.sender.send(command); + let result = recv.await?; + result } - pub(crate) async fn read_contact_opt(&self, contact: &JID) -> Result<Option<Contact>, Error> { - Ok(None) + pub(crate) async fn read_contact_opt(&self, contact: JID) -> Result<Option<Contact>, Error> { + let (result, recv) = oneshot::channel(); + let command = DbCommand::ReadContactOpt { contact, result }; + self.sender.send(command); + let result = recv.await?; + result } /// does not update the underlying user, to update user, update_user() must be called separately pub(crate) async fn update_contact(&self, contact: Contact) -> Result<(), Error> { - Ok(()) + let (result, recv) = oneshot::channel(); + let command = DbCommand::UpdateContact { contact, result }; + self.sender.send(command); + let result = recv.await?; + result } pub(crate) async fn upsert_contact(&self, contact: Contact) -> Result<(), Error> { - Ok(()) + let (result, recv) = oneshot::channel(); + let command = DbCommand::UpsertContact { contact, result }; + self.sender.send(command); + let result = recv.await?; + result } pub(crate) async fn delete_contact(&self, contact: JID) -> Result<(), Error> { - Ok(()) + let (result, recv) = oneshot::channel(); + let command = DbCommand::DeleteContact { contact, result }; + self.sender.send(command); + let result = recv.await?; + result } pub(crate) async fn replace_cached_roster(&self, roster: Vec<Contact>) -> Result<(), Error> { - Ok(()) + let (result, recv) = oneshot::channel(); + let command = DbCommand::ReplaceCachedRoster { roster, result }; + self.sender.send(command); + let result = recv.await?; + result } pub(crate) async fn read_cached_roster(&self) -> Result<Vec<Contact>, Error> { - Ok(Vec::new()) + let (result, recv) = oneshot::channel(); + let command = DbCommand::ReadCachedRoster { result }; + self.sender.send(command); + let result = recv.await?; + result } pub(crate) async fn read_cached_roster_with_users( &self, ) -> Result<Vec<(Contact, User)>, Error> { - Ok(Vec::new()) + let (result, recv) = oneshot::channel(); + let command = DbCommand::ReadCachedRosterWithUsers { result }; + self.sender.send(command); + let result = recv.await?; + result } pub(crate) async fn create_chat(&self, chat: Chat) -> Result<(), Error> { - Ok(()) + let (result, recv) = oneshot::channel(); + let command = DbCommand::CreateChat { chat, result }; + self.sender.send(command); + let result = recv.await?; + result } // TODO: what happens if a correspondent changes from a user to a contact? maybe just have correspondent be a user, then have the client make the user show up as a contact in ui if they are in the loaded roster. pub(crate) async fn read_chat(&self, chat: JID) -> Result<Chat, Error> { - Ok(Chat { - correspondent: chat, - have_chatted: false, - }) + let (result, recv) = oneshot::channel(); + let command = DbCommand::ReadChat { chat, result }; + self.sender.send(command); + let result = recv.await?; + result + } + + pub(crate) async fn read_chat_and_user(&self, chat: JID) -> Result<(Chat, User), Error> { + let (result, recv) = oneshot::channel(); + let command = DbCommand::ReadChatAndUser { chat, result }; + self.sender.send(command); + let result = recv.await?; + result } pub(crate) async fn mark_chat_as_chatted(&self, chat: JID) -> Result<(), Error> { - Ok(()) + let (result, recv) = oneshot::channel(); + let command = DbCommand::MarkChatAsChatted { chat, result }; + self.sender.send(command); + let result = recv.await?; + result } pub(crate) async fn update_chat_correspondent( @@ -174,27 +365,44 @@ impl Db { old_chat: Chat, new_correspondent: JID, ) -> Result<Chat, Error> { - Ok(Chat { - correspondent: new_correspondent, - have_chatted: false, - }) + let (result, recv) = oneshot::channel(); + let command = DbCommand::UpdateChatCorrespondent { + old_chat, + new_correspondent, + result, + }; + self.sender.send(command); + let result = recv.await?; + result } // pub(crate) async fn update_chat pub(crate) async fn delete_chat(&self, chat: JID) -> Result<(), Error> { - Ok(()) + let (result, recv) = oneshot::channel(); + let command = DbCommand::DeleteChat { chat, result }; + self.sender.send(command); + let result = recv.await?; + result } /// TODO: sorting and filtering (for now there is no sorting) pub(crate) async fn read_chats(&self) -> Result<Vec<Chat>, Error> { - Ok(Vec::new()) + let (result, recv) = oneshot::channel(); + let command = DbCommand::ReadChats { result }; + self.sender.send(command); + let result = recv.await?; + result } /// chats ordered by date of last message // greatest-n-per-group pub(crate) async fn read_chats_ordered(&self) -> Result<Vec<Chat>, Error> { - Ok(Vec::new()) + let (result, recv) = oneshot::channel(); + let command = DbCommand::ReadChatsOrdered { result }; + self.sender.send(command); + let result = recv.await?; + result } /// chats ordered by date of last message @@ -202,7 +410,11 @@ impl Db { pub(crate) async fn read_chats_ordered_with_latest_messages( &self, ) -> Result<Vec<(Chat, Message)>, Error> { - Ok(Vec::new()) + let (result, recv) = oneshot::channel(); + let command = DbCommand::ReadChatsOrderedWithLatestMessages { result }; + self.sender.send(command); + let result = recv.await?; + result } /// chats ordered by date of last message @@ -210,51 +422,79 @@ impl Db { pub(crate) async fn read_chats_ordered_with_latest_messages_and_users( &self, ) -> Result<Vec<((Chat, User), (Message, User))>, Error> { - Ok(Vec::new()) - } - - async fn read_chat_id(&self, chat: JID) -> Result<Uuid, Error> { - Ok(Uuid::new_v4()) - } - - async fn read_chat_id_opt(&self, chat: JID) -> Result<Option<Uuid>, Error> { - Ok(None) + let (result, recv) = oneshot::channel(); + let command = DbCommand::ReadChatsOrderedWithLatestMessagesAndUsers { result }; + self.sender.send(command); + let result = recv.await?; + result } /// if the chat doesn't already exist, it must be created by calling create_chat() before running this function. + #[tracing::instrument] pub(crate) async fn create_message( &self, message: Message, chat: JID, from: JID, ) -> Result<(), Error> { - Ok(()) + let (result, recv) = oneshot::channel(); + let command = DbCommand::CreateMessage { + message, + chat, + from, + result, + }; + self.sender.send(command); + let result = recv.await?; + result } - pub(crate) async fn upsert_chat_and_user(&self, chat: &JID) -> Result<bool, Error> { - Ok(false) + pub(crate) async fn upsert_chat_and_user(&self, chat: JID) -> Result<bool, Error> { + let (result, recv) = oneshot::channel(); + let command = DbCommand::UpsertChatAndUser { chat, result }; + self.sender.send(command); + let result = recv.await?; + result } - /// MUST upsert chat beforehand - pub(crate) async fn create_message_with_self_resource( + /// create direct message from incoming. MUST upsert chat and user + #[tracing::instrument] + pub(crate) async fn create_message_with_user_resource( &self, message: Message, + // TODO: enforce two kinds of jid. bare and full + // must be bare jid chat: JID, // full jid from: JID, ) -> Result<(), Error> { - Ok(()) + tracing::info!("MSGDEBUG create_message_with_user_resource exists"); + let (result, recv) = oneshot::channel(); + let command = DbCommand::CreateMessageWithUserResource { + message, + chat, + from, + result, + }; + self.sender.send(command); + let result = recv.await?; + result } - /// create direct message from incoming. MUST upsert chat and user - pub(crate) async fn create_message_with_user_resource( + pub(crate) async fn update_message_delivery( &self, - message: Message, - chat: JID, - // full jid - from: JID, + message: Uuid, + delivery: Delivery, ) -> Result<(), Error> { - Ok(()) + let (result, recv) = oneshot::channel(); + let command = DbCommand::UpdateMessageDelivery { + message, + delivery, + result, + }; + self.sender.send(command); + let result = recv.await?; + result } // pub(crate) async fn read_message(&self, message: Uuid) -> Result<Message, Error> { @@ -269,909 +509,1208 @@ impl Db { // TODO: message updates/edits pub(crate) async fn update_message(&self, message: Message) -> Result<(), Error> {} - pub(crate) async fn delete_message(&self, message: Uuid) -> Result<(), Error> { - Ok(()) - } + impl_db_sends!( + ReadCapabilities => read_capabilities(node: String) -> String; + DeleteCachedStatus => delete_cached_status() -> (); + UpsertCachedStatus => upsert_cached_status(status: Online) -> (); + ReadCachedStatus => read_cached_status() -> Online; + ReadMessageHistoryWithUsers => read_message_history_with_users(chat: JID) -> Vec<(Message, User)>; + // TODO: paging + ReadMessageHistory => read_message_history(chat: JID) -> Vec<Message>; + ReadMessage => read_message(message: Uuid) -> Message; + DeleteMessage => delete_message(message: Uuid) -> () + ); - // TODO: paging - pub(crate) async fn read_message_history(&self, chat: JID) -> Result<Vec<Message>, Error> { - Ok(Vec::new()) + pub(crate) async fn upsert_capabilities( + &self, + node: String, + capabilities: String, + ) -> Result<(), Error> { + let (result, recv) = oneshot::channel(); + let command = DbCommand::UpsertCapabilities { + node, + capabilities, + result, + }; + self.sender.send(command); + let result = recv.await?; + result } +} - pub(crate) async fn read_message_history_with_users( - &self, +// TODO: i should really just make an actor macro +#[derive(Debug)] +pub enum DbCommand { + CreateUser { + user: User, + result: oneshot::Sender<Result<(), Error>>, + }, + ReadUser { + user: JID, + result: oneshot::Sender<Result<User, Error>>, + }, + DeleteUserNick { + jid: JID, + result: oneshot::Sender<Result<bool, Error>>, + }, + UpsertUserNick { + jid: JID, + nick: String, + result: oneshot::Sender<Result<bool, Error>>, + }, + DeleteUserAvatar { + jid: JID, + result: oneshot::Sender<Result<(bool, Option<String>), Error>>, + }, + UpsertUserAvatar { + jid: JID, + avatar: String, + result: oneshot::Sender<Result<(bool, Option<String>), Error>>, + }, + UpdateUser { + user: User, + result: oneshot::Sender<Result<(), Error>>, + }, + CreateContact { + contact: Contact, + result: oneshot::Sender<Result<(), Error>>, + }, + ReadContact { + contact: JID, + result: oneshot::Sender<Result<Contact, Error>>, + }, + ReadContactOpt { + contact: JID, + result: oneshot::Sender<Result<Option<Contact>, Error>>, + }, + UpdateContact { + contact: Contact, + result: oneshot::Sender<Result<(), Error>>, + }, + UpsertContact { + contact: Contact, + result: oneshot::Sender<Result<(), Error>>, + }, + DeleteContact { + contact: JID, + result: oneshot::Sender<Result<(), Error>>, + }, + ReplaceCachedRoster { + roster: Vec<Contact>, + result: oneshot::Sender<Result<(), Error>>, + }, + ReadCachedRoster { + result: oneshot::Sender<Result<Vec<Contact>, Error>>, + }, + ReadCachedRosterWithUsers { + result: oneshot::Sender<Result<Vec<(Contact, User)>, Error>>, + }, + CreateChat { + chat: Chat, + result: oneshot::Sender<Result<(), Error>>, + }, + ReadChat { chat: JID, - ) -> Result<Vec<(Message, User)>, Error> { - Ok(Vec::new()) + result: oneshot::Sender<Result<Chat, Error>>, + }, + ReadChatAndUser { + chat: JID, + result: oneshot::Sender<Result<(Chat, User), Error>>, + }, + MarkChatAsChatted { + chat: JID, + result: oneshot::Sender<Result<(), Error>>, + }, + UpdateChatCorrespondent { + old_chat: Chat, + new_correspondent: JID, + result: oneshot::Sender<Result<Chat, Error>>, + }, + DeleteChat { + chat: JID, + result: oneshot::Sender<Result<(), Error>>, + }, + ReadChats { + result: oneshot::Sender<Result<Vec<Chat>, Error>>, + }, + ReadChatsOrdered { + result: oneshot::Sender<Result<Vec<Chat>, Error>>, + }, + ReadChatsOrderedWithLatestMessages { + result: oneshot::Sender<Result<Vec<(Chat, Message)>, Error>>, + }, + ReadChatsOrderedWithLatestMessagesAndUsers { + result: oneshot::Sender<Result<Vec<((Chat, User), (Message, User))>, Error>>, + }, + // ReadChatID { + + // result: oneshot::Sender<Result<, Error>>, + // }, + // ReadChatIDOpt { + // chat: JID, + // result: oneshot::Sender<Result<Option<Uuid>, Error>>, + // }, + CreateMessage { + message: Message, + chat: JID, + from: JID, + result: oneshot::Sender<Result<(), Error>>, + }, + UpsertChatAndUser { + chat: JID, + result: oneshot::Sender<Result<bool, Error>>, + }, + CreateMessageWithUserResource { + message: Message, + chat: JID, + from: JID, + result: oneshot::Sender<Result<(), Error>>, + }, + UpdateMessageDelivery { + message: Uuid, + delivery: Delivery, + result: oneshot::Sender<Result<(), Error>>, + }, + DeleteMessage { + message: Uuid, + result: oneshot::Sender<Result<(), Error>>, + }, + ReadMessage { + message: Uuid, + result: oneshot::Sender<Result<Message, Error>>, + }, + ReadMessageHistory { + chat: JID, + result: oneshot::Sender<Result<Vec<Message>, Error>>, + }, + ReadMessageHistoryWithUsers { + chat: JID, + result: oneshot::Sender<Result<Vec<(Message, User)>, Error>>, + }, + ReadCachedStatus { + result: oneshot::Sender<Result<Online, Error>>, + }, + UpsertCachedStatus { + status: Online, + result: oneshot::Sender<Result<(), Error>>, + }, + DeleteCachedStatus { + result: oneshot::Sender<Result<(), Error>>, + }, + ReadCapabilities { + node: String, + result: oneshot::Sender<Result<String, Error>>, + }, + UpsertCapabilities { + node: String, + capabilities: String, + result: oneshot::Sender<Result<(), Error>>, + }, +} + +impl Display for DbCommand { + #[rustfmt::skip] + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(match self { + DbCommand::CreateUser { user, result } => "CreateUser", + DbCommand::ReadUser { user, result } => "ReadUser", + DbCommand::DeleteUserNick { jid, result } => "DeleteUserNick", + DbCommand::UpsertUserNick { jid, nick, result } => "UpsertUserNick", + DbCommand::DeleteUserAvatar { jid, result } => "DeleteUserAvatar", + DbCommand::UpsertUserAvatar { jid, avatar, result } => "UpsertUserAvatar", + DbCommand::UpdateUser { user, result } => "UpdateUser", + DbCommand::CreateContact { contact, result } => "CreateContact", + DbCommand::ReadContact { contact, result } => "ReadContact", + DbCommand::ReadContactOpt { contact, result } => "ReadContactOpt", + DbCommand::UpdateContact { contact, result } => "UpdateContact", + DbCommand::UpsertContact { contact, result } => "UpsertContact", + DbCommand::DeleteContact { contact, result } => "DeleteContact", + DbCommand::ReplaceCachedRoster { roster, result } => "ReplaceCachedRoster", + DbCommand::ReadCachedRoster { result } => "ReadCachedRoster", + DbCommand::ReadCachedRosterWithUsers { result } => "ReadCachedRosterWithUsers", + DbCommand::CreateChat { chat, result } => "CreateChat", + DbCommand::ReadChat { chat, result } => "ReadChat", + DbCommand::MarkChatAsChatted { chat, result } => "MarkChatAsChatted", + DbCommand::UpdateChatCorrespondent { old_chat, new_correspondent, result } => "UpdateChatCorrespondent", + DbCommand::DeleteChat { chat, result } => "DeleteChat", + DbCommand::ReadChats { result } => "ReadChats", + DbCommand::ReadChatsOrdered { result } => "ReadChatsOrdered", + DbCommand::ReadChatsOrderedWithLatestMessages { result } => "ReadChatsOrderedWithLatestMessages", + DbCommand::ReadChatsOrderedWithLatestMessagesAndUsers { result } => "ReadChatsOrderedWithLatestMessagesAndUsers", + DbCommand::CreateMessage { message, chat, from, result } => "CreateMessage", + DbCommand::UpsertChatAndUser { chat, result } => "UpsertChatAndUser", + DbCommand::CreateMessageWithUserResource { message, chat, from, result } => "CreateMessageWithUserResource", + DbCommand::UpdateMessageDelivery { message, delivery, result } => "UpdateMessageDelivery", + DbCommand::DeleteMessage { message, result } => "DeleteMessage", + DbCommand::ReadMessage { message, result } => "ReadMessage", + DbCommand::ReadMessageHistory { chat, result } => "ReadMessageHistory", + DbCommand::ReadMessageHistoryWithUsers { chat, result } => "ReadMessageHistoryWithUsers", + DbCommand::ReadCachedStatus { result } => "ReadCachedStatus", + DbCommand::UpsertCachedStatus { status, result } => "UpsertCachedStatus", + DbCommand::DeleteCachedStatus { result } => "DeleteCachedStatus", + DbCommand::ReadCapabilities { node, result } => "ReadCapabilities", + DbCommand::UpsertCapabilities { node, capabilities, result } => "UpsertCapabilities", + DbCommand::ReadChatAndUser { chat, result } => "ReadChatAndUser", + }) } +} - pub(crate) async fn read_cached_status(&self) -> Result<Online, Error> { - Ok(Online::default()) +impl DbActor { + /// must be run in blocking spawn + #[cfg(not(target_arch = "wasm32"))] + pub(crate) fn new(path: impl AsRef<Path>, receiver: mpsc::Receiver<DbCommand>) -> Self { + if let Some(dir) = path.as_ref().parent() { + if dir.is_dir() { + } else { + tokio::fs::create_dir_all(dir).await?; + } + let _file = tokio::fs::OpenOptions::new() + .append(true) + .create(true) + .open(path.as_ref()) + .await?; + } + let url = format!( + "{}", + path.as_ref() + .to_str() + .ok_or(DatabaseOpenError::InvalidPath)? + ); + // let db = SqlitePool::connect(&url).await?; + // migrate!().run(&db).await?; + // Ok(Self { db }) + let db = Connection::open(url)?; + db.execute_batch(include_str!("../migrations/1.sql"))?; + Ok(Self { db, receiver }) } - pub(crate) async fn upsert_cached_status(&self, status: Online) -> Result<(), Error> { - Ok(()) + /// must be run in blocking spawn + #[cfg(not(target_arch = "wasm32"))] + pub(crate) fn new_memory(receiver: mpsc::Receiver<DbCommand>) -> Self { + let db = Connection::open_in_memory()?; + db.execute_batch(include_str!("../migrations/1.sql"))?; + Ok(Self { db, receiver }) } - pub(crate) async fn delete_cached_status(&self) -> Result<(), Error> { - Ok(()) + /// must be run in blocking spawn + #[cfg(target_arch = "wasm32")] + pub fn new_memory( + receiver: mpsc::UnboundedReceiver<DbCommand>, + ) -> Result<Self, DatabaseOpenError> { + let db = Connection::open("mem.db")?; + db.execute_batch(include_str!("../migrations/1.sql"))?; + Ok(Self { db, receiver }) } - pub(crate) async fn read_capabilities(&self, node: &str) -> Result<String, Error> { - Ok("aHR0cDovL2phYmJlci5vcmcvcHJvdG9jb2wvY2Fwcx9odHRwOi8vamFiYmVyLm9yZy9wcm90b2NvbC9kaXNjbyNpbmZvH2h0dHA6Ly9qYWJiZXIub3JnL3Byb3RvY29sL2Rpc2NvI2l0ZW1zH2h0dHA6Ly9qYWJiZXIub3JnL3Byb3RvY29sL25pY2sfaHR0cDovL2phYmJlci5vcmcvcHJvdG9jb2wvbmljaytub3RpZnkfHGNsaWVudB9wYx8fZmlsYW1lbnRvIDAuMS4wHx4cHA==".to_string()) + /// must be run in blocking spawn + #[cfg(target_arch = "wasm32")] + pub fn new( + file_name: impl AsRef<Path>, + receiver: mpsc::UnboundedReceiver<DbCommand>, + ) -> Result<Self, DatabaseOpenError> { + let db = Connection::open(file_name)?; + db.execute_batch(include_str!("../migrations/1.sql"))?; + Ok(Self { db, receiver }) } - pub(crate) async fn upsert_capabilities( - &self, - node: &str, - capabilities: &str, - ) -> Result<(), Error> { + pub(crate) async fn run(mut self) { + while let Some(cmd) = self.receiver.recv().await { + let cmd_name = cmd.to_string(); + tracing::warn!("command recv: {cmd_name}"); + match cmd { + DbCommand::CreateUser { user, result } => { + result.send(self.create_user(user)); + } + DbCommand::ReadUser { user, result } => { + result.send(self.read_user(user)); + } + DbCommand::DeleteUserNick { jid, result } => { + result.send(self.delete_user_nick(jid)); + } + DbCommand::UpsertUserNick { jid, nick, result } => { + result.send(self.upsert_user_nick(jid, nick)); + } + DbCommand::DeleteUserAvatar { jid, result } => { + result.send(self.delete_user_avatar(jid)); + } + DbCommand::UpsertUserAvatar { + jid, + avatar, + result, + } => { + result.send(self.upsert_user_avatar(jid, avatar)); + } + DbCommand::UpdateUser { user, result } => { + result.send(self.update_user(user)); + } + DbCommand::CreateContact { contact, result } => { + result.send(self.create_contact(contact)); + } + DbCommand::ReadContact { contact, result } => { + result.send(self.read_contact(contact)); + } + DbCommand::ReadContactOpt { contact, result } => { + result.send(self.read_contact_opt(&contact)); + } + DbCommand::UpdateContact { contact, result } => { + result.send(self.update_contact(contact)); + } + DbCommand::UpsertContact { contact, result } => { + result.send(self.upsert_contact(contact)); + } + DbCommand::DeleteContact { contact, result } => { + result.send(self.delete_contact(contact)); + } + DbCommand::ReplaceCachedRoster { roster, result } => { + result.send(self.replace_cached_roster(roster)); + } + DbCommand::ReadCachedRoster { result } => { + result.send(self.read_cached_roster()); + } + DbCommand::ReadCachedRosterWithUsers { result } => { + result.send(self.read_cached_roster_with_users()); + } + DbCommand::CreateChat { chat, result } => { + result.send(self.create_chat(chat)); + } + DbCommand::ReadChat { chat, result } => { + result.send(self.read_chat(chat)); + } + DbCommand::ReadChatAndUser { chat, result } => { + result.send(self.read_chat_and_user(chat)); + } + DbCommand::MarkChatAsChatted { chat, result } => { + result.send(self.mark_chat_as_chatted(chat)); + } + DbCommand::UpdateChatCorrespondent { + old_chat, + new_correspondent, + result, + } => { + result.send(self.update_chat_correspondent(old_chat, new_correspondent)); + } + DbCommand::DeleteChat { chat, result } => { + result.send(self.delete_chat(chat)); + } + DbCommand::ReadChats { result } => { + result.send(self.read_chats()); + } + DbCommand::ReadChatsOrdered { result } => { + result.send(self.read_chats_ordered()); + } + DbCommand::ReadChatsOrderedWithLatestMessages { result } => { + result.send(self.read_chats_ordered_with_latest_messages()); + } + DbCommand::ReadChatsOrderedWithLatestMessagesAndUsers { result } => { + result.send(self.read_chats_ordered_with_latest_messages_and_users()); + } + DbCommand::CreateMessage { + message, + chat, + from, + result, + } => { + result.send(self.create_message(message, chat, from)); + } + DbCommand::UpsertChatAndUser { chat, result } => { + result.send(self.upsert_chat_and_user(&chat)); + } + DbCommand::CreateMessageWithUserResource { + message, + chat, + from, + result, + } => { + result.send(self.create_message_with_user_resource(message, chat, from)); + } + DbCommand::UpdateMessageDelivery { + message, + delivery, + result, + } => { + result.send(self.update_message_delivery(message, delivery)); + } + DbCommand::DeleteMessage { message, result } => { + result.send(self.delete_message(message)); + } + DbCommand::ReadMessage { message, result } => { + result.send(self.read_message(message)); + } + DbCommand::ReadMessageHistory { chat, result } => { + result.send(self.read_message_history(chat)); + } + DbCommand::ReadMessageHistoryWithUsers { chat, result } => { + result.send(self.read_message_history_with_users(chat)); + } + DbCommand::ReadCachedStatus { result } => { + result.send(self.read_cached_status()); + } + DbCommand::UpsertCachedStatus { status, result } => { + result.send(self.upsert_cached_status(status)); + } + DbCommand::DeleteCachedStatus { result } => { + result.send(self.delete_cached_status()); + } + DbCommand::ReadCapabilities { node, result } => { + result.send(self.read_capabilities(node)); + } + DbCommand::UpsertCapabilities { + node, + capabilities, + result, + } => { + result.send(self.upsert_capabilities(node, capabilities)); + } + } + tracing::warn!("command finished: {cmd_name}"); + } + tracing::error!("command: db actor exited"); + } + + pub(crate) fn create_user(&self, user: User) -> Result<(), Error> { + { + self.db.execute( + "insert into users ( jid, nick, avatar ) values ( ?1, ?2, ?3 )", + (user.jid, user.nick, user.avatar), + )?; + } Ok(()) } - // pub(crate) async fn create_user(&self, user: User) -> Result<(), Error> { - // sqlx::query!( - // "insert into users ( jid, nick ) values ( ?, ? )", - // user.jid, - // user.nick, - // ) - // .execute(&self.db) - // .await?; - // Ok(()) - // } + // TODO: this is not a 'read' user + pub(crate) fn read_user(&self, user: JID) -> Result<User, Error> { + let db = &self.db; + let user_opt = db + .query_row( + "select jid, nick, avatar from users where jid = ?1", + [&user], + |row| { + Ok(User { + jid: row.get(0)?, + nick: row.get(1)?, + avatar: row.get(2)?, + }) + }, + ) + .optional()?; + match user_opt { + Some(user) => Ok(user), + None => { + db.execute("insert into users ( jid ) values ( ?1 )", [&user])?; + Ok(User { + jid: user, + nick: None, + avatar: None, + }) + } + } + } - // pub(crate) async fn read_user(&self, user: JID) -> Result<User, Error> { - // sqlx::query!( - // "insert into users ( jid ) values ( ? ) on conflict do nothing", - // user - // ) - // .execute(&self.db) - // .await?; - // let user: User = sqlx::query_as("select * from users where jid = ?") - // .bind(user) - // .fetch_one(&self.db) - // .await?; - // Ok(user) - // } + /// returns whether or not the nickname was updated + pub(crate) fn delete_user_nick(&self, jid: JID) -> Result<bool, Error> { + let rows_affected; + { + rows_affected = self.db.execute("insert into users (jid, nick) values (?1, ?2) on conflict do update set nick = ?3 where nick is not ?4", (jid, None::<String>, None::<String>, None::<String>))?; + } + if rows_affected > 0 { + Ok(true) + } else { + Ok(false) + } + } - // /// returns whether or not the nickname was updated - // pub(crate) async fn delete_user_nick(&self, jid: JID) -> Result<bool, Error> { - // if sqlx::query!( - // "insert into users (jid, nick) values (?, ?) on conflict do update set nick = ? where nick is not ?", - // jid, - // None::<String>, - // None::<String>, - // None::<String>, - // ) - // .execute(&self.db) - // .await? - // .rows_affected() - // > 0 - // { - // Ok(true) - // } else { - // Ok(false) - // } - // } + /// returns whether or not the nickname was updated + pub(crate) fn upsert_user_nick(&self, jid: JID, nick: String) -> Result<bool, Error> { + let rows_affected; + { + rows_affected = self.db.execute("insert into users (jid, nick) values (?1, ?2) on conflict do update set nick = ?3 where nick is not ?4", (jid, &nick, &nick, &nick))?; + } + if rows_affected > 0 { + Ok(true) + } else { + Ok(false) + } + } - // /// returns whether or not the nickname was updated - // pub(crate) async fn upsert_user_nick(&self, jid: JID, nick: String) -> Result<bool, Error> { - // let rows_affected = sqlx::query!( - // "insert into users (jid, nick) values (?, ?) on conflict do update set nick = ? where nick is not ?", - // jid, - // nick, - // nick, - // nick - // ) - // .execute(&self.db) - // .await? - // .rows_affected(); - // tracing::debug!("rows affected: {}", rows_affected); - // if rows_affected > 0 { - // Ok(true) - // } else { - // Ok(false) - // } - // } + /// returns whether or not the avatar was updated, and the file to delete if there existed an old avatar + pub(crate) fn delete_user_avatar(&self, jid: JID) -> Result<(bool, Option<String>), Error> { + let (old_avatar, rows_affected): (Option<String>, _); + { + let db = &self.db; + old_avatar = db + .query_row("select avatar from users where jid = ?1", [&jid], |row| { + Ok(row.get(0)?) + }) + .optional()?; + rows_affected = db.execute("insert into users (jid, avatar) values (?1, ?2) on conflict do update set avatar = ?3 where avatar is not ?4", (jid, None::<String>, None::<String>, None::<String>))?; + } + if rows_affected > 0 { + Ok((true, old_avatar)) + } else { + Ok((false, old_avatar)) + } + } - // /// returns whether or not the avatar was updated, and the file to delete if there existed an old avatar - // pub(crate) async fn delete_user_avatar( - // &self, - // jid: JID, - // ) -> Result<(bool, Option<String>), Error> { - // #[derive(sqlx::FromRow)] - // struct AvatarRow { - // avatar: Option<String>, - // } - // let old_avatar: Option<String> = sqlx::query_as("select avatar from users where jid = ?") - // .bind(jid.clone()) - // .fetch_optional(&self.db) - // .await? - // .map(|row: AvatarRow| row.avatar) - // .unwrap_or(None); - // if sqlx::query!( - // "insert into users (jid, avatar) values (?, ?) on conflict do update set avatar = ? where avatar is not ?", - // jid, - // None::<String>, - // None::<String>, - // None::<String>, - // ) - // .execute(&self.db) - // .await? - // .rows_affected() - // > 0 - // { - // Ok((true, old_avatar)) - // } else { - // Ok((false, old_avatar)) - // } - // } + /// returns whether or not the avatar was updated, and the file to delete if there existed an old avatar + pub(crate) fn upsert_user_avatar( + &self, + jid: JID, + avatar: String, + ) -> Result<(bool, Option<String>), Error> { + let (old_avatar, rows_affected): (Option<String>, _); + { + let db = &self.db; + old_avatar = db + .query_row("select avatar from users where jid = ?1", [&jid], |row| { + let avatar: Option<String> = row.get(0)?; + Ok(avatar) + }) + .optional()? + .unwrap_or_default(); + rows_affected = db.execute("insert into users (jid, avatar) values (?1, ?2) on conflict do update set avatar = ?3 where avatar is not ?4", (jid, &avatar, &avatar, &avatar))?; + } + if rows_affected > 0 { + Ok((true, old_avatar)) + } else { + Ok((false, old_avatar)) + } + } - // /// returns whether or not the avatar was updated, and the file to delete if there existed an old avatar - // pub(crate) async fn upsert_user_avatar( - // &self, - // jid: JID, - // avatar: String, - // ) -> Result<(bool, Option<String>), Error> { - // #[derive(sqlx::FromRow)] - // struct AvatarRow { - // avatar: Option<String>, - // } - // let old_avatar: Option<String> = sqlx::query_as("select avatar from users where jid = ?") - // .bind(jid.clone()) - // .fetch_optional(&self.db) - // .await? - // .map(|row: AvatarRow| row.avatar) - // .unwrap_or(None); - // if sqlx::query!( - // "insert into users (jid, avatar) values (?, ?) on conflict do update set avatar = ? where avatar is not ?", - // jid, - // avatar, - // avatar, - // avatar, - // ) - // .execute(&self.db) - // .await? - // .rows_affected() - // > 0 - // { - // Ok((true, old_avatar)) - // } else { - // Ok((false, old_avatar)) - // } - // } + // TODO: use references everywhere + pub(crate) fn update_user(&self, user: User) -> Result<(), Error> { + self.db.execute( + "update users set nick = ?1, avatar = ?2 where user_jid = ?1", + (&user.nick, &user.avatar, &user.jid), + )?; + Ok(()) + } - // pub(crate) async fn update_user(&self, user: User) -> Result<(), Error> { - // sqlx::query!( - // "update users set nick = ? where jid = ?", - // user.nick, - // user.jid - // ) - // .execute(&self.db) - // .await?; - // Ok(()) - // } + // TODO: should this be allowed? messages need to reference users. should probably only allow delete if every other thing referencing it has been deleted, or if you make clear to the user deleting a user will delete all messages associated with them. + // pub(crate) fn delete_user(&self, user: JID) -> Result<(), Error> {} - // // TODO: should this be allowed? messages need to reference users. should probably only allow delete if every other thing referencing it has been deleted, or if you make clear to the user deleting a user will delete all messages associated with them. - // // pub(crate) async fn delete_user(&self, user: JID) -> Result<(), Error> {} - - // /// does not create the underlying user, if underlying user does not exist, create_user() must be called separately - // pub(crate) async fn create_contact(&self, contact: Contact) -> Result<(), Error> { - // sqlx::query!( - // "insert into roster ( user_jid, name, subscription ) values ( ?, ?, ? )", - // contact.user_jid, - // contact.name, - // contact.subscription - // ) - // .execute(&self.db) - // .await?; - // // TODO: abstract this out in to add_to_group() function ? - // for group in contact.groups { - // sqlx::query!( - // "insert into groups (group_name) values (?) on conflict do nothing", - // group - // ) - // .execute(&self.db) - // .await?; - // sqlx::query!( - // "insert into groups_roster (group_name, contact_jid) values (?, ?)", - // group, - // contact.user_jid - // ) - // .execute(&self.db) - // .await?; - // } - // Ok(()) - // } + /// does not create the underlying user, if underlying user does not exist, create_user() must be called separately + pub(crate) fn create_contact(&self, contact: Contact) -> Result<(), Error> { + let db = &self.db; + db.execute( + "insert into roster ( user_jid, name, subscription ) values ( ?1, ?2, ?3 )", + (&contact.user_jid, &contact.name, contact.subscription), + )?; + for group in contact.groups { + db.execute( + "insert into groups (group_name) values (?1) on conflict do nothing", + [&group], + )?; + db.execute( + "insert into groups_roster (group_name, contact_jid) values (?1, ?2)", + (group, &contact.user_jid), + )?; + } + Ok(()) + } - // pub(crate) async fn read_contact(&self, contact: JID) -> Result<Contact, Error> { - // let mut contact: Contact = sqlx::query_as("select * from roster where user_jid = ?") - // .bind(contact) - // .fetch_one(&self.db) - // .await?; - // #[derive(sqlx::FromRow)] - // struct Row { - // group_name: String, - // } - // let groups: Vec<Row> = - // sqlx::query_as("select group_name from groups_roster where contact_jid = ?") - // .bind(&contact.user_jid) - // .fetch_all(&self.db) - // .await?; - // contact.groups = HashSet::from_iter(groups.into_iter().map(|row| row.group_name)); - // Ok(contact) - // } + pub(crate) fn read_contact(&self, contact: JID) -> Result<Contact, Error> { + let db = &self.db; + let mut contact_item = db.query_row( + "select user_jid, name, subscription from roster where user_jid = ?1", + [&contact], + |row| { + Ok(Contact { + user_jid: row.get(0)?, + name: row.get(1)?, + subscription: row.get(2)?, + groups: HashSet::new(), + }) + }, + )?; + let groups: Result<HashSet<String>, _> = db + .prepare("select group_name from groups_roster where contact_jid = ?1")? + .query_map([&contact], |row| Ok(row.get(0)?))? + .collect(); + contact_item.groups = groups?; + Ok(contact_item) + } - // pub(crate) async fn read_contact_opt(&self, contact: &JID) -> Result<Option<Contact>, Error> { - // let contact: Option<Contact> = - // sqlx::query_as("select * from roster join users on jid = user_jid where jid = ?") - // .bind(contact) - // .fetch_optional(&self.db) - // .await?; - // if let Some(mut contact) = contact { - // #[derive(sqlx::FromRow)] - // struct Row { - // group_name: String, - // } - // let groups: Vec<Row> = - // sqlx::query_as("select group_name from groups_roster where contact_jid = ?") - // .bind(&contact.user_jid) - // .fetch_all(&self.db) - // .await?; - // contact.groups = HashSet::from_iter(groups.into_iter().map(|row| row.group_name)); - // Ok(Some(contact)) - // } else { - // Ok(None) - // } - // } + pub(crate) fn read_contact_opt(&self, contact: &JID) -> Result<Option<Contact>, Error> { + let db = &self.db; + let contact_item = db + .query_row( + "select user_jid, name, subscription from roster where user_jid = ?1", + [&contact], + |row| { + Ok(Contact { + user_jid: row.get(0)?, + name: row.get(1)?, + subscription: row.get(2)?, + groups: HashSet::new(), + }) + }, + ) + .optional()?; + if let Some(mut contact_item) = contact_item { + let groups: Result<HashSet<String>, _> = db + .prepare("select group_name from groups_roster where contact_jid = ?1")? + .query_map([&contact], |row| Ok(row.get(0)?))? + .collect(); + contact_item.groups = groups?; + Ok(Some(contact_item)) + } else { + Ok(None) + } + } - // /// does not update the underlying user, to update user, update_user() must be called separately - // pub(crate) async fn update_contact(&self, contact: Contact) -> Result<(), Error> { - // sqlx::query!( - // "update roster set name = ?, subscription = ? where user_jid = ?", - // contact.name, - // contact.subscription, - // contact.user_jid - // ) - // .execute(&self.db) - // .await?; - // sqlx::query!( - // "delete from groups_roster where contact_jid = ?", - // contact.user_jid - // ) - // .execute(&self.db) - // .await?; - // // TODO: delete orphaned groups from groups table - // for group in contact.groups { - // sqlx::query!( - // "insert into groups (group_name) values (?) on conflict do nothing", - // group - // ) - // .execute(&self.db) - // .await?; - // sqlx::query!( - // "insert into groups_roster (group_name, contact_jid) values (?, ?)", - // group, - // contact.user_jid - // ) - // .execute(&self.db) - // .await?; - // } - // Ok(()) - // } + /// does not update the underlying user, to update user, update_user() must be called separately + pub(crate) fn update_contact(&self, contact: Contact) -> Result<(), Error> { + let db = &self.db; + db.execute( + "update roster set name = ?1, subscription = ?2 where user_jid = ?3", + (&contact.name, &contact.subscription, &contact.user_jid), + )?; + db.execute( + "delete from groups_roster where contact_jid = ?1", + [&contact.user_jid], + )?; + for group in contact.groups { + db.execute( + "insert into groups (group_name) values (?1) on conflict do nothing", + [&group], + )?; + db.execute( + "insert into groups_roster (group_name, contact_jid), values (?1, ?2)", + (&group, &contact.user_jid), + )?; + } + // TODO: delete orphaned groups from groups table, users etc. + Ok(()) + } - // pub(crate) async fn upsert_contact(&self, contact: Contact) -> Result<(), Error> { - // sqlx::query!( - // "insert into users ( jid ) values ( ? ) on conflict do nothing", - // contact.user_jid, - // ) - // .execute(&self.db) - // .await?; - // sqlx::query!( - // "insert into roster ( user_jid, name, subscription ) values ( ?, ?, ? ) on conflict do update set name = ?, subscription = ?", - // contact.user_jid, - // contact.name, - // contact.subscription, - // contact.name, - // contact.subscription - // ) - // .execute(&self.db) - // .await?; - // sqlx::query!( - // "delete from groups_roster where contact_jid = ?", - // contact.user_jid - // ) - // .execute(&self.db) - // .await?; - // // TODO: delete orphaned groups from groups table - // for group in contact.groups { - // sqlx::query!( - // "insert into groups (group_name) values (?) on conflict do nothing", - // group - // ) - // .execute(&self.db) - // .await?; - // sqlx::query!( - // "insert into groups_roster (group_name, contact_jid) values (?, ?)", - // group, - // contact.user_jid - // ) - // .execute(&self.db) - // .await?; - // } - // Ok(()) - // } + pub(crate) fn upsert_contact(&self, contact: Contact) -> Result<(), Error> { + let db = &self.db; + db.execute( + "insert into users (jid) values (?1) on conflict do nothing", + [&contact.user_jid], + )?; + db.execute( + "insert into roster ( user_jid, name, subscription ) values ( ?1, ?2, ?3 ) on conflict do update set name = ?4, subscription = ?5", + (&contact.user_jid, &contact.name, &contact.subscription, &contact.name, &contact.subscription), + )?; + db.execute( + "delete from groups_roster where contact_jid = ?1", + [&contact.user_jid], + )?; + for group in contact.groups { + db.execute( + "insert into groups (group_name) values (?1) on conflict do nothing", + [&group], + )?; + db.execute( + "insert into groups_roster (group_name, contact_jid) values (?1, ?2)", + (group, &contact.user_jid), + )?; + } + Ok(()) + } - // pub(crate) async fn delete_contact(&self, contact: JID) -> Result<(), Error> { - // sqlx::query!("delete from roster where user_jid = ?", contact) - // .execute(&self.db) - // .await?; - // // TODO: delete orphaned groups from groups table - // Ok(()) - // } + pub(crate) fn delete_contact(&self, contact: JID) -> Result<(), Error> { + self.db + .execute("delete from roster where user_jid = ?1", [&contact])?; + Ok(()) + } - // pub(crate) async fn replace_cached_roster(&self, roster: Vec<Contact>) -> Result<(), Error> { - // sqlx::query!("delete from roster").execute(&self.db).await?; - // for contact in roster { - // self.upsert_contact(contact).await?; - // } - // Ok(()) - // } + pub(crate) fn replace_cached_roster(&self, roster: Vec<Contact>) -> Result<(), Error> { + { + self.db.execute("delete from roster", [])?; + } + for contact in roster { + self.upsert_contact(contact)?; + } + Ok(()) + } - // pub(crate) async fn read_cached_roster(&self) -> Result<Vec<Contact>, Error> { - // let mut roster: Vec<Contact> = sqlx::query_as("select * from roster") - // .fetch_all(&self.db) - // .await?; - // for contact in &mut roster { - // #[derive(sqlx::FromRow)] - // struct Row { - // group_name: String, - // } - // let groups: Vec<Row> = - // sqlx::query_as("select group_name from groups_roster where contact_jid = ?") - // .bind(&contact.user_jid) - // .fetch_all(&self.db) - // .await?; - // contact.groups = HashSet::from_iter(groups.into_iter().map(|row| row.group_name)); - // } - // Ok(roster) - // } + pub(crate) fn read_cached_roster(&self) -> Result<Vec<Contact>, Error> { + let db = &self.db; + let mut roster: Vec<_> = db + .prepare("select user_jid, name, subscription from roster")? + .query_map([], |row| { + Ok(Contact { + user_jid: row.get(0)?, + name: row.get(1)?, + subscription: row.get(2)?, + groups: HashSet::new(), + }) + })? + .collect::<Result<Vec<_>, _>>()?; + for contact in &mut roster { + let groups: Result<HashSet<String>, _> = db + .prepare("select group_name from groups_roster where contact_jid = ?1")? + .query_map([&contact.user_jid], |row| Ok(row.get(0)?))? + .collect(); + contact.groups = groups?; + } + Ok(roster) + } - // pub(crate) async fn read_cached_roster_with_users( - // &self, - // ) -> Result<Vec<(Contact, User)>, Error> { - // #[derive(sqlx::FromRow)] - // struct Row { - // #[sqlx(flatten)] - // contact: Contact, - // #[sqlx(flatten)] - // user: User, - // } - // let mut roster: Vec<Row> = - // sqlx::query_as("select * from roster join users on jid = user_jid") - // .fetch_all(&self.db) - // .await?; - // for row in &mut roster { - // #[derive(sqlx::FromRow)] - // struct Row { - // group_name: String, - // } - // let groups: Vec<Row> = - // sqlx::query_as("select group_name from groups_roster where contact_jid = ?") - // .bind(&row.contact.user_jid) - // .fetch_all(&self.db) - // .await?; - // row.contact.groups = HashSet::from_iter(groups.into_iter().map(|row| row.group_name)); - // } - // let roster = roster - // .into_iter() - // .map(|row| (row.contact, row.user)) - // .collect(); - // Ok(roster) - // } + pub(crate) fn read_cached_roster_with_users(&self) -> Result<Vec<(Contact, User)>, Error> { + let db = &self.db; + let mut roster: Vec<(Contact, User)> = db.prepare("select user_jid, name, subscription, jid, nick, avatar from roster join users on jid = user_jid")?.query_map([], |row| { + Ok(( + Contact { + user_jid: row.get(0)?, + name: row.get(1)?, + subscription: row.get(2)?, + groups: HashSet::new(), + }, + User { + jid: row.get(3)?, + nick: row.get(4)?, + avatar: row.get(5)?, + } + )) + })?.collect::<Result<Vec<_>, _>>()?; + for (contact, _) in &mut roster { + let groups: Result<HashSet<String>, _> = db + .prepare("select group_name from groups_roster where contact_jid = ?1")? + .query_map([&contact.user_jid], |row| Ok(row.get(0)?))? + .collect(); + contact.groups = groups?; + } + Ok(roster) + } - // pub(crate) async fn create_chat(&self, chat: Chat) -> Result<(), Error> { - // let id = Uuid::new_v4(); - // let jid = chat.correspondent(); - // sqlx::query!( - // "insert into chats (id, correspondent, have_chatted) values (?, ?, ?)", - // id, - // jid, - // chat.have_chatted, - // ) - // .execute(&self.db) - // .await?; - // Ok(()) - // } + pub(crate) fn create_chat(&self, chat: Chat) -> Result<(), Error> { + let id = Uuid::new_v4(); + let jid = chat.correspondent(); + self.db.execute( + "insert into chats (id, correspondent, have_chatted) values (?1, ?2, ?3)", + (id, jid, chat.have_chatted), + )?; + Ok(()) + } - // // TODO: what happens if a correspondent changes from a user to a contact? maybe just have correspondent be a user, then have the client make the user show up as a contact in ui if they are in the loaded roster. + // TODO: what happens if a correspondent changes from a user to a contact? maybe just have correspondent be a user, then have the client make the user show up as a contact in ui if they are in the loaded roster. - // pub(crate) async fn read_chat(&self, chat: JID) -> Result<Chat, Error> { - // // check if the chat correponding with the jid exists - // let chat: Chat = sqlx::query_as("select correspondent from chats where correspondent = ?") - // .bind(chat) - // .fetch_one(&self.db) - // .await?; - // Ok(chat) - // } + /// should be a bare jid + /// TODO: this is NOT a read + pub(crate) fn read_chat(&self, chat: JID) -> Result<Chat, Error> { + let chat_opt = self.db.query_row( + "select correspondent, have_chatted from chats where correspondent = ?1", + [&chat], + |row| { + Ok(Chat { + correspondent: row.get(0)?, + have_chatted: row.get(1)?, + }) + }, + ).optional()?; + match chat_opt { + Some(chat) => return Ok(chat), + None => { + let chat = Chat { + correspondent: chat, + have_chatted: false, + }; + self.create_chat(chat.clone())?; + Ok(chat) + } + } + } - // pub(crate) async fn mark_chat_as_chatted(&self, chat: JID) -> Result<(), Error> { - // let jid = chat.as_bare(); - // sqlx::query!( - // "update chats set have_chatted = true where correspondent = ?", - // jid - // ) - // .execute(&self.db) - // .await?; - // Ok(()) - // } + pub(crate) fn read_chat_and_user(&self, chat: JID) -> Result<(Chat, User), Error> { + let user = self.read_user(chat.clone())?; + let chat_opt = self.db.query_row( + "select correspondent, have_chatted, jid, nick, avatar from chats join users on correspondent = jid where correspondent = ?1", + [&chat], + |row| { + Ok(( + Chat { + correspondent: row.get(0)?, + have_chatted: row.get(1)?, + }, + User { + jid: row.get(2)?, + nick: row.get(3)?, + avatar: row.get(4)?, + } + )) + }, + ).optional()?; + match chat_opt { + Some(chat) => return Ok(chat), + None => { + let chat = Chat { + correspondent: chat, + have_chatted: false, + }; + self.create_chat(chat.clone())?; + Ok((chat, user)) + } + } + } - // pub(crate) async fn update_chat_correspondent( - // &self, - // old_chat: Chat, - // new_correspondent: JID, - // ) -> Result<Chat, Error> { - // // TODO: update other chat data if it differs (for now there is only correspondent so doesn't matter) - // let new_jid = &new_correspondent; - // let old_jid = old_chat.correspondent(); - // sqlx::query!( - // "update chats set correspondent = ? where correspondent = ?", - // new_jid, - // old_jid, - // ) - // .execute(&self.db) - // .await?; - // let chat = self.read_chat(new_correspondent).await?; - // Ok(chat) - // } + pub(crate) fn mark_chat_as_chatted(&self, chat: JID) -> Result<(), Error> { + self.db.execute( + "update chats set have_chatted = true where correspondent = ?1", + [chat], + )?; + Ok(()) + } - // // pub(crate) async fn update_chat + pub(crate) fn update_chat_correspondent( + &self, + old_chat: Chat, + new_correspondent: JID, + ) -> Result<Chat, Error> { + let new_jid = &new_correspondent; + let old_jid = old_chat.correspondent(); + let chat = self.db.query_row( + "update chats set correspondent = ?1 where correspondent = ?2 returning correspondent, have_chatted", + [new_jid, old_jid], + |row| Ok(Chat { + correspondent: row.get(0)?, + have_chatted: row.get(1)?, + }) + )?; + Ok(chat) + } - // pub(crate) async fn delete_chat(&self, chat: JID) -> Result<(), Error> { - // sqlx::query!("delete from chats where correspondent = ?", chat) - // .execute(&self.db) - // .await?; - // Ok(()) - // } + // pub(crate) fn update_chat - // /// TODO: sorting and filtering (for now there is no sorting) - // pub(crate) async fn read_chats(&self) -> Result<Vec<Chat>, Error> { - // let chats: Vec<Chat> = sqlx::query_as("select * from chats") - // .fetch_all(&self.db) - // .await?; - // Ok(chats) - // } + pub(crate) fn delete_chat(&self, chat: JID) -> Result<(), Error> { + self.db + .execute("delete from chats where correspondent = ?1", [chat])?; + Ok(()) + } - // /// chats ordered by date of last message - // // greatest-n-per-group - // pub(crate) async fn read_chats_ordered(&self) -> Result<Vec<Chat>, Error> { - // let chats = sqlx::query_as("select c.*, m.* from chats c join (select chat_id, max(timestamp) max_timestamp from messages group by chat_id) max_timestamps on c.id = max_timestamps.chat_id join messages m on max_timestamps.chat_id = m.chat_id and max_timestamps.max_timestamp = m.timestamp order by m.timestamp desc") - // .fetch_all(&self.db) - // .await?; - // Ok(chats) - // } + /// TODO: sorting and filtering (for now there is no sorting) + pub(crate) fn read_chats(&self) -> Result<Vec<Chat>, Error> { + let chats = self + .db + .prepare("select correspondent, have_chatted from chats")? + .query_map([], |row| { + Ok(Chat { + correspondent: row.get(0)?, + have_chatted: row.get(1)?, + }) + })? + .collect::<Result<Vec<_>, _>>()?; + Ok(chats) + } - // /// chats ordered by date of last message - // // greatest-n-per-group - // pub(crate) async fn read_chats_ordered_with_latest_messages( - // &self, - // ) -> Result<Vec<(Chat, Message)>, Error> { - // #[derive(sqlx::FromRow)] - // pub struct RowChat { - // chat_correspondent: JID, - // chat_have_chatted: bool, - // } - // impl From<RowChat> for Chat { - // fn from(value: RowChat) -> Self { - // Self { - // correspondent: value.chat_correspondent, - // have_chatted: value.chat_have_chatted, - // } - // } - // } - // #[derive(sqlx::FromRow)] - // pub struct RowMessage { - // message_id: Uuid, - // message_body: String, - // message_delivery: Option<Delivery>, - // message_timestamp: DateTime<Utc>, - // message_from_jid: JID, - // } - // impl From<RowMessage> for Message { - // fn from(value: RowMessage) -> Self { - // Self { - // id: value.message_id, - // from: value.message_from_jid, - // delivery: value.message_delivery, - // timestamp: value.message_timestamp, - // body: Body { - // body: value.message_body, - // }, - // } - // } - // } - - // #[derive(sqlx::FromRow)] - // pub struct ChatWithMessageRow { - // #[sqlx(flatten)] - // pub chat: RowChat, - // #[sqlx(flatten)] - // pub message: RowMessage, - // } - - // pub struct ChatWithMessage { - // chat: Chat, - // message: Message, - // } - - // impl From<ChatWithMessageRow> for ChatWithMessage { - // fn from(value: ChatWithMessageRow) -> Self { - // Self { - // chat: value.chat.into(), - // message: value.message.into(), - // } - // } - // } - - // // TODO: sometimes chats have no messages. - // let chats: Vec<ChatWithMessageRow> = sqlx::query_as("select c.*, m.* from chats c join (select chat_id, max(timestamp) max_timestamp from messages group by chat_id) max_timestamps on c.id = max_timestamps.chat_id join messages m on max_timestamps.chat_id = m.chat_id and max_timestamps.max_timestamp = m.timestamp order by m.timestamp desc") - // .fetch_all(&self.db) - // .await?; - - // let chats = chats - // .into_iter() - // .map(|chat_with_message_row| { - // let chat_with_message: ChatWithMessage = chat_with_message_row.into(); - // (chat_with_message.chat, chat_with_message.message) - // }) - // .collect(); - - // Ok(chats) - // } + /// chats ordered by date of last message + // greatest-n-per-group + pub(crate) fn read_chats_ordered(&self) -> Result<Vec<Chat>, Error> { + let chats = self + .db + .prepare("select c.correspondent, c.have_chatted, m.* from chats c join (select chat_id, max(timestamp) max_timestamp from messages group by chat_id) max_timestamps on c.id = max_timestamps.chat_id join messages m on max_timestamps.chat_id = m.chat_id and max_timestamps.max_timestamp = m.timestamp order by m.timestamp desc")? + .query_map([], |row| { + Ok(Chat { + correspondent: row.get(0)?, + have_chatted: row.get(1)?, + }) + })? + .collect::<Result<Vec<_>, _>>()?; + Ok(chats) + } - // /// chats ordered by date of last message - // // greatest-n-per-group - // pub(crate) async fn read_chats_ordered_with_latest_messages_and_users( - // &self, - // ) -> Result<Vec<((Chat, User), (Message, User))>, Error> { - // #[derive(sqlx::FromRow)] - // pub struct RowChat { - // chat_correspondent: JID, - // chat_have_chatted: bool, - // } - // impl From<RowChat> for Chat { - // fn from(value: RowChat) -> Self { - // Self { - // correspondent: value.chat_correspondent, - // have_chatted: value.chat_have_chatted, - // } - // } - // } - // #[derive(sqlx::FromRow)] - // pub struct RowMessage { - // message_id: Uuid, - // message_body: String, - // message_delivery: Option<Delivery>, - // message_timestamp: DateTime<Utc>, - // message_from_jid: JID, - // } - // impl From<RowMessage> for Message { - // fn from(value: RowMessage) -> Self { - // Self { - // id: value.message_id, - // from: value.message_from_jid, - // delivery: value.message_delivery, - // timestamp: value.message_timestamp, - // body: Body { - // body: value.message_body, - // }, - // } - // } - // } - // #[derive(sqlx::FromRow)] - // pub struct RowChatUser { - // chat_user_jid: JID, - // chat_user_nick: Option<String>, - // chat_user_avatar: Option<String>, - // } - // impl From<RowChatUser> for User { - // fn from(value: RowChatUser) -> Self { - // Self { - // jid: value.chat_user_jid, - // nick: value.chat_user_nick, - // avatar: value.chat_user_avatar, - // } - // } - // } - // #[derive(sqlx::FromRow)] - // pub struct RowMessageUser { - // message_user_jid: JID, - // message_user_nick: Option<String>, - // message_user_avatar: Option<String>, - // } - // impl From<RowMessageUser> for User { - // fn from(value: RowMessageUser) -> Self { - // Self { - // jid: value.message_user_jid, - // nick: value.message_user_nick, - // avatar: value.message_user_avatar, - // } - // } - // } - // #[derive(sqlx::FromRow)] - // pub struct ChatWithMessageAndUsersRow { - // #[sqlx(flatten)] - // pub chat: RowChat, - // #[sqlx(flatten)] - // pub chat_user: RowChatUser, - // #[sqlx(flatten)] - // pub message: RowMessage, - // #[sqlx(flatten)] - // pub message_user: RowMessageUser, - // } - - // impl From<ChatWithMessageAndUsersRow> for ChatWithMessageAndUsers { - // fn from(value: ChatWithMessageAndUsersRow) -> Self { - // Self { - // chat: value.chat.into(), - // chat_user: value.chat_user.into(), - // message: value.message.into(), - // message_user: value.message_user.into(), - // } - // } - // } - - // pub struct ChatWithMessageAndUsers { - // chat: Chat, - // chat_user: User, - // message: Message, - // message_user: User, - // } - - // let chats: Vec<ChatWithMessageAndUsersRow> = sqlx::query_as("select c.id as chat_id, c.correspondent as chat_correspondent, c.have_chatted as chat_have_chatted, m.id as message_id, m.body as message_body, m.delivery as message_delivery, m.timestamp as message_timestamp, m.from_jid as message_from_jid, cu.jid as chat_user_jid, cu.nick as chat_user_nick, cu.avatar as chat_user_avatar, mu.jid as message_user_jid, mu.nick as message_user_nick, mu.avatar as message_user_avatar from chats c join (select chat_id, max(timestamp) max_timestamp from messages group by chat_id) max_timestamps on c.id = max_timestamps.chat_id join messages m on max_timestamps.chat_id = m.chat_id and max_timestamps.max_timestamp = m.timestamp join users as cu on cu.jid = c.correspondent join users as mu on mu.jid = m.from_jid order by m.timestamp desc") - // .fetch_all(&self.db) - // .await?; - - // let chats = chats - // .into_iter() - // .map(|chat_with_message_and_users_row| { - // let chat_with_message_and_users: ChatWithMessageAndUsers = - // chat_with_message_and_users_row.into(); - // ( - // ( - // chat_with_message_and_users.chat, - // chat_with_message_and_users.chat_user, - // ), - // ( - // chat_with_message_and_users.message, - // chat_with_message_and_users.message_user, - // ), - // ) - // }) - // .collect(); - - // Ok(chats) - // } + /// chats ordered by date of last message + // greatest-n-per-group + pub(crate) fn read_chats_ordered_with_latest_messages( + &self, + ) -> Result<Vec<(Chat, Message)>, Error> { + let chats = self + .db + .prepare("select c.correspondent, c.have_chatted, m.id, m.from_jid, m.delivery, m.timestamp, m.body from chats c join (select chat_id, max(timestamp) max_timestamp from messages group by chat_id) max_timestamps on c.id = max_timestamps.chat_id join messages m on max_timestamps.chat_id = m.chat_id and max_timestamps.max_timestamp = m.timestamp order by m.timestamp desc")? + .query_map([], |row| { + Ok(( + Chat { + correspondent: row.get(0)?, + have_chatted: row.get(1)?, + }, + Message { + id: row.get(2)?, + from: row.get(3)?, + delivery: row.get(4)?, + timestamp: row.get(5)?, + body: Body { + body: row.get(6)?, + }, + } + )) + })? + .collect::<Result<Vec<_>, _>>()?; + Ok(chats) + } - // async fn read_chat_id(&self, chat: JID) -> Result<Uuid, Error> { - // #[derive(sqlx::FromRow)] - // struct Row { - // id: Uuid, - // } - // let chat = chat.as_bare(); - // let chat_id: Row = sqlx::query_as("select id from chats where correspondent = ?") - // .bind(chat) - // .fetch_one(&self.db) - // .await?; - // let chat_id = chat_id.id; - // Ok(chat_id) - // } + /// chats ordered by date of last message + // greatest-n-per-group + pub(crate) fn read_chats_ordered_with_latest_messages_and_users( + &self, + ) -> Result<Vec<((Chat, User), (Message, User))>, Error> { + let chats = self + .db + .prepare("select c.id as chat_id, c.correspondent as chat_correspondent, c.have_chatted as chat_have_chatted, m.id as message_id, m.body as message_body, m.delivery as message_delivery, m.timestamp as message_timestamp, m.from_jid as message_from_jid, cu.jid as chat_user_jid, cu.nick as chat_user_nick, cu.avatar as chat_user_avatar, mu.jid as message_user_jid, mu.nick as message_user_nick, mu.avatar as message_user_avatar from chats c join (select chat_id, max(timestamp) max_timestamp from messages group by chat_id) max_timestamps on c.id = max_timestamps.chat_id join messages m on max_timestamps.chat_id = m.chat_id and max_timestamps.max_timestamp = m.timestamp join users as cu on cu.jid = c.correspondent join users as mu on mu.jid = m.from_jid order by m.timestamp desc")? + .query_map([], |row| { + Ok(( + ( + Chat { + correspondent: row.get("chat_correspondent")?, + have_chatted: row.get("chat_have_chatted")?, + }, + User { + jid: row.get("chat_user_jid")?, + nick: row.get("chat_user_nick")?, + avatar: row.get("chat_user_avatar")?, + } + ), + ( + Message { + id: row.get("message_id")?, + from: row.get("message_from_jid")?, + delivery: row.get("message_delivery")?, + timestamp: row.get("message_timestamp")?, + body: Body { + body: row.get("message_body")?, + }, + }, + User { + jid: row.get("message_user_jid")?, + nick: row.get("message_user_nick")?, + avatar: row.get("message_user_avatar")?, + } + ), + )) + })? + .collect::<Result<Vec<_>, _>>()?; + Ok(chats) + } - // async fn read_chat_id_opt(&self, chat: JID) -> Result<Option<Uuid>, Error> { - // #[derive(sqlx::FromRow)] - // struct Row { - // id: Uuid, - // } - // let chat_id: Option<Row> = sqlx::query_as("select id from chats where correspondent = ?") - // .bind(chat) - // .fetch_optional(&self.db) - // .await?; - // let chat_id = chat_id.map(|row| row.id); - // Ok(chat_id) - // } + #[tracing::instrument] + fn read_chat_id(&self, chat: JID) -> Result<Uuid, Error> { + let chat_id = self.db.query_row( + "select id from chats where correspondent = ?1", + [chat], + |row| Ok(row.get(0)?), + )?; + Ok(chat_id) + } - // /// if the chat doesn't already exist, it must be created by calling create_chat() before running this function. - // pub(crate) async fn create_message( - // &self, - // message: Message, - // chat: JID, - // from: JID, - // ) -> Result<(), Error> { - // // TODO: one query - // let from_jid = from.as_bare(); - // let chat_id = self.read_chat_id(chat).await?; - // sqlx::query!("insert into messages (id, body, chat_id, from_jid, from_resource, timestamp) values (?, ?, ?, ?, ?, ?)", message.id, message.body.body, chat_id, from_jid, from.resourcepart, message.timestamp).execute(&self.db).await?; - // Ok(()) - // } + fn read_chat_id_opt(&self, chat: JID) -> Result<Option<Uuid>, Error> { + let chat_id = self + .db + .query_row( + "select id from chats where correspondent = ?1", + [chat], + |row| Ok(row.get(0)?), + ) + .optional()?; + Ok(chat_id) + } - // pub(crate) async fn upsert_chat_and_user(&self, chat: &JID) -> Result<bool, Error> { - // let bare_chat = chat.as_bare(); - // sqlx::query!( - // "insert into users (jid) values (?) on conflict do nothing", - // bare_chat, - // ) - // .execute(&self.db) - // .await?; - // let id = Uuid::new_v4(); - // let chat: Chat = sqlx::query_as("insert into chats (id, correspondent, have_chatted) values (?, ?, ?) on conflict do nothing; select * from chats where correspondent = ?") - // .bind(id) - // .bind(bare_chat.clone()) - // .bind(false) - // .bind(bare_chat) - // .fetch_one(&self.db) - // .await?; - // tracing::debug!("CHECKING chat: {:?}", chat); - // Ok(chat.have_chatted) - // } + /// if the chat doesn't already exist, it must be created by calling create_chat() before running this function. + #[tracing::instrument] + pub(crate) fn create_message( + &self, + message: Message, + chat: JID, + from: JID, + ) -> Result<(), Error> { + let from_jid = from.as_bare(); + let chat_id = self.read_chat_id(chat)?; + tracing::debug!("creating message"); + self.db.execute("insert into messages (id, body, chat_id, from_jid, from_resource, timestamp, delivery) values (?1, ?2, ?3, ?4, ?5, ?6, ?7)", (&message.id, &message.body.body, &chat_id, &from_jid, &from.resourcepart, &message.timestamp, &message.delivery))?; + Ok(()) + } - // /// MUST upsert chat beforehand - // pub(crate) async fn create_message_with_self_resource( - // &self, - // message: Message, - // chat: JID, - // // full jid - // from: JID, - // ) -> Result<(), Error> { - // let from_jid = from.as_bare(); - // if let Some(resource) = &from.resourcepart { - // sqlx::query!( - // "insert into resources (bare_jid, resource) values (?, ?) on conflict do nothing", - // from_jid, - // resource - // ) - // .execute(&self.db) - // .await?; - // } - // self.create_message(message, chat, from).await?; - // Ok(()) - // } + pub(crate) fn upsert_chat_and_user(&self, chat: &JID) -> Result<bool, Error> { + let bare_chat = chat.as_bare(); + let db = &self.db; + db.execute( + "insert into users (jid) values (?1) on conflict do nothing", + [&bare_chat], + )?; + let id = Uuid::new_v4(); + db.execute("insert into chats (id, correspondent, have_chatted) values (?1, ?2, ?3) on conflict do nothing", (id, &bare_chat, false))?; + let chat = db.query_row( + "select correspondent, have_chatted from chats where correspondent = ?1", + [&bare_chat], + |row| { + Ok(Chat { + correspondent: row.get(0)?, + have_chatted: row.get(1)?, + }) + }, + )?; + Ok(chat.have_chatted) + } - // /// create direct message from incoming. MUST upsert chat and user - // pub(crate) async fn create_message_with_user_resource( - // &self, - // message: Message, - // chat: JID, - // // full jid - // from: JID, - // ) -> Result<(), Error> { - // let bare_chat = chat.as_bare(); - // let resource = &chat.resourcepart; - // // sqlx::query!( - // // "insert into users (jid) values (?) on conflict do nothing", - // // bare_chat - // // ) - // // .execute(&self.db) - // // .await?; - // // let id = Uuid::new_v4(); - // // sqlx::query!( - // // "insert into chats (id, correspondent) values (?, ?) on conflict do nothing", - // // id, - // // bare_chat - // // ) - // // .execute(&self.db) - // // .await?; - // if let Some(resource) = resource { - // sqlx::query!( - // "insert into resources (bare_jid, resource) values (?, ?) on conflict do nothing", - // bare_chat, - // resource - // ) - // .execute(&self.db) - // .await?; - // } - // self.create_message(message, chat, from).await?; - // Ok(()) - // } + /// create direct message from incoming. MUST upsert chat and user + #[tracing::instrument] + pub(crate) fn create_message_with_user_resource( + &self, + message: Message, + // TODO: enforce two kinds of jid. bare and full + // must be bare jid + chat: JID, + // full jid + from: JID, + ) -> Result<(), Error> { + let from_jid = from.as_bare(); + let chat = chat.as_bare(); + tracing::debug!("creating resource"); + if let Some(resource) = &from.resourcepart { + self.db.execute( + "insert into resources (bare_jid, resource) values (?1, ?2) on conflict do nothing", + (&from_jid, resource), + )?; + } + self.create_message(message, chat, from)?; + Ok(()) + } - // pub(crate) async fn read_message(&self, message: Uuid) -> Result<Message, Error> { - // let message: Message = sqlx::query_as("select * from messages where id = ?") - // .bind(message) - // .fetch_one(&self.db) - // .await?; - // Ok(message) + pub(crate) fn update_message_delivery( + &self, + message: Uuid, + delivery: Delivery, + ) -> Result<(), Error> { + self.db.execute( + "update messages set delivery = ?1 where id = ?2", + (delivery, message), + )?; + Ok(()) + } + + // pub(crate) fn read_message(&self, message: Uuid) -> Result<Message, Error> { + // Ok(Message { + // id: Uuid, + // from: todo!(), + // delivery: todo!(), + // timestamp: todo!(), + // body: todo!(), + // }) // } - // // TODO: message updates/edits pub(crate) async fn update_message(&self, message: Message) -> Result<(), Error> {} + // TODO: message updates/edits pub(crate) fn update_message(&self, message: Message) -> Result<(), Error> {} - // pub(crate) async fn delete_message(&self, message: Uuid) -> Result<(), Error> { - // sqlx::query!("delete from messages where id = ?", message) - // .execute(&self.db) - // .await?; - // Ok(()) - // } + pub(crate) fn delete_message(&self, message: Uuid) -> Result<(), Error> { + self.db + .execute("delete from messages where id = ?1", [message])?; + Ok(()) + } - // // TODO: paging - // pub(crate) async fn read_message_history(&self, chat: JID) -> Result<Vec<Message>, Error> { - // let chat_id = self.read_chat_id(chat).await?; - // let messages: Vec<Message> = - // sqlx::query_as("select * from messages where chat_id = ? order by timestamp asc") - // .bind(chat_id) - // .fetch_all(&self.db) - // .await?; - // Ok(messages) - // } + pub(crate) fn read_message(&self, message: Uuid) -> Result<Message, Error> { + let message = self.db.query_row( + "select id, from_jid, delivery, timestamp, body from messages where id = ?1", + [&message], + |row| { + Ok(Message { + id: row.get(0)?, + // TODO: full from + from: row.get(1)?, + delivery: row.get(2)?, + timestamp: row.get(3)?, + body: Body { body: row.get(4)? }, + }) + }, + )?; + Ok(message) + } - // pub(crate) async fn read_message_history_with_users( - // &self, - // chat: JID, - // ) -> Result<Vec<(Message, User)>, Error> { - // let chat_id = self.read_chat_id(chat).await?; - // #[derive(sqlx::FromRow)] - // pub struct Row { - // #[sqlx(flatten)] - // user: User, - // #[sqlx(flatten)] - // message: Message, - // } - // let messages: Vec<Row> = - // sqlx::query_as("select * from messages join users on jid = from_jid where chat_id = ? order by timestamp asc") - // .bind(chat_id) - // .fetch_all(&self.db) - // .await?; - // let messages = messages - // .into_iter() - // .map(|row| (row.message, row.user)) - // .collect(); - // Ok(messages) - // } + // TODO: paging + pub(crate) fn read_message_history(&self, chat: JID) -> Result<Vec<Message>, Error> { + let chat_id = self.read_chat_id(chat)?; + let messages = self + .db + .prepare( + "select id, from_jid, delivery, timestamp, body from messages where chat_id = ?1", + )? + .query_map([chat_id], |row| { + Ok(Message { + id: row.get(0)?, + // TODO: full from + from: row.get(1)?, + delivery: row.get(2)?, + timestamp: row.get(3)?, + body: Body { body: row.get(4)? }, + }) + })? + .collect::<Result<Vec<_>, _>>()?; + Ok(messages) + } - // pub(crate) async fn read_cached_status(&self) -> Result<Online, Error> { - // let online: Online = sqlx::query_as("select * from cached_status where id = 0") - // .fetch_one(&self.db) - // .await?; - // Ok(online) - // } + pub(crate) fn read_message_history_with_users( + &self, + chat: JID, + ) -> Result<Vec<(Message, User)>, Error> { + let chat_id = self.read_chat_id(chat)?; + let messages = self + .db + .prepare( + "select id, from_jid, delivery, timestamp, body, jid, nick, avatar from messages join users on jid = from_jid where chat_id = ? order by timestamp asc", + )? + .query_map([chat_id], |row| { + Ok(( + Message { + id: row.get(0)?, + // TODO: full from + from: row.get(1)?, + delivery: row.get(2)?, + timestamp: row.get(3)?, + body: Body { body: row.get(4)? }, + }, + User { + jid: row.get(5)?, + nick: row.get(6)?, + avatar: row.get(7)?, + } + )) + })? + .collect::<Result<Vec<_>, _>>()?; + Ok(messages) + } - // pub(crate) async fn upsert_cached_status(&self, status: Online) -> Result<(), Error> { - // sqlx::query!( - // "insert into cached_status (id, show, message) values (0, ?, ?) on conflict do update set show = ?, message = ?", - // status.show, - // status.status, - // status.show, - // status.status - // ).execute(&self.db).await?; - // Ok(()) - // } + pub(crate) fn read_cached_status(&self) -> Result<Online, Error> { + let status = self.db.query_row( + "select show, message from cached_status where id = 0", + [], + |row| { + Ok(Online { + show: row.get(0)?, + status: row.get(1)?, + priority: None, + }) + }, + )?; + Ok(status) + } - // pub(crate) async fn delete_cached_status(&self) -> Result<(), Error> { - // sqlx::query!("update cached_status set show = null, message = null where id = 0") - // .execute(&self.db) - // .await?; - // Ok(()) - // } + pub(crate) fn upsert_cached_status(&self, status: Online) -> Result<(), Error> { + self.db.execute("insert into cached_status (id, show, message) values (0, ?1, ?2) on conflict do update set show = ?3, message = ?4", (status.show, &status.status, status.show, &status.status))?; + Ok(()) + } - // pub(crate) async fn read_capabilities(&self, node: &str) -> Result<String, Error> { - // #[derive(sqlx::FromRow)] - // struct Row { - // capabilities: String, - // } - // let row: Row = - // sqlx::query_as("select capabilities from capability_hash_nodes where node = ?") - // .bind(node) - // .fetch_one(&self.db) - // .await?; - // Ok(row.capabilities) - // } + pub(crate) fn delete_cached_status(&self) -> Result<(), Error> { + self.db.execute( + "update cached_status set show = null, message = null where id = 0", + [], + )?; + Ok(()) + } - // pub(crate) async fn upsert_capabilities( - // &self, - // node: &str, - // capabilities: &str, - // ) -> Result<(), Error> { - // let now = Utc::now(); - // sqlx::query!( - // "insert into capability_hash_nodes (node, timestamp, capabilities) values (?, ?, ?) on conflict do update set timestamp = ?, capabilities = ?", node, now, capabilities, now, capabilities - // ).execute(&self.db).await?; - // Ok(()) - // } + pub(crate) fn read_capabilities(&self, node: String) -> Result<String, Error> { + let capabilities = self.db.query_row( + "select capabilities from capability_hash_nodes where node = ?1", + [node], + |row| Ok(row.get(0)?), + )?; + Ok(capabilities) + } + + pub(crate) fn upsert_capabilities( + &self, + node: String, + capabilities: String, + ) -> Result<(), Error> { + let now = Utc::now(); + self.db.execute("insert into capability_hash_nodes (node, timestamp, capabilities) values (?1, ?2, ?3) on conflict do update set timestamp = ?, capabilities = ?", (node, now, &capabilities, now, &capabilities))?; + Ok(()) + } } diff --git a/filamento/src/error.rs b/filamento/src/error.rs index 6b7d0ae..af3320f 100644 --- a/filamento/src/error.rs +++ b/filamento/src/error.rs @@ -3,12 +3,16 @@ use std::{num::TryFromIntError, string::FromUtf8Error, sync::Arc}; use base64::DecodeError; use image::ImageError; use jid::JID; -use lampada::error::{ActorError, ConnectionError, ReadError, WriteError}; +use lampada::error::{ActorError, ReadError, WriteError}; use stanza::client::{Stanza, iq::Query}; use thiserror::Error; pub use lampada::error::CommandError; +pub use lampada::error::ConnectionError; +use tokio::sync::mpsc::error::SendError; +use tokio::sync::oneshot::error::RecvError; +use crate::db::DbCommand; use crate::files::FileStore; // for the client logic impl @@ -165,12 +169,99 @@ pub enum ResponseError { } #[derive(Debug, Error, Clone)] -#[error("database error: {0}")] -pub struct DatabaseError(pub Arc<rusqlite::Error>); +pub enum DatabaseError { + #[error("database error: {0}")] + Database(Serializeable<Arc<rusqlite::Error>>), + #[error("database command send: {0}")] + Send(Arc<SendError<DbCommand>>), + #[error("database result recv: {0}")] + Recv(#[from] RecvError), +} + +impl From<SendError<DbCommand>> for DatabaseError { + fn from(e: SendError<DbCommand>) -> Self { + Self::Send(Arc::new(e)) + } +} + +pub enum Serializeable<T> { + String(String), + Unserialized(T), +} + +impl<T: std::fmt::Display> std::fmt::Display for Serializeable<T> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match &self { + Serializeable::String(s) => s.fmt(f), + Serializeable::Unserialized(t) => t.fmt(f), + } + } +} + +impl<T: std::fmt::Debug> std::fmt::Debug for Serializeable<T> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match &self { + Serializeable::String(s) => s.fmt(f), + Serializeable::Unserialized(t) => t.fmt(f), + } + } +} + +impl<T: Clone> Clone for Serializeable<T> { + fn clone(&self) -> Self { + match self { + Serializeable::String(s) => Self::String(s.clone()), + Serializeable::Unserialized(t) => Self::Unserialized(t.clone()), + } + } +} + +#[cfg(feature = "serde")] +struct StringVisitor; + +#[cfg(feature = "serde")] +impl<'de> serde::de::Visitor<'de> for StringVisitor { + type Value = String; + + fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { + formatter.write_str("a string") + } + + fn visit_string<E>(self, v: String) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(v) + } +} + +#[cfg(feature = "serde")] +impl<'de> serde::Deserialize<'de> for DatabaseError { + fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> + where + D: serde::Deserializer<'de>, + { + let string = deserializer.deserialize_string(StringVisitor)?; + Ok(Self::Database(Serializeable::String(string))) + } +} + +#[cfg(feature = "serde")] +impl serde::Serialize for DatabaseError { + fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> + where + S: serde::Serializer, + { + match &self.0 { + Serializeable::String(s) => serializer.serialize_str(s), + Serializeable::Unserialized(u) => serializer.serialize_str(&u.to_string()), + } + } +} impl From<rusqlite::Error> for DatabaseError { fn from(e: rusqlite::Error) -> Self { - Self(Arc::new(e)) + Self::Database(Serializeable::Unserialized(Arc::new(e))) } } @@ -209,6 +300,8 @@ pub enum DatabaseOpenError { Io(Arc<tokio::io::Error>), #[error("invalid path")] InvalidPath, + #[error("tokio oneshot recv error: {0}")] + Recv(#[from] tokio::sync::oneshot::error::RecvError), } // impl From<sqlx::migrate::MigrateError> for DatabaseOpenError { @@ -237,8 +330,8 @@ pub enum PresenceError { Unsupported, #[error("missing from")] MissingFrom, - #[error("stanza error: {0}")] - StanzaError(#[from] stanza::client::error::Error), + #[error("stanza error: {0:?}")] + StanzaError(Option<stanza::client::error::Error>), } #[derive(Debug, Error, Clone)] diff --git a/filamento/src/files.rs b/filamento/src/files.rs index 644f883..dcc9cd2 100644 --- a/filamento/src/files.rs +++ b/filamento/src/files.rs @@ -9,6 +9,7 @@ use std::{ use tokio::io; use tokio::sync::Mutex; +#[cfg(not(target_arch = "wasm32"))] pub trait FileStore { type Err: Clone + Send + Error; @@ -27,6 +28,19 @@ pub trait FileStore { ) -> impl std::future::Future<Output = Result<(), Self::Err>> + std::marker::Send; } +#[cfg(target_arch = "wasm32")] +pub trait FileStore { + type Err: Clone + Send + Error; + + fn is_stored(&self, name: &str) -> impl std::future::Future<Output = Result<bool, Self::Err>>; + fn store( + &self, + name: &str, + data: &[u8], + ) -> impl std::future::Future<Output = Result<(), Self::Err>>; + fn delete(&self, name: &str) -> impl std::future::Future<Output = Result<(), Self::Err>>; +} + #[derive(Clone, Debug)] pub struct FilesMem { files: Arc<Mutex<HashMap<String, Vec<u8>>>>, @@ -38,8 +52,19 @@ impl FilesMem { files: Arc::new(Mutex::new(HashMap::new())), } } + + pub async fn get_file(&self, name: impl AsRef<str>) -> Option<Vec<u8>> { + let name = name.as_ref(); + self.files.lock().await.get(name).cloned() + } } +#[cfg(all(feature = "opfs", target_arch = "wasm32"))] +pub mod opfs; + +#[cfg(all(feature = "opfs", target_arch = "wasm32"))] +pub use opfs::FilesOPFS; + impl FileStore for FilesMem { type Err = Infallible; diff --git a/filamento/src/files/opfs.rs b/filamento/src/files/opfs.rs new file mode 100644 index 0000000..fb32c6e --- /dev/null +++ b/filamento/src/files/opfs.rs @@ -0,0 +1,128 @@ +use std::path::Path; + +use thiserror::Error; +use wasm_bindgen::{JsCast, JsValue}; +use wasm_bindgen_futures::JsFuture; +use web_sys::{ + File, FileSystemDirectoryHandle, FileSystemFileHandle, FileSystemGetDirectoryOptions, + FileSystemGetFileOptions, FileSystemWritableFileStream, Url, js_sys, window, +}; + +use crate::FileStore; + +#[derive(Clone)] +pub struct FilesOPFS { + directory: String, +} + +impl FilesOPFS { + pub async fn new(directory: impl AsRef<str>) -> Result<Self, OPFSError> { + let directory = directory.as_ref(); + let directory_string = directory.to_string(); + let promise = window().unwrap().navigator().storage().get_directory(); + let opfs_root: FileSystemDirectoryHandle = JsFuture::from(promise).await?.into(); + let options = FileSystemGetDirectoryOptions::new(); + options.set_create(true); + let directory: FileSystemDirectoryHandle = + JsFuture::from(opfs_root.get_directory_handle_with_options(directory, &options)) + .await? + .into(); + Ok(Self { + directory: directory_string, + }) + } + + pub async fn get_src(&self, file_name: impl AsRef<str>) -> Result<String, OPFSError> { + let promise = window().unwrap().navigator().storage().get_directory(); + let opfs_root: FileSystemDirectoryHandle = JsFuture::from(promise).await?.into(); + let directory: FileSystemDirectoryHandle = + JsFuture::from(opfs_root.get_directory_handle(&self.directory)) + .await? + .into(); + let handle: FileSystemFileHandle = + JsFuture::from(directory.get_file_handle(file_name.as_ref())) + .await? + .into(); + let file: File = JsFuture::from(handle.get_file()).await?.into(); + let src = Url::create_object_url_with_blob(&file)?; + Ok(src) + } +} + +impl FileStore for FilesOPFS { + type Err = OPFSError; + + async fn is_stored(&self, name: &str) -> Result<bool, Self::Err> { + let promise = window().unwrap().navigator().storage().get_directory(); + let opfs_root: FileSystemDirectoryHandle = JsFuture::from(promise).await?.into(); + let directory: FileSystemDirectoryHandle = + JsFuture::from(opfs_root.get_directory_handle(&self.directory)) + .await? + .into(); + let stored = JsFuture::from(directory.get_file_handle(name)) + .await + .map(|_| true) + // TODO: distinguish between other errors and notfound + .unwrap_or(false); + Ok(stored) + } + + async fn store(&self, name: &str, data: &[u8]) -> Result<(), Self::Err> { + let promise = window().unwrap().navigator().storage().get_directory(); + let opfs_root: FileSystemDirectoryHandle = JsFuture::from(promise).await?.into(); + let directory: FileSystemDirectoryHandle = + JsFuture::from(opfs_root.get_directory_handle(&self.directory)) + .await? + .into(); + let options = FileSystemGetFileOptions::new(); + options.set_create(true); + let handle: FileSystemFileHandle = + JsFuture::from(directory.get_file_handle_with_options(name, &options)) + .await? + .into(); + let write_handle: FileSystemWritableFileStream = + JsFuture::from(handle.create_writable()).await?.into(); + + let buffer = js_sys::ArrayBuffer::new(data.len() as u32); + let u8arr = js_sys::Uint8Array::new(&buffer); + for (idx, v) in data.iter().enumerate() { + u8arr.set_index(idx as u32, *v); + } + + let write_promise = write_handle.write_with_js_u8_array(&u8arr).unwrap(); + // let write_promise = write_handle.write_with_u8_array(data)?; + let _ = JsFuture::from(write_promise).await?; + let _ = JsFuture::from(write_handle.close()).await?; + Ok(()) + } + + async fn delete(&self, name: &str) -> Result<(), Self::Err> { + let promise = window().unwrap().navigator().storage().get_directory(); + let opfs_root: FileSystemDirectoryHandle = JsFuture::from(promise).await?.into(); + let directory: FileSystemDirectoryHandle = + JsFuture::from(opfs_root.get_directory_handle(&self.directory)) + .await? + .into(); + let _ = JsFuture::from(directory.remove_entry(name)).await?; + Ok(()) + } +} + +#[derive(Error, Clone, Debug)] +pub enum OPFSError { + #[error("js opfs error: {0}")] + Error(String), +} + +// TODO: better errors +impl From<JsValue> for OPFSError { + fn from(value: JsValue) -> Self { + Self::Error( + value + .dyn_into::<js_sys::Error>() + .ok() + .and_then(|err| err.message().as_string()) + .unwrap_or(String::from("<no string>")), + ) + } +} diff --git a/filamento/src/lib.rs b/filamento/src/lib.rs index e06f7c6..068bfe8 100644 --- a/filamento/src/lib.rs +++ b/filamento/src/lib.rs @@ -69,7 +69,10 @@ pub enum Command<Fs: FileStore> { ), /// get a specific chat by jid GetChat(JID, oneshot::Sender<Result<Chat, DatabaseError>>), + /// get a specific chat and user by jid + GetChatAndUser(JID, oneshot::Sender<Result<(Chat, User), DatabaseError>>), /// get message history for chat (does appropriate mam things) + GetMessage(Uuid, oneshot::Sender<Result<Message, DatabaseError>>), // TODO: paging and filtering GetMessages(JID, oneshot::Sender<Result<Vec<Message>, DatabaseError>>), /// get message history for chat (does appropriate mam things) @@ -253,7 +256,7 @@ impl<Fs: FileStore + Clone + Send + Sync + 'static> Client<Fs> { let client = Self { sender: command_sender, // TODO: configure timeout - timeout: Duration::from_secs(10), + timeout: Duration::from_secs(20), }; let logic = ClientLogic::new(client.clone(), jid.as_bare(), db, update_send, file_store); @@ -268,9 +271,17 @@ impl<Fs: FileStore + Clone + Send + Sync + 'static> Client<Fs> { } impl<Fs: FileStore> Client<Fs> { - pub async fn connect(&self) -> Result<(), ActorError> { - self.send(CoreClientCommand::Connect).await?; - Ok(()) + /// returns the resource + pub async fn connect(&self) -> Result<String, CommandError<ConnectionError>> { + let (send, recv) = oneshot::channel::<Result<String, ConnectionError>>(); + self.send(CoreClientCommand::Connect(send)) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; + let result = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; + Ok(result) } pub async fn disconnect(&self, offline: Offline) -> Result<(), ActorError> { @@ -374,6 +385,30 @@ impl<Fs: FileStore> Client<Fs> { Ok(chat) } + pub async fn get_chat_and_user(&self, jid: JID) -> Result<(Chat, User), CommandError<DatabaseError>> { + let (send, recv) = oneshot::channel(); + self.send(CoreClientCommand::Command(Command::GetChatAndUser(jid, send))) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; + let result= timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; + Ok(result) + } + + pub async fn get_message(&self, id: Uuid) -> Result<Message, CommandError<DatabaseError>> { + let (send, recv) = oneshot::channel(); + self.send(CoreClientCommand::Command(Command::GetMessage(id, send))) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; + let message = timeout(self.timeout, recv) + .await + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? + .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; + Ok(message) + } + pub async fn get_messages( &self, jid: JID, diff --git a/filamento/src/logic/local_only.rs b/filamento/src/logic/local_only.rs index dc94d2c..f5705f4 100644 --- a/filamento/src/logic/local_only.rs +++ b/filamento/src/logic/local_only.rs @@ -44,6 +44,20 @@ pub async fn handle_get_chat<Fs: FileStore + Clone>( Ok(logic.db().read_chat(jid).await?) } +pub async fn handle_get_chat_and_user<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, + jid: JID, +) -> Result<(Chat, User), DatabaseError> { + Ok(logic.db().read_chat_and_user(jid).await?) +} + +pub async fn handle_get_message<Fs: FileStore + Clone>( + logic: &ClientLogic<Fs>, + id: Uuid, +) -> Result<Message, DatabaseError> { + Ok(logic.db().read_message(id).await?) +} + pub async fn handle_get_messages<Fs: FileStore + Clone>( logic: &ClientLogic<Fs>, jid: JID, diff --git a/filamento/src/logic/offline.rs b/filamento/src/logic/offline.rs index b87484c..606b04f 100644 --- a/filamento/src/logic/offline.rs +++ b/filamento/src/logic/offline.rs @@ -19,13 +19,9 @@ use crate::{ }; use super::{ - ClientLogic, local_only::{ - handle_delete_chat, handle_delete_messaage, handle_get_chat, handle_get_chats, - handle_get_chats_ordered, handle_get_chats_ordered_with_latest_messages, - handle_get_chats_ordered_with_latest_messages_and_users, handle_get_messages, - handle_get_messages_with_users, handle_get_user, - }, + handle_delete_chat, handle_delete_messaage, handle_get_chat, handle_get_chat_and_user, handle_get_chats, handle_get_chats_ordered, handle_get_chats_ordered_with_latest_messages, handle_get_chats_ordered_with_latest_messages_and_users, handle_get_message, handle_get_messages, handle_get_messages_with_users, handle_get_user + }, ClientLogic }; pub async fn handle_offline<Fs: FileStore + Clone>(logic: ClientLogic<Fs>, command: Command<Fs>) { @@ -89,6 +85,14 @@ pub async fn handle_offline_result<Fs: FileStore + Clone>( let chats = handle_get_chat(logic, jid).await; sender.send(chats); } + Command::GetChatAndUser(jid, sender) => { + let chat = handle_get_chat_and_user(logic, jid).await; + let _ = sender.send(chat); + } + Command::GetMessage(id, sender) => { + let message = handle_get_message(logic, id).await; + let _ = sender.send(message); + } Command::GetMessages(jid, sender) => { let messages = handle_get_messages(logic, jid).await; sender.send(messages); @@ -159,7 +163,7 @@ pub async fn handle_offline_result<Fs: FileStore + Clone>( // TODO: mark these as potentially failed upon client launch if let Err(e) = logic .db() - .create_message_with_self_resource( + .create_message_with_user_resource( message.clone(), jid.clone(), // TODO: when message is queued and sent, the from must also be updated with the correct resource diff --git a/filamento/src/logic/online.rs b/filamento/src/logic/online.rs index 767f923..9814ff2 100644 --- a/filamento/src/logic/online.rs +++ b/filamento/src/logic/online.rs @@ -23,7 +23,7 @@ use crate::{ use super::{ local_only::{ - handle_delete_chat, handle_delete_messaage, handle_get_chat, handle_get_chats, handle_get_chats_ordered, handle_get_chats_ordered_with_latest_messages, handle_get_chats_ordered_with_latest_messages_and_users, handle_get_messages, handle_get_messages_with_users, handle_get_user + handle_delete_chat, handle_delete_messaage, handle_get_chat, handle_get_chat_and_user, handle_get_chats, handle_get_chats_ordered, handle_get_chats_ordered_with_latest_messages, handle_get_chats_ordered_with_latest_messages_and_users, handle_get_message, handle_get_messages, handle_get_messages_with_users, handle_get_user }, ClientLogic }; @@ -477,7 +477,7 @@ pub async fn handle_set_status<Fs: FileStore + Clone>( pub async fn handle_send_message<Fs: FileStore + Clone>(logic: &ClientLogic<Fs>, connection: Connected, jid: JID, body: Body) { // upsert the chat and user the message will be delivered to. if there is a conflict, it will return whatever was there, otherwise it will return false by default. // let have_chatted = logic.db().upsert_chat_and_user(&jid).await.unwrap_or(false); - let have_chatted = match logic.db().upsert_chat_and_user(&jid).await { + let have_chatted = match logic.db().upsert_chat_and_user(jid.clone()).await { Ok(have_chatted) => { have_chatted }, @@ -523,7 +523,7 @@ pub async fn handle_send_message<Fs: FileStore + Clone>(logic: &ClientLogic<Fs>, // TODO: mark these as potentially failed upon client launch if let Err(e) = logic .db() - .create_message_with_self_resource(message.clone(), jid.clone(), connection.jid().clone()) + .create_message_with_user_resource(message.clone(), jid.clone(), connection.jid().clone()) .await { // TODO: should these really be handle_error or just the error macro? @@ -583,6 +583,9 @@ pub async fn handle_send_message<Fs: FileStore + Clone>(logic: &ClientLogic<Fs>, match result { Ok(_) => { info!("sent message: {:?}", message_stanza); + if let Err(e) = logic.db().update_message_delivery(id, Delivery::Written).await { + error!("updating message delivery: {}", e); + } logic .update_sender() .send(UpdateMessage::MessageDelivery { @@ -1107,6 +1110,14 @@ pub async fn handle_online_result<Fs: FileStore + Clone>( let chat = handle_get_chat(logic, jid).await; let _ = sender.send(chat); } + Command::GetChatAndUser(jid, sender) => { + let chat = handle_get_chat_and_user(logic, jid).await; + let _ = sender.send(chat); + } + Command::GetMessage(id, sender) => { + let message = handle_get_message(logic, id).await; + let _ = sender.send(message); + } Command::GetMessages(jid, sender) => { let messages = handle_get_messages(logic, jid).await; let _ = sender.send(messages); diff --git a/filamento/src/logic/process_stanza.rs b/filamento/src/logic/process_stanza.rs index cdaff97..30d0830 100644 --- a/filamento/src/logic/process_stanza.rs +++ b/filamento/src/logic/process_stanza.rs @@ -77,9 +77,10 @@ pub async fn recv_message<Fs: FileStore + Clone>( }, delivery: None, }; + // TODO: process message type="error" // save the message to the database - match logic.db().upsert_chat_and_user(&from).await { + match logic.db().upsert_chat_and_user(from.clone()).await { Ok(_) => { if let Err(e) = logic .db() @@ -90,17 +91,11 @@ pub async fn recv_message<Fs: FileStore + Clone>( ) .await { - logic - .handle_error(Error::MessageRecv(MessageRecvError::MessageHistory(e))) - .await; - error!("failed to upsert chat and user") + error!("failed to create message: {}", e); } } Err(e) => { - logic - .handle_error(Error::MessageRecv(MessageRecvError::MessageHistory(e))) - .await; - error!("failed to upsert chat and user") + error!("failed to upsert chat and user: {}", e); } }; @@ -484,13 +479,7 @@ pub async fn recv_presence( stanza::client::presence::PresenceType::Error => { // TODO: is there any other information that should go with the error? also MUST have an error, otherwise it's a different error. maybe it shoulnd't be an option. // TODO: ughhhhhhhhhhhhh these stanza errors should probably just have an option, and custom display - Err(PresenceError::StanzaError( - presence - .errors - .first() - .cloned() - .expect("error MUST have error"), - )) + Err(PresenceError::StanzaError(presence.errors.first().cloned())) } // should not happen (error to server) stanza::client::presence::PresenceType::Probe => { @@ -595,7 +584,7 @@ pub async fn recv_iq<Fs: FileStore + Clone>( } else { match logic .db() - .read_capabilities(&query.node.clone().unwrap()) + .read_capabilities(query.node.clone().unwrap()) .await { Ok(c) => match caps::decode_info_base64(c) { diff --git a/filamento/src/presence.rs b/filamento/src/presence.rs index e406cce..de4dd7c 100644 --- a/filamento/src/presence.rs +++ b/filamento/src/presence.rs @@ -1,4 +1,8 @@ use chrono::{DateTime, Utc}; +use rusqlite::{ + ToSql, + types::{FromSql, ToSqlOutput, Value}, +}; use stanza::{client::presence::String1024, xep_0203::Delay}; use crate::caps; @@ -10,7 +14,7 @@ pub struct Online { pub priority: Option<i8>, } -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum Show { Away, Chat, @@ -18,6 +22,30 @@ pub enum Show { ExtendedAway, } +impl ToSql for Show { + fn to_sql(&self) -> rusqlite::Result<rusqlite::types::ToSqlOutput<'_>> { + Ok(match self { + Show::Away => ToSqlOutput::Owned(Value::Text("away".to_string())), + Show::Chat => ToSqlOutput::Owned(Value::Text("chat".to_string())), + Show::DoNotDisturb => ToSqlOutput::Owned(Value::Text("do-not-disturb".to_string())), + Show::ExtendedAway => ToSqlOutput::Owned(Value::Text("extended-away".to_string())), + }) + } +} + +impl FromSql for Show { + fn column_result(value: rusqlite::types::ValueRef<'_>) -> rusqlite::types::FromSqlResult<Self> { + Ok(match value.as_str()? { + "away" => Self::Away, + "chat" => Self::Chat, + "do-not-disturb" => Self::DoNotDisturb, + "extended-away" => Self::ExtendedAway, + // TODO: horrible + value => panic!("unexpected {value}"), + }) + } +} + #[derive(Debug, Default, Clone)] pub struct Offline { pub status: Option<String>, diff --git a/filamento/src/roster.rs b/filamento/src/roster.rs index ba5b3cd..6b61e10 100644 --- a/filamento/src/roster.rs +++ b/filamento/src/roster.rs @@ -1,13 +1,18 @@ -use std::collections::HashSet; +use std::{collections::HashSet, fmt::Display}; use jid::JID; +use rusqlite::{ + ToSql, + types::{FromSql, ToSqlOutput, Value}, +}; pub struct ContactUpdate { pub name: Option<String>, pub groups: HashSet<String>, } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq, Eq)] +#[cfg_attr(feature = "reactive_stores", derive(reactive_stores::Store))] pub struct Contact { // jid is the id used to reference everything, but not the primary key pub user_jid: JID, @@ -17,24 +22,99 @@ pub struct Contact { // TODO: avatar, nickname /// nickname picked by contact // nickname: Option<String>, + #[cfg_attr(feature = "reactive_stores", store(key: String = |group| group.clone()))] pub groups: HashSet<String>, } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq, Eq)] +/// Contact subscription state. pub enum Subscription { + /// No subscriptions. None, + /// Pending outgoing subscription request. PendingOut, + /// Pending incoming subscription request. PendingIn, + /// Pending incoming & pending outgoing subscription requests. PendingInPendingOut, + /// Subscribed to. OnlyOut, + /// Subscription from. OnlyIn, + /// Subscribed to & pending incoming subscription request. OutPendingIn, + /// Subscription from & pending outgoing subscription request. InPendingOut, + /// Buddy (subscriptions both ways). Buddy, // TODO: perhaps don't need, just emit event to remove contact // Remove, } +impl ToSql for Subscription { + fn to_sql(&self) -> rusqlite::Result<rusqlite::types::ToSqlOutput<'_>> { + Ok(match self { + Subscription::None => ToSqlOutput::Owned(Value::Text("none".to_string())), + Subscription::PendingOut => ToSqlOutput::Owned(Value::Text("pending-out".to_string())), + Subscription::PendingIn => ToSqlOutput::Owned(Value::Text("pending-in".to_string())), + Subscription::PendingInPendingOut => { + ToSqlOutput::Owned(Value::Text("pending-in-pending-out".to_string())) + } + Subscription::OnlyOut => ToSqlOutput::Owned(Value::Text("only-out".to_string())), + Subscription::OnlyIn => ToSqlOutput::Owned(Value::Text("only-in".to_string())), + Subscription::OutPendingIn => { + ToSqlOutput::Owned(Value::Text("out-pending-in".to_string())) + } + Subscription::InPendingOut => { + ToSqlOutput::Owned(Value::Text("in-pending-out".to_string())) + } + Subscription::Buddy => ToSqlOutput::Owned(Value::Text("buddy".to_string())), + }) + } +} + +impl FromSql for Subscription { + fn column_result(value: rusqlite::types::ValueRef<'_>) -> rusqlite::types::FromSqlResult<Self> { + Ok(match value.as_str()? { + "none" => Self::None, + "pending-out" => Self::PendingOut, + "pending-in" => Self::PendingIn, + "pending-in-pending-out" => Self::PendingInPendingOut, + "only-out" => Self::OnlyOut, + "only-in" => Self::OnlyIn, + "out-pending-in" => Self::OutPendingIn, + "in-pending-out" => Self::InPendingOut, + "buddy" => Self::Buddy, + // TODO: don't have these lol + value => panic!("unexpected subscription `{value}`"), + }) + } +} + +impl Display for Subscription { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Subscription::None => write!(f, "No Subscriptions"), + Subscription::PendingOut => write!(f, "Pending Outgoing Subscription Request"), + Subscription::PendingIn => write!(f, "Pending Incoming Subscription Request"), + Subscription::PendingInPendingOut => write!( + f, + "Pending Incoming & Pending Outgoing Subscription Requests" + ), + Subscription::OnlyOut => write!(f, "Subscribed To"), + Subscription::OnlyIn => write!(f, "Subscription From"), + Subscription::OutPendingIn => { + write!(f, "Subscribed To & Pending Incoming Subscription Request") + } + Subscription::InPendingOut => write!( + f, + "Subscription From & Pending Outgoing Subscription Request" + ), + Subscription::Buddy => write!(f, "Buddy (Subscriptions Both Ways)"), + } + } +} + // none // > // >> diff --git a/filamento/src/user.rs b/filamento/src/user.rs index b2ea8e4..f30933c 100644 --- a/filamento/src/user.rs +++ b/filamento/src/user.rs @@ -1,6 +1,8 @@ use jid::JID; -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq, Eq)] +#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))] +#[cfg_attr(feature = "reactive_stores", derive(reactive_stores::Store))] pub struct User { pub jid: JID, pub nick: Option<String>, |