aboutsummaryrefslogtreecommitdiffstats
path: root/filamento/src/db.rs
diff options
context:
space:
mode:
Diffstat (limited to 'filamento/src/db.rs')
-rw-r--r--filamento/src/db.rs247
1 files changed, 108 insertions, 139 deletions
diff --git a/filamento/src/db.rs b/filamento/src/db.rs
index 1b5afe6..385382a 100644
--- a/filamento/src/db.rs
+++ b/filamento/src/db.rs
@@ -2,7 +2,7 @@ use core::fmt::Display;
use std::{collections::HashSet, ops::Deref, path::Path, sync::Arc};
use chrono::{DateTime, Utc};
-use jid::JID;
+use jid::{BareJID, FullJID, JID};
use rusqlite::{Connection, OptionalExtension};
use tokio::sync::{Mutex, MutexGuard};
use tokio::sync::{mpsc, oneshot};
@@ -58,53 +58,22 @@ impl Db {
pub async fn create_connect_and_migrate(
path: impl AsRef<Path> + Send,
) -> Result<Self, DatabaseOpenError> {
- let (sender, receiver) = mpsc::channel(20);
- let (result_send, result_recv) = oneshot::channel();
- spawn_blocking(move || {
- let result = DbActor::new(path, receiver).await;
- match result {
- Ok(a) => {
- result_send.send(Ok(()));
- a.run()
- }
- Err(e) => {
- result_send.send(Err(e));
- }
- }
- });
- match result_recv.await {
- Ok(r) => match r {
- Ok(o) => Ok(Self { sender }),
- Err(e) => return Err(e),
- },
- Err(e) => return Err(e.into()),
- }
+ let (sender, receiver) = mpsc::unbounded_channel();
+
+ let actor = DbActor::new(path, receiver)?;
+ spawn_blocking(move || actor.run());
+
+ Ok(Self { sender })
}
#[cfg(not(target_arch = "wasm32"))]
pub async fn create_connect_and_migrate_memory() -> Result<Self, DatabaseOpenError> {
- let (sender, receiver) = mpsc::channel(20);
- let (result_send, result_recv) = oneshot::channel();
- spawn_blocking(move || {
- let result = DbActor::new_memory(receiver).await;
- match result {
- Ok(a) => {
- result_send.send(Ok(()));
- // TODO: async run when not wasm
- a.run()
- }
- Err(e) => {
- result_send.send(Err(e));
- }
- }
- });
- match result_recv.await {
- Ok(r) => match r {
- Ok(o) => Ok(Self { sender }),
- Err(e) => return Err(e),
- },
- Err(e) => return Err(e.into()),
- }
+ let (sender, receiver) = mpsc::unbounded_channel();
+
+ let actor = DbActor::new_memory(receiver)?;
+ spawn_blocking(move || actor.run());
+
+ Ok(Self { sender })
}
/// `file_name` should be a file not in a directory
@@ -183,7 +152,7 @@ impl Db {
}
// TODO: this is not a 'read' user
- pub(crate) async fn read_user(&self, user: JID) -> Result<User, Error> {
+ pub(crate) async fn read_user(&self, user: BareJID) -> Result<User, Error> {
let (result, recv) = oneshot::channel();
let command = DbCommand::ReadUser { user, result };
self.sender.send(command);
@@ -192,7 +161,7 @@ impl Db {
}
/// returns whether or not the nickname was updated
- pub(crate) async fn delete_user_nick(&self, jid: JID) -> Result<bool, Error> {
+ pub(crate) async fn delete_user_nick(&self, jid: BareJID) -> Result<bool, Error> {
let (result, recv) = oneshot::channel();
let command = DbCommand::DeleteUserNick { jid, result };
self.sender.send(command);
@@ -201,7 +170,7 @@ impl Db {
}
/// returns whether or not the nickname was updated
- pub(crate) async fn upsert_user_nick(&self, jid: JID, nick: String) -> Result<bool, Error> {
+ pub(crate) async fn upsert_user_nick(&self, jid: BareJID, nick: String) -> Result<bool, Error> {
let (result, recv) = oneshot::channel();
let command = DbCommand::UpsertUserNick { jid, nick, result };
self.sender.send(command);
@@ -212,7 +181,7 @@ impl Db {
/// returns whether or not the avatar was updated, and the file to delete if there existed an old avatar
pub(crate) async fn delete_user_avatar(
&self,
- jid: JID,
+ jid: BareJID,
) -> Result<(bool, Option<String>), Error> {
let (result, recv) = oneshot::channel();
let command = DbCommand::DeleteUserAvatar { jid, result };
@@ -224,7 +193,7 @@ impl Db {
/// returns whether or not the avatar was updated, and the file to delete if there existed an old avatar
pub(crate) async fn upsert_user_avatar(
&self,
- jid: JID,
+ jid: BareJID,
avatar: String,
) -> Result<(bool, Option<String>), Error> {
let (result, recv) = oneshot::channel();
@@ -259,7 +228,7 @@ impl Db {
result
}
- pub(crate) async fn read_contact(&self, contact: JID) -> Result<Contact, Error> {
+ pub(crate) async fn read_contact(&self, contact: BareJID) -> Result<Contact, Error> {
let (result, recv) = oneshot::channel();
let command = DbCommand::ReadContact { contact, result };
self.sender.send(command);
@@ -267,7 +236,10 @@ impl Db {
result
}
- pub(crate) async fn read_contact_opt(&self, contact: JID) -> Result<Option<Contact>, Error> {
+ pub(crate) async fn read_contact_opt(
+ &self,
+ contact: BareJID,
+ ) -> Result<Option<Contact>, Error> {
let (result, recv) = oneshot::channel();
let command = DbCommand::ReadContactOpt { contact, result };
self.sender.send(command);
@@ -292,7 +264,7 @@ impl Db {
result
}
- pub(crate) async fn delete_contact(&self, contact: JID) -> Result<(), Error> {
+ pub(crate) async fn delete_contact(&self, contact: BareJID) -> Result<(), Error> {
let (result, recv) = oneshot::channel();
let command = DbCommand::DeleteContact { contact, result };
self.sender.send(command);
@@ -336,7 +308,7 @@ impl Db {
// TODO: what happens if a correspondent changes from a user to a contact? maybe just have correspondent be a user, then have the client make the user show up as a contact in ui if they are in the loaded roster.
- pub(crate) async fn read_chat(&self, chat: JID) -> Result<Chat, Error> {
+ pub(crate) async fn read_chat(&self, chat: BareJID) -> Result<Chat, Error> {
let (result, recv) = oneshot::channel();
let command = DbCommand::ReadChat { chat, result };
self.sender.send(command);
@@ -344,7 +316,7 @@ impl Db {
result
}
- pub(crate) async fn read_chat_and_user(&self, chat: JID) -> Result<(Chat, User), Error> {
+ pub(crate) async fn read_chat_and_user(&self, chat: BareJID) -> Result<(Chat, User), Error> {
let (result, recv) = oneshot::channel();
let command = DbCommand::ReadChatAndUser { chat, result };
self.sender.send(command);
@@ -352,7 +324,7 @@ impl Db {
result
}
- pub(crate) async fn mark_chat_as_chatted(&self, chat: JID) -> Result<(), Error> {
+ pub(crate) async fn mark_chat_as_chatted(&self, chat: BareJID) -> Result<(), Error> {
let (result, recv) = oneshot::channel();
let command = DbCommand::MarkChatAsChatted { chat, result };
self.sender.send(command);
@@ -363,7 +335,7 @@ impl Db {
pub(crate) async fn update_chat_correspondent(
&self,
old_chat: Chat,
- new_correspondent: JID,
+ new_correspondent: BareJID,
) -> Result<Chat, Error> {
let (result, recv) = oneshot::channel();
let command = DbCommand::UpdateChatCorrespondent {
@@ -378,7 +350,7 @@ impl Db {
// pub(crate) async fn update_chat
- pub(crate) async fn delete_chat(&self, chat: JID) -> Result<(), Error> {
+ pub(crate) async fn delete_chat(&self, chat: BareJID) -> Result<(), Error> {
let (result, recv) = oneshot::channel();
let command = DbCommand::DeleteChat { chat, result };
self.sender.send(command);
@@ -434,8 +406,8 @@ impl Db {
pub(crate) async fn create_message(
&self,
message: Message,
- chat: JID,
- from: JID,
+ chat: BareJID,
+ from: FullJID,
) -> Result<(), Error> {
let (result, recv) = oneshot::channel();
let command = DbCommand::CreateMessage {
@@ -449,7 +421,7 @@ impl Db {
result
}
- pub(crate) async fn upsert_chat_and_user(&self, chat: JID) -> Result<bool, Error> {
+ pub(crate) async fn upsert_chat_and_user(&self, chat: BareJID) -> Result<bool, Error> {
let (result, recv) = oneshot::channel();
let command = DbCommand::UpsertChatAndUser { chat, result };
self.sender.send(command);
@@ -463,10 +435,8 @@ impl Db {
&self,
message: Message,
// TODO: enforce two kinds of jid. bare and full
- // must be bare jid
- chat: JID,
- // full jid
- from: JID,
+ chat: BareJID,
+ from: FullJID,
) -> Result<(), Error> {
tracing::info!("MSGDEBUG create_message_with_user_resource exists");
let (result, recv) = oneshot::channel();
@@ -514,9 +484,9 @@ impl Db {
DeleteCachedStatus => delete_cached_status() -> ();
UpsertCachedStatus => upsert_cached_status(status: Online) -> ();
ReadCachedStatus => read_cached_status() -> Online;
- ReadMessageHistoryWithUsers => read_message_history_with_users(chat: JID) -> Vec<(Message, User)>;
+ ReadMessageHistoryWithUsers => read_message_history_with_users(chat: BareJID) -> Vec<(Message, User)>;
// TODO: paging
- ReadMessageHistory => read_message_history(chat: JID) -> Vec<Message>;
+ ReadMessageHistory => read_message_history(chat: BareJID) -> Vec<Message>;
ReadMessage => read_message(message: Uuid) -> Message;
DeleteMessage => delete_message(message: Uuid) -> ()
);
@@ -546,24 +516,24 @@ pub enum DbCommand {
result: oneshot::Sender<Result<(), Error>>,
},
ReadUser {
- user: JID,
+ user: BareJID,
result: oneshot::Sender<Result<User, Error>>,
},
DeleteUserNick {
- jid: JID,
+ jid: BareJID,
result: oneshot::Sender<Result<bool, Error>>,
},
UpsertUserNick {
- jid: JID,
+ jid: BareJID,
nick: String,
result: oneshot::Sender<Result<bool, Error>>,
},
DeleteUserAvatar {
- jid: JID,
+ jid: BareJID,
result: oneshot::Sender<Result<(bool, Option<String>), Error>>,
},
UpsertUserAvatar {
- jid: JID,
+ jid: BareJID,
avatar: String,
result: oneshot::Sender<Result<(bool, Option<String>), Error>>,
},
@@ -576,11 +546,11 @@ pub enum DbCommand {
result: oneshot::Sender<Result<(), Error>>,
},
ReadContact {
- contact: JID,
+ contact: BareJID,
result: oneshot::Sender<Result<Contact, Error>>,
},
ReadContactOpt {
- contact: JID,
+ contact: BareJID,
result: oneshot::Sender<Result<Option<Contact>, Error>>,
},
UpdateContact {
@@ -592,7 +562,7 @@ pub enum DbCommand {
result: oneshot::Sender<Result<(), Error>>,
},
DeleteContact {
- contact: JID,
+ contact: BareJID,
result: oneshot::Sender<Result<(), Error>>,
},
ReplaceCachedRoster {
@@ -610,24 +580,24 @@ pub enum DbCommand {
result: oneshot::Sender<Result<(), Error>>,
},
ReadChat {
- chat: JID,
+ chat: BareJID,
result: oneshot::Sender<Result<Chat, Error>>,
},
ReadChatAndUser {
- chat: JID,
+ chat: BareJID,
result: oneshot::Sender<Result<(Chat, User), Error>>,
},
MarkChatAsChatted {
- chat: JID,
+ chat: BareJID,
result: oneshot::Sender<Result<(), Error>>,
},
UpdateChatCorrespondent {
old_chat: Chat,
- new_correspondent: JID,
+ new_correspondent: BareJID,
result: oneshot::Sender<Result<Chat, Error>>,
},
DeleteChat {
- chat: JID,
+ chat: BareJID,
result: oneshot::Sender<Result<(), Error>>,
},
ReadChats {
@@ -652,18 +622,18 @@ pub enum DbCommand {
// },
CreateMessage {
message: Message,
- chat: JID,
- from: JID,
+ chat: BareJID,
+ from: FullJID,
result: oneshot::Sender<Result<(), Error>>,
},
UpsertChatAndUser {
- chat: JID,
+ chat: BareJID,
result: oneshot::Sender<Result<bool, Error>>,
},
CreateMessageWithUserResource {
message: Message,
- chat: JID,
- from: JID,
+ chat: BareJID,
+ from: FullJID,
result: oneshot::Sender<Result<(), Error>>,
},
UpdateMessageDelivery {
@@ -680,11 +650,11 @@ pub enum DbCommand {
result: oneshot::Sender<Result<Message, Error>>,
},
ReadMessageHistory {
- chat: JID,
+ chat: BareJID,
result: oneshot::Sender<Result<Vec<Message>, Error>>,
},
ReadMessageHistoryWithUsers {
- chat: JID,
+ chat: BareJID,
result: oneshot::Sender<Result<Vec<(Message, User)>, Error>>,
},
ReadCachedStatus {
@@ -758,17 +728,19 @@ impl Display for DbCommand {
impl DbActor {
/// must be run in blocking spawn
#[cfg(not(target_arch = "wasm32"))]
- pub(crate) fn new(path: impl AsRef<Path>, receiver: mpsc::Receiver<DbCommand>) -> Self {
+ pub(crate) fn new(
+ path: impl AsRef<Path>,
+ receiver: mpsc::UnboundedReceiver<DbCommand>,
+ ) -> Result<Self, DatabaseOpenError> {
if let Some(dir) = path.as_ref().parent() {
if dir.is_dir() {
} else {
- tokio::fs::create_dir_all(dir).await?;
+ std::fs::create_dir_all(dir)?;
}
- let _file = tokio::fs::OpenOptions::new()
+ let _file = std::fs::OpenOptions::new()
.append(true)
.create(true)
- .open(path.as_ref())
- .await?;
+ .open(path.as_ref())?;
}
let url = format!(
"{}",
@@ -786,7 +758,9 @@ impl DbActor {
/// must be run in blocking spawn
#[cfg(not(target_arch = "wasm32"))]
- pub(crate) fn new_memory(receiver: mpsc::Receiver<DbCommand>) -> Self {
+ pub(crate) fn new_memory(
+ receiver: mpsc::UnboundedReceiver<DbCommand>,
+ ) -> Result<Self, DatabaseOpenError> {
let db = Connection::open_in_memory()?;
db.execute_batch(include_str!("../migrations/1.sql"))?;
Ok(Self { db, receiver })
@@ -850,7 +824,7 @@ impl DbActor {
result.send(self.read_contact(contact));
}
DbCommand::ReadContactOpt { contact, result } => {
- result.send(self.read_contact_opt(&contact));
+ result.send(self.read_contact_opt(contact));
}
DbCommand::UpdateContact { contact, result } => {
result.send(self.update_contact(contact));
@@ -978,7 +952,7 @@ impl DbActor {
}
// TODO: this is not a 'read' user
- pub(crate) fn read_user(&self, user: JID) -> Result<User, Error> {
+ pub(crate) fn read_user(&self, user: BareJID) -> Result<User, Error> {
let db = &self.db;
let user_opt = db
.query_row(
@@ -1007,7 +981,7 @@ impl DbActor {
}
/// returns whether or not the nickname was updated
- pub(crate) fn delete_user_nick(&self, jid: JID) -> Result<bool, Error> {
+ pub(crate) fn delete_user_nick(&self, jid: BareJID) -> Result<bool, Error> {
let rows_affected;
{
rows_affected = self.db.execute("insert into users (jid, nick) values (?1, ?2) on conflict do update set nick = ?3 where nick is not ?4", (jid, None::<String>, None::<String>, None::<String>))?;
@@ -1020,7 +994,7 @@ impl DbActor {
}
/// returns whether or not the nickname was updated
- pub(crate) fn upsert_user_nick(&self, jid: JID, nick: String) -> Result<bool, Error> {
+ pub(crate) fn upsert_user_nick(&self, jid: BareJID, nick: String) -> Result<bool, Error> {
let rows_affected;
{
rows_affected = self.db.execute("insert into users (jid, nick) values (?1, ?2) on conflict do update set nick = ?3 where nick is not ?4", (jid, &nick, &nick, &nick))?;
@@ -1033,7 +1007,7 @@ impl DbActor {
}
/// returns whether or not the avatar was updated, and the file to delete if there existed an old avatar
- pub(crate) fn delete_user_avatar(&self, jid: JID) -> Result<(bool, Option<String>), Error> {
+ pub(crate) fn delete_user_avatar(&self, jid: BareJID) -> Result<(bool, Option<String>), Error> {
let (old_avatar, rows_affected): (Option<String>, _);
{
let db = &self.db;
@@ -1054,7 +1028,7 @@ impl DbActor {
/// returns whether or not the avatar was updated, and the file to delete if there existed an old avatar
pub(crate) fn upsert_user_avatar(
&self,
- jid: JID,
+ jid: BareJID,
avatar: String,
) -> Result<(bool, Option<String>), Error> {
let (old_avatar, rows_affected): (Option<String>, _);
@@ -1108,7 +1082,7 @@ impl DbActor {
Ok(())
}
- pub(crate) fn read_contact(&self, contact: JID) -> Result<Contact, Error> {
+ pub(crate) fn read_contact(&self, contact: BareJID) -> Result<Contact, Error> {
let db = &self.db;
let mut contact_item = db.query_row(
"select user_jid, name, subscription from roster where user_jid = ?1",
@@ -1130,7 +1104,7 @@ impl DbActor {
Ok(contact_item)
}
- pub(crate) fn read_contact_opt(&self, contact: &JID) -> Result<Option<Contact>, Error> {
+ pub(crate) fn read_contact_opt(&self, contact: BareJID) -> Result<Option<Contact>, Error> {
let db = &self.db;
let contact_item = db
.query_row(
@@ -1210,7 +1184,7 @@ impl DbActor {
Ok(())
}
- pub(crate) fn delete_contact(&self, contact: JID) -> Result<(), Error> {
+ pub(crate) fn delete_contact(&self, contact: BareJID) -> Result<(), Error> {
self.db
.execute("delete from roster where user_jid = ?1", [&contact])?;
Ok(())
@@ -1288,19 +1262,21 @@ impl DbActor {
// TODO: what happens if a correspondent changes from a user to a contact? maybe just have correspondent be a user, then have the client make the user show up as a contact in ui if they are in the loaded roster.
- /// should be a bare jid
/// TODO: this is NOT a read
- pub(crate) fn read_chat(&self, chat: JID) -> Result<Chat, Error> {
- let chat_opt = self.db.query_row(
- "select correspondent, have_chatted from chats where correspondent = ?1",
- [&chat],
- |row| {
- Ok(Chat {
- correspondent: row.get(0)?,
- have_chatted: row.get(1)?,
- })
- },
- ).optional()?;
+ pub(crate) fn read_chat(&self, chat: BareJID) -> Result<Chat, Error> {
+ let chat_opt = self
+ .db
+ .query_row(
+ "select correspondent, have_chatted from chats where correspondent = ?1",
+ [&chat],
+ |row| {
+ Ok(Chat {
+ correspondent: row.get(0)?,
+ have_chatted: row.get(1)?,
+ })
+ },
+ )
+ .optional()?;
match chat_opt {
Some(chat) => return Ok(chat),
None => {
@@ -1314,7 +1290,7 @@ impl DbActor {
}
}
- pub(crate) fn read_chat_and_user(&self, chat: JID) -> Result<(Chat, User), Error> {
+ pub(crate) fn read_chat_and_user(&self, chat: BareJID) -> Result<(Chat, User), Error> {
let user = self.read_user(chat.clone())?;
let chat_opt = self.db.query_row(
"select correspondent, have_chatted, jid, nick, avatar from chats join users on correspondent = jid where correspondent = ?1",
@@ -1346,7 +1322,7 @@ impl DbActor {
}
}
- pub(crate) fn mark_chat_as_chatted(&self, chat: JID) -> Result<(), Error> {
+ pub(crate) fn mark_chat_as_chatted(&self, chat: BareJID) -> Result<(), Error> {
self.db.execute(
"update chats set have_chatted = true where correspondent = ?1",
[chat],
@@ -1357,7 +1333,7 @@ impl DbActor {
pub(crate) fn update_chat_correspondent(
&self,
old_chat: Chat,
- new_correspondent: JID,
+ new_correspondent: BareJID,
) -> Result<Chat, Error> {
let new_jid = &new_correspondent;
let old_jid = old_chat.correspondent();
@@ -1374,7 +1350,7 @@ impl DbActor {
// pub(crate) fn update_chat
- pub(crate) fn delete_chat(&self, chat: JID) -> Result<(), Error> {
+ pub(crate) fn delete_chat(&self, chat: BareJID) -> Result<(), Error> {
self.db
.execute("delete from chats where correspondent = ?1", [chat])?;
Ok(())
@@ -1484,7 +1460,7 @@ impl DbActor {
}
#[tracing::instrument]
- fn read_chat_id(&self, chat: JID) -> Result<Uuid, Error> {
+ fn read_chat_id(&self, chat: BareJID) -> Result<Uuid, Error> {
let chat_id = self.db.query_row(
"select id from chats where correspondent = ?1",
[chat],
@@ -1510,8 +1486,8 @@ impl DbActor {
pub(crate) fn create_message(
&self,
message: Message,
- chat: JID,
- from: JID,
+ chat: BareJID,
+ from: FullJID,
) -> Result<(), Error> {
let from_jid = from.as_bare();
let chat_id = self.read_chat_id(chat)?;
@@ -1520,18 +1496,17 @@ impl DbActor {
Ok(())
}
- pub(crate) fn upsert_chat_and_user(&self, chat: &JID) -> Result<bool, Error> {
- let bare_chat = chat.as_bare();
+ pub(crate) fn upsert_chat_and_user(&self, chat: &BareJID) -> Result<bool, Error> {
let db = &self.db;
db.execute(
"insert into users (jid) values (?1) on conflict do nothing",
- [&bare_chat],
+ [&chat],
)?;
let id = Uuid::new_v4();
- db.execute("insert into chats (id, correspondent, have_chatted) values (?1, ?2, ?3) on conflict do nothing", (id, &bare_chat, false))?;
+ db.execute("insert into chats (id, correspondent, have_chatted) values (?1, ?2, ?3) on conflict do nothing", (id, &chat, false))?;
let chat = db.query_row(
"select correspondent, have_chatted from chats where correspondent = ?1",
- [&bare_chat],
+ [&chat],
|row| {
Ok(Chat {
correspondent: row.get(0)?,
@@ -1547,21 +1522,15 @@ impl DbActor {
pub(crate) fn create_message_with_user_resource(
&self,
message: Message,
- // TODO: enforce two kinds of jid. bare and full
- // must be bare jid
- chat: JID,
- // full jid
- from: JID,
+ chat: BareJID,
+ from: FullJID,
) -> Result<(), Error> {
let from_jid = from.as_bare();
- let chat = chat.as_bare();
tracing::debug!("creating resource");
- if let Some(resource) = &from.resourcepart {
- self.db.execute(
- "insert into resources (bare_jid, resource) values (?1, ?2) on conflict do nothing",
- (&from_jid, resource),
- )?;
- }
+ self.db.execute(
+ "insert into resources (bare_jid, resource) values (?1, ?2) on conflict do nothing",
+ (&from_jid, &from.resourcepart),
+ )?;
self.create_message(message, chat, from)?;
Ok(())
}
@@ -1615,7 +1584,7 @@ impl DbActor {
}
// TODO: paging
- pub(crate) fn read_message_history(&self, chat: JID) -> Result<Vec<Message>, Error> {
+ pub(crate) fn read_message_history(&self, chat: BareJID) -> Result<Vec<Message>, Error> {
let chat_id = self.read_chat_id(chat)?;
let messages = self
.db
@@ -1638,7 +1607,7 @@ impl DbActor {
pub(crate) fn read_message_history_with_users(
&self,
- chat: JID,
+ chat: BareJID,
) -> Result<Vec<(Message, User)>, Error> {
let chat_id = self.read_chat_id(chat)?;
let messages = self