diff options
Diffstat (limited to 'filamento/src/db.rs')
-rw-r--r-- | filamento/src/db.rs | 247 |
1 files changed, 108 insertions, 139 deletions
diff --git a/filamento/src/db.rs b/filamento/src/db.rs index 1b5afe6..385382a 100644 --- a/filamento/src/db.rs +++ b/filamento/src/db.rs @@ -2,7 +2,7 @@ use core::fmt::Display; use std::{collections::HashSet, ops::Deref, path::Path, sync::Arc}; use chrono::{DateTime, Utc}; -use jid::JID; +use jid::{BareJID, FullJID, JID}; use rusqlite::{Connection, OptionalExtension}; use tokio::sync::{Mutex, MutexGuard}; use tokio::sync::{mpsc, oneshot}; @@ -58,53 +58,22 @@ impl Db { pub async fn create_connect_and_migrate( path: impl AsRef<Path> + Send, ) -> Result<Self, DatabaseOpenError> { - let (sender, receiver) = mpsc::channel(20); - let (result_send, result_recv) = oneshot::channel(); - spawn_blocking(move || { - let result = DbActor::new(path, receiver).await; - match result { - Ok(a) => { - result_send.send(Ok(())); - a.run() - } - Err(e) => { - result_send.send(Err(e)); - } - } - }); - match result_recv.await { - Ok(r) => match r { - Ok(o) => Ok(Self { sender }), - Err(e) => return Err(e), - }, - Err(e) => return Err(e.into()), - } + let (sender, receiver) = mpsc::unbounded_channel(); + + let actor = DbActor::new(path, receiver)?; + spawn_blocking(move || actor.run()); + + Ok(Self { sender }) } #[cfg(not(target_arch = "wasm32"))] pub async fn create_connect_and_migrate_memory() -> Result<Self, DatabaseOpenError> { - let (sender, receiver) = mpsc::channel(20); - let (result_send, result_recv) = oneshot::channel(); - spawn_blocking(move || { - let result = DbActor::new_memory(receiver).await; - match result { - Ok(a) => { - result_send.send(Ok(())); - // TODO: async run when not wasm - a.run() - } - Err(e) => { - result_send.send(Err(e)); - } - } - }); - match result_recv.await { - Ok(r) => match r { - Ok(o) => Ok(Self { sender }), - Err(e) => return Err(e), - }, - Err(e) => return Err(e.into()), - } + let (sender, receiver) = mpsc::unbounded_channel(); + + let actor = DbActor::new_memory(receiver)?; + spawn_blocking(move || actor.run()); + + Ok(Self { sender }) } /// `file_name` should be a file not in a directory @@ -183,7 +152,7 @@ impl Db { } // TODO: this is not a 'read' user - pub(crate) async fn read_user(&self, user: JID) -> Result<User, Error> { + pub(crate) async fn read_user(&self, user: BareJID) -> Result<User, Error> { let (result, recv) = oneshot::channel(); let command = DbCommand::ReadUser { user, result }; self.sender.send(command); @@ -192,7 +161,7 @@ impl Db { } /// returns whether or not the nickname was updated - pub(crate) async fn delete_user_nick(&self, jid: JID) -> Result<bool, Error> { + pub(crate) async fn delete_user_nick(&self, jid: BareJID) -> Result<bool, Error> { let (result, recv) = oneshot::channel(); let command = DbCommand::DeleteUserNick { jid, result }; self.sender.send(command); @@ -201,7 +170,7 @@ impl Db { } /// returns whether or not the nickname was updated - pub(crate) async fn upsert_user_nick(&self, jid: JID, nick: String) -> Result<bool, Error> { + pub(crate) async fn upsert_user_nick(&self, jid: BareJID, nick: String) -> Result<bool, Error> { let (result, recv) = oneshot::channel(); let command = DbCommand::UpsertUserNick { jid, nick, result }; self.sender.send(command); @@ -212,7 +181,7 @@ impl Db { /// returns whether or not the avatar was updated, and the file to delete if there existed an old avatar pub(crate) async fn delete_user_avatar( &self, - jid: JID, + jid: BareJID, ) -> Result<(bool, Option<String>), Error> { let (result, recv) = oneshot::channel(); let command = DbCommand::DeleteUserAvatar { jid, result }; @@ -224,7 +193,7 @@ impl Db { /// returns whether or not the avatar was updated, and the file to delete if there existed an old avatar pub(crate) async fn upsert_user_avatar( &self, - jid: JID, + jid: BareJID, avatar: String, ) -> Result<(bool, Option<String>), Error> { let (result, recv) = oneshot::channel(); @@ -259,7 +228,7 @@ impl Db { result } - pub(crate) async fn read_contact(&self, contact: JID) -> Result<Contact, Error> { + pub(crate) async fn read_contact(&self, contact: BareJID) -> Result<Contact, Error> { let (result, recv) = oneshot::channel(); let command = DbCommand::ReadContact { contact, result }; self.sender.send(command); @@ -267,7 +236,10 @@ impl Db { result } - pub(crate) async fn read_contact_opt(&self, contact: JID) -> Result<Option<Contact>, Error> { + pub(crate) async fn read_contact_opt( + &self, + contact: BareJID, + ) -> Result<Option<Contact>, Error> { let (result, recv) = oneshot::channel(); let command = DbCommand::ReadContactOpt { contact, result }; self.sender.send(command); @@ -292,7 +264,7 @@ impl Db { result } - pub(crate) async fn delete_contact(&self, contact: JID) -> Result<(), Error> { + pub(crate) async fn delete_contact(&self, contact: BareJID) -> Result<(), Error> { let (result, recv) = oneshot::channel(); let command = DbCommand::DeleteContact { contact, result }; self.sender.send(command); @@ -336,7 +308,7 @@ impl Db { // TODO: what happens if a correspondent changes from a user to a contact? maybe just have correspondent be a user, then have the client make the user show up as a contact in ui if they are in the loaded roster. - pub(crate) async fn read_chat(&self, chat: JID) -> Result<Chat, Error> { + pub(crate) async fn read_chat(&self, chat: BareJID) -> Result<Chat, Error> { let (result, recv) = oneshot::channel(); let command = DbCommand::ReadChat { chat, result }; self.sender.send(command); @@ -344,7 +316,7 @@ impl Db { result } - pub(crate) async fn read_chat_and_user(&self, chat: JID) -> Result<(Chat, User), Error> { + pub(crate) async fn read_chat_and_user(&self, chat: BareJID) -> Result<(Chat, User), Error> { let (result, recv) = oneshot::channel(); let command = DbCommand::ReadChatAndUser { chat, result }; self.sender.send(command); @@ -352,7 +324,7 @@ impl Db { result } - pub(crate) async fn mark_chat_as_chatted(&self, chat: JID) -> Result<(), Error> { + pub(crate) async fn mark_chat_as_chatted(&self, chat: BareJID) -> Result<(), Error> { let (result, recv) = oneshot::channel(); let command = DbCommand::MarkChatAsChatted { chat, result }; self.sender.send(command); @@ -363,7 +335,7 @@ impl Db { pub(crate) async fn update_chat_correspondent( &self, old_chat: Chat, - new_correspondent: JID, + new_correspondent: BareJID, ) -> Result<Chat, Error> { let (result, recv) = oneshot::channel(); let command = DbCommand::UpdateChatCorrespondent { @@ -378,7 +350,7 @@ impl Db { // pub(crate) async fn update_chat - pub(crate) async fn delete_chat(&self, chat: JID) -> Result<(), Error> { + pub(crate) async fn delete_chat(&self, chat: BareJID) -> Result<(), Error> { let (result, recv) = oneshot::channel(); let command = DbCommand::DeleteChat { chat, result }; self.sender.send(command); @@ -434,8 +406,8 @@ impl Db { pub(crate) async fn create_message( &self, message: Message, - chat: JID, - from: JID, + chat: BareJID, + from: FullJID, ) -> Result<(), Error> { let (result, recv) = oneshot::channel(); let command = DbCommand::CreateMessage { @@ -449,7 +421,7 @@ impl Db { result } - pub(crate) async fn upsert_chat_and_user(&self, chat: JID) -> Result<bool, Error> { + pub(crate) async fn upsert_chat_and_user(&self, chat: BareJID) -> Result<bool, Error> { let (result, recv) = oneshot::channel(); let command = DbCommand::UpsertChatAndUser { chat, result }; self.sender.send(command); @@ -463,10 +435,8 @@ impl Db { &self, message: Message, // TODO: enforce two kinds of jid. bare and full - // must be bare jid - chat: JID, - // full jid - from: JID, + chat: BareJID, + from: FullJID, ) -> Result<(), Error> { tracing::info!("MSGDEBUG create_message_with_user_resource exists"); let (result, recv) = oneshot::channel(); @@ -514,9 +484,9 @@ impl Db { DeleteCachedStatus => delete_cached_status() -> (); UpsertCachedStatus => upsert_cached_status(status: Online) -> (); ReadCachedStatus => read_cached_status() -> Online; - ReadMessageHistoryWithUsers => read_message_history_with_users(chat: JID) -> Vec<(Message, User)>; + ReadMessageHistoryWithUsers => read_message_history_with_users(chat: BareJID) -> Vec<(Message, User)>; // TODO: paging - ReadMessageHistory => read_message_history(chat: JID) -> Vec<Message>; + ReadMessageHistory => read_message_history(chat: BareJID) -> Vec<Message>; ReadMessage => read_message(message: Uuid) -> Message; DeleteMessage => delete_message(message: Uuid) -> () ); @@ -546,24 +516,24 @@ pub enum DbCommand { result: oneshot::Sender<Result<(), Error>>, }, ReadUser { - user: JID, + user: BareJID, result: oneshot::Sender<Result<User, Error>>, }, DeleteUserNick { - jid: JID, + jid: BareJID, result: oneshot::Sender<Result<bool, Error>>, }, UpsertUserNick { - jid: JID, + jid: BareJID, nick: String, result: oneshot::Sender<Result<bool, Error>>, }, DeleteUserAvatar { - jid: JID, + jid: BareJID, result: oneshot::Sender<Result<(bool, Option<String>), Error>>, }, UpsertUserAvatar { - jid: JID, + jid: BareJID, avatar: String, result: oneshot::Sender<Result<(bool, Option<String>), Error>>, }, @@ -576,11 +546,11 @@ pub enum DbCommand { result: oneshot::Sender<Result<(), Error>>, }, ReadContact { - contact: JID, + contact: BareJID, result: oneshot::Sender<Result<Contact, Error>>, }, ReadContactOpt { - contact: JID, + contact: BareJID, result: oneshot::Sender<Result<Option<Contact>, Error>>, }, UpdateContact { @@ -592,7 +562,7 @@ pub enum DbCommand { result: oneshot::Sender<Result<(), Error>>, }, DeleteContact { - contact: JID, + contact: BareJID, result: oneshot::Sender<Result<(), Error>>, }, ReplaceCachedRoster { @@ -610,24 +580,24 @@ pub enum DbCommand { result: oneshot::Sender<Result<(), Error>>, }, ReadChat { - chat: JID, + chat: BareJID, result: oneshot::Sender<Result<Chat, Error>>, }, ReadChatAndUser { - chat: JID, + chat: BareJID, result: oneshot::Sender<Result<(Chat, User), Error>>, }, MarkChatAsChatted { - chat: JID, + chat: BareJID, result: oneshot::Sender<Result<(), Error>>, }, UpdateChatCorrespondent { old_chat: Chat, - new_correspondent: JID, + new_correspondent: BareJID, result: oneshot::Sender<Result<Chat, Error>>, }, DeleteChat { - chat: JID, + chat: BareJID, result: oneshot::Sender<Result<(), Error>>, }, ReadChats { @@ -652,18 +622,18 @@ pub enum DbCommand { // }, CreateMessage { message: Message, - chat: JID, - from: JID, + chat: BareJID, + from: FullJID, result: oneshot::Sender<Result<(), Error>>, }, UpsertChatAndUser { - chat: JID, + chat: BareJID, result: oneshot::Sender<Result<bool, Error>>, }, CreateMessageWithUserResource { message: Message, - chat: JID, - from: JID, + chat: BareJID, + from: FullJID, result: oneshot::Sender<Result<(), Error>>, }, UpdateMessageDelivery { @@ -680,11 +650,11 @@ pub enum DbCommand { result: oneshot::Sender<Result<Message, Error>>, }, ReadMessageHistory { - chat: JID, + chat: BareJID, result: oneshot::Sender<Result<Vec<Message>, Error>>, }, ReadMessageHistoryWithUsers { - chat: JID, + chat: BareJID, result: oneshot::Sender<Result<Vec<(Message, User)>, Error>>, }, ReadCachedStatus { @@ -758,17 +728,19 @@ impl Display for DbCommand { impl DbActor { /// must be run in blocking spawn #[cfg(not(target_arch = "wasm32"))] - pub(crate) fn new(path: impl AsRef<Path>, receiver: mpsc::Receiver<DbCommand>) -> Self { + pub(crate) fn new( + path: impl AsRef<Path>, + receiver: mpsc::UnboundedReceiver<DbCommand>, + ) -> Result<Self, DatabaseOpenError> { if let Some(dir) = path.as_ref().parent() { if dir.is_dir() { } else { - tokio::fs::create_dir_all(dir).await?; + std::fs::create_dir_all(dir)?; } - let _file = tokio::fs::OpenOptions::new() + let _file = std::fs::OpenOptions::new() .append(true) .create(true) - .open(path.as_ref()) - .await?; + .open(path.as_ref())?; } let url = format!( "{}", @@ -786,7 +758,9 @@ impl DbActor { /// must be run in blocking spawn #[cfg(not(target_arch = "wasm32"))] - pub(crate) fn new_memory(receiver: mpsc::Receiver<DbCommand>) -> Self { + pub(crate) fn new_memory( + receiver: mpsc::UnboundedReceiver<DbCommand>, + ) -> Result<Self, DatabaseOpenError> { let db = Connection::open_in_memory()?; db.execute_batch(include_str!("../migrations/1.sql"))?; Ok(Self { db, receiver }) @@ -850,7 +824,7 @@ impl DbActor { result.send(self.read_contact(contact)); } DbCommand::ReadContactOpt { contact, result } => { - result.send(self.read_contact_opt(&contact)); + result.send(self.read_contact_opt(contact)); } DbCommand::UpdateContact { contact, result } => { result.send(self.update_contact(contact)); @@ -978,7 +952,7 @@ impl DbActor { } // TODO: this is not a 'read' user - pub(crate) fn read_user(&self, user: JID) -> Result<User, Error> { + pub(crate) fn read_user(&self, user: BareJID) -> Result<User, Error> { let db = &self.db; let user_opt = db .query_row( @@ -1007,7 +981,7 @@ impl DbActor { } /// returns whether or not the nickname was updated - pub(crate) fn delete_user_nick(&self, jid: JID) -> Result<bool, Error> { + pub(crate) fn delete_user_nick(&self, jid: BareJID) -> Result<bool, Error> { let rows_affected; { rows_affected = self.db.execute("insert into users (jid, nick) values (?1, ?2) on conflict do update set nick = ?3 where nick is not ?4", (jid, None::<String>, None::<String>, None::<String>))?; @@ -1020,7 +994,7 @@ impl DbActor { } /// returns whether or not the nickname was updated - pub(crate) fn upsert_user_nick(&self, jid: JID, nick: String) -> Result<bool, Error> { + pub(crate) fn upsert_user_nick(&self, jid: BareJID, nick: String) -> Result<bool, Error> { let rows_affected; { rows_affected = self.db.execute("insert into users (jid, nick) values (?1, ?2) on conflict do update set nick = ?3 where nick is not ?4", (jid, &nick, &nick, &nick))?; @@ -1033,7 +1007,7 @@ impl DbActor { } /// returns whether or not the avatar was updated, and the file to delete if there existed an old avatar - pub(crate) fn delete_user_avatar(&self, jid: JID) -> Result<(bool, Option<String>), Error> { + pub(crate) fn delete_user_avatar(&self, jid: BareJID) -> Result<(bool, Option<String>), Error> { let (old_avatar, rows_affected): (Option<String>, _); { let db = &self.db; @@ -1054,7 +1028,7 @@ impl DbActor { /// returns whether or not the avatar was updated, and the file to delete if there existed an old avatar pub(crate) fn upsert_user_avatar( &self, - jid: JID, + jid: BareJID, avatar: String, ) -> Result<(bool, Option<String>), Error> { let (old_avatar, rows_affected): (Option<String>, _); @@ -1108,7 +1082,7 @@ impl DbActor { Ok(()) } - pub(crate) fn read_contact(&self, contact: JID) -> Result<Contact, Error> { + pub(crate) fn read_contact(&self, contact: BareJID) -> Result<Contact, Error> { let db = &self.db; let mut contact_item = db.query_row( "select user_jid, name, subscription from roster where user_jid = ?1", @@ -1130,7 +1104,7 @@ impl DbActor { Ok(contact_item) } - pub(crate) fn read_contact_opt(&self, contact: &JID) -> Result<Option<Contact>, Error> { + pub(crate) fn read_contact_opt(&self, contact: BareJID) -> Result<Option<Contact>, Error> { let db = &self.db; let contact_item = db .query_row( @@ -1210,7 +1184,7 @@ impl DbActor { Ok(()) } - pub(crate) fn delete_contact(&self, contact: JID) -> Result<(), Error> { + pub(crate) fn delete_contact(&self, contact: BareJID) -> Result<(), Error> { self.db .execute("delete from roster where user_jid = ?1", [&contact])?; Ok(()) @@ -1288,19 +1262,21 @@ impl DbActor { // TODO: what happens if a correspondent changes from a user to a contact? maybe just have correspondent be a user, then have the client make the user show up as a contact in ui if they are in the loaded roster. - /// should be a bare jid /// TODO: this is NOT a read - pub(crate) fn read_chat(&self, chat: JID) -> Result<Chat, Error> { - let chat_opt = self.db.query_row( - "select correspondent, have_chatted from chats where correspondent = ?1", - [&chat], - |row| { - Ok(Chat { - correspondent: row.get(0)?, - have_chatted: row.get(1)?, - }) - }, - ).optional()?; + pub(crate) fn read_chat(&self, chat: BareJID) -> Result<Chat, Error> { + let chat_opt = self + .db + .query_row( + "select correspondent, have_chatted from chats where correspondent = ?1", + [&chat], + |row| { + Ok(Chat { + correspondent: row.get(0)?, + have_chatted: row.get(1)?, + }) + }, + ) + .optional()?; match chat_opt { Some(chat) => return Ok(chat), None => { @@ -1314,7 +1290,7 @@ impl DbActor { } } - pub(crate) fn read_chat_and_user(&self, chat: JID) -> Result<(Chat, User), Error> { + pub(crate) fn read_chat_and_user(&self, chat: BareJID) -> Result<(Chat, User), Error> { let user = self.read_user(chat.clone())?; let chat_opt = self.db.query_row( "select correspondent, have_chatted, jid, nick, avatar from chats join users on correspondent = jid where correspondent = ?1", @@ -1346,7 +1322,7 @@ impl DbActor { } } - pub(crate) fn mark_chat_as_chatted(&self, chat: JID) -> Result<(), Error> { + pub(crate) fn mark_chat_as_chatted(&self, chat: BareJID) -> Result<(), Error> { self.db.execute( "update chats set have_chatted = true where correspondent = ?1", [chat], @@ -1357,7 +1333,7 @@ impl DbActor { pub(crate) fn update_chat_correspondent( &self, old_chat: Chat, - new_correspondent: JID, + new_correspondent: BareJID, ) -> Result<Chat, Error> { let new_jid = &new_correspondent; let old_jid = old_chat.correspondent(); @@ -1374,7 +1350,7 @@ impl DbActor { // pub(crate) fn update_chat - pub(crate) fn delete_chat(&self, chat: JID) -> Result<(), Error> { + pub(crate) fn delete_chat(&self, chat: BareJID) -> Result<(), Error> { self.db .execute("delete from chats where correspondent = ?1", [chat])?; Ok(()) @@ -1484,7 +1460,7 @@ impl DbActor { } #[tracing::instrument] - fn read_chat_id(&self, chat: JID) -> Result<Uuid, Error> { + fn read_chat_id(&self, chat: BareJID) -> Result<Uuid, Error> { let chat_id = self.db.query_row( "select id from chats where correspondent = ?1", [chat], @@ -1510,8 +1486,8 @@ impl DbActor { pub(crate) fn create_message( &self, message: Message, - chat: JID, - from: JID, + chat: BareJID, + from: FullJID, ) -> Result<(), Error> { let from_jid = from.as_bare(); let chat_id = self.read_chat_id(chat)?; @@ -1520,18 +1496,17 @@ impl DbActor { Ok(()) } - pub(crate) fn upsert_chat_and_user(&self, chat: &JID) -> Result<bool, Error> { - let bare_chat = chat.as_bare(); + pub(crate) fn upsert_chat_and_user(&self, chat: &BareJID) -> Result<bool, Error> { let db = &self.db; db.execute( "insert into users (jid) values (?1) on conflict do nothing", - [&bare_chat], + [&chat], )?; let id = Uuid::new_v4(); - db.execute("insert into chats (id, correspondent, have_chatted) values (?1, ?2, ?3) on conflict do nothing", (id, &bare_chat, false))?; + db.execute("insert into chats (id, correspondent, have_chatted) values (?1, ?2, ?3) on conflict do nothing", (id, &chat, false))?; let chat = db.query_row( "select correspondent, have_chatted from chats where correspondent = ?1", - [&bare_chat], + [&chat], |row| { Ok(Chat { correspondent: row.get(0)?, @@ -1547,21 +1522,15 @@ impl DbActor { pub(crate) fn create_message_with_user_resource( &self, message: Message, - // TODO: enforce two kinds of jid. bare and full - // must be bare jid - chat: JID, - // full jid - from: JID, + chat: BareJID, + from: FullJID, ) -> Result<(), Error> { let from_jid = from.as_bare(); - let chat = chat.as_bare(); tracing::debug!("creating resource"); - if let Some(resource) = &from.resourcepart { - self.db.execute( - "insert into resources (bare_jid, resource) values (?1, ?2) on conflict do nothing", - (&from_jid, resource), - )?; - } + self.db.execute( + "insert into resources (bare_jid, resource) values (?1, ?2) on conflict do nothing", + (&from_jid, &from.resourcepart), + )?; self.create_message(message, chat, from)?; Ok(()) } @@ -1615,7 +1584,7 @@ impl DbActor { } // TODO: paging - pub(crate) fn read_message_history(&self, chat: JID) -> Result<Vec<Message>, Error> { + pub(crate) fn read_message_history(&self, chat: BareJID) -> Result<Vec<Message>, Error> { let chat_id = self.read_chat_id(chat)?; let messages = self .db @@ -1638,7 +1607,7 @@ impl DbActor { pub(crate) fn read_message_history_with_users( &self, - chat: JID, + chat: BareJID, ) -> Result<Vec<(Message, User)>, Error> { let chat_id = self.read_chat_id(chat)?; let messages = self |