aboutsummaryrefslogtreecommitdiffstats
path: root/filamento/src/db.rs
diff options
context:
space:
mode:
Diffstat (limited to 'filamento/src/db.rs')
-rw-r--r--filamento/src/db.rs521
1 files changed, 521 insertions, 0 deletions
diff --git a/filamento/src/db.rs b/filamento/src/db.rs
new file mode 100644
index 0000000..aea40ac
--- /dev/null
+++ b/filamento/src/db.rs
@@ -0,0 +1,521 @@
+use std::{collections::HashSet, path::Path};
+
+use jid::JID;
+use sqlx::{migrate, Error, SqlitePool};
+use uuid::Uuid;
+
+use crate::{
+ chat::{Chat, Message},
+ error::{DatabaseError, DatabaseOpenError},
+ presence::Online,
+ roster::Contact,
+ user::User,
+};
+
+#[derive(Clone)]
+pub struct Db {
+ db: SqlitePool,
+}
+
+// TODO: turn into trait
+impl Db {
+ pub async fn create_connect_and_migrate(
+ path: impl AsRef<Path>,
+ ) -> Result<Self, DatabaseOpenError> {
+ if let Some(dir) = path.as_ref().parent() {
+ if dir.is_dir() {
+ } else {
+ tokio::fs::create_dir_all(dir).await?;
+ }
+ let _file = tokio::fs::OpenOptions::new()
+ .append(true)
+ .create(true)
+ .open(path.as_ref())
+ .await?;
+ }
+ let url = format!(
+ "sqlite://{}",
+ path.as_ref()
+ .to_str()
+ .ok_or(DatabaseOpenError::InvalidPath)?
+ );
+ let db = SqlitePool::connect(&url).await?;
+ migrate!().run(&db).await?;
+ Ok(Self { db })
+ }
+
+ pub(crate) fn new(db: SqlitePool) -> Self {
+ Self { db }
+ }
+
+ pub(crate) async fn create_user(&self, user: User) -> Result<(), Error> {
+ sqlx::query!(
+ "insert into users ( jid, cached_status_message ) values ( ?, ? )",
+ user.jid,
+ user.cached_status_message
+ )
+ .execute(&self.db)
+ .await?;
+ Ok(())
+ }
+
+ pub(crate) async fn read_user(&self, user: JID) -> Result<User, Error> {
+ let user: User = sqlx::query_as("select * from users where jid = ?")
+ .bind(user)
+ .fetch_one(&self.db)
+ .await?;
+ Ok(user)
+ }
+
+ pub(crate) async fn update_user(&self, user: User) -> Result<(), Error> {
+ sqlx::query!(
+ "update users set cached_status_message = ? where jid = ?",
+ user.cached_status_message,
+ user.jid
+ )
+ .execute(&self.db)
+ .await?;
+ Ok(())
+ }
+
+ // TODO: should this be allowed? messages need to reference users. should probably only allow delete if every other thing referencing it has been deleted, or if you make clear to the user deleting a user will delete all messages associated with them.
+ // pub(crate) async fn delete_user(&self, user: JID) -> Result<(), Error> {}
+
+ /// does not create the underlying user, if underlying user does not exist, create_user() must be called separately
+ pub(crate) async fn create_contact(&self, contact: Contact) -> Result<(), Error> {
+ sqlx::query!(
+ "insert into roster ( user_jid, name, subscription ) values ( ?, ?, ? )",
+ contact.user_jid,
+ contact.name,
+ contact.subscription
+ )
+ .execute(&self.db)
+ .await?;
+ // TODO: abstract this out in to add_to_group() function ?
+ for group in contact.groups {
+ sqlx::query!(
+ "insert into groups (group_name) values (?) on conflict do nothing",
+ group
+ )
+ .execute(&self.db)
+ .await?;
+ sqlx::query!(
+ "insert into groups_roster (group_name, contact_jid) values (?, ?)",
+ group,
+ contact.user_jid
+ )
+ .execute(&self.db)
+ .await?;
+ }
+ Ok(())
+ }
+
+ pub(crate) async fn read_contact(&self, contact: JID) -> Result<Contact, Error> {
+ let mut contact: Contact = sqlx::query_as("select * from roster where user_jid = ?")
+ .bind(contact)
+ .fetch_one(&self.db)
+ .await?;
+ #[derive(sqlx::FromRow)]
+ struct Row {
+ group_name: String,
+ }
+ let groups: Vec<Row> =
+ sqlx::query_as("select group_name from groups_roster where contact_jid = ?")
+ .bind(&contact.user_jid)
+ .fetch_all(&self.db)
+ .await?;
+ contact.groups = HashSet::from_iter(groups.into_iter().map(|row| row.group_name));
+ Ok(contact)
+ }
+
+ pub(crate) async fn read_contact_opt(&self, contact: &JID) -> Result<Option<Contact>, Error> {
+ let contact: Option<Contact> =
+ sqlx::query_as("select * from roster join users on jid = user_jid where jid = ?")
+ .bind(contact)
+ .fetch_optional(&self.db)
+ .await?;
+ if let Some(mut contact) = contact {
+ #[derive(sqlx::FromRow)]
+ struct Row {
+ group_name: String,
+ }
+ let groups: Vec<Row> =
+ sqlx::query_as("select group_name from groups_roster where contact_jid = ?")
+ .bind(&contact.user_jid)
+ .fetch_all(&self.db)
+ .await?;
+ contact.groups = HashSet::from_iter(groups.into_iter().map(|row| row.group_name));
+ Ok(Some(contact))
+ } else {
+ Ok(None)
+ }
+ }
+
+ /// does not update the underlying user, to update user, update_user() must be called separately
+ pub(crate) async fn update_contact(&self, contact: Contact) -> Result<(), Error> {
+ sqlx::query!(
+ "update roster set name = ?, subscription = ? where user_jid = ?",
+ contact.name,
+ contact.subscription,
+ contact.user_jid
+ )
+ .execute(&self.db)
+ .await?;
+ sqlx::query!(
+ "delete from groups_roster where contact_jid = ?",
+ contact.user_jid
+ )
+ .execute(&self.db)
+ .await?;
+ // TODO: delete orphaned groups from groups table
+ for group in contact.groups {
+ sqlx::query!(
+ "insert into groups (group_name) values (?) on conflict do nothing",
+ group
+ )
+ .execute(&self.db)
+ .await?;
+ sqlx::query!(
+ "insert into groups_roster (group_name, contact_jid) values (?, ?)",
+ group,
+ contact.user_jid
+ )
+ .execute(&self.db)
+ .await?;
+ }
+ Ok(())
+ }
+
+ pub(crate) async fn upsert_contact(&self, contact: Contact) -> Result<(), Error> {
+ sqlx::query!(
+ "insert into users ( jid ) values ( ? ) on conflict do nothing",
+ contact.user_jid,
+ )
+ .execute(&self.db)
+ .await?;
+ sqlx::query!(
+ "insert into roster ( user_jid, name, subscription ) values ( ?, ?, ? ) on conflict do update set name = ?, subscription = ?",
+ contact.user_jid,
+ contact.name,
+ contact.subscription,
+ contact.name,
+ contact.subscription
+ )
+ .execute(&self.db)
+ .await?;
+ sqlx::query!(
+ "delete from groups_roster where contact_jid = ?",
+ contact.user_jid
+ )
+ .execute(&self.db)
+ .await?;
+ // TODO: delete orphaned groups from groups table
+ for group in contact.groups {
+ sqlx::query!(
+ "insert into groups (group_name) values (?) on conflict do nothing",
+ group
+ )
+ .execute(&self.db)
+ .await?;
+ sqlx::query!(
+ "insert into groups_roster (group_name, contact_jid) values (?, ?)",
+ group,
+ contact.user_jid
+ )
+ .execute(&self.db)
+ .await?;
+ }
+ Ok(())
+ }
+
+ pub(crate) async fn delete_contact(&self, contact: JID) -> Result<(), Error> {
+ sqlx::query!("delete from roster where user_jid = ?", contact)
+ .execute(&self.db)
+ .await?;
+ // TODO: delete orphaned groups from groups table
+ Ok(())
+ }
+
+ pub(crate) async fn replace_cached_roster(&self, roster: Vec<Contact>) -> Result<(), Error> {
+ sqlx::query!("delete from roster").execute(&self.db).await?;
+ for contact in roster {
+ self.upsert_contact(contact).await?;
+ }
+ Ok(())
+ }
+
+ pub(crate) async fn read_cached_roster(&self) -> Result<Vec<Contact>, Error> {
+ let mut roster: Vec<Contact> =
+ sqlx::query_as("select * from roster join users on jid = user_jid")
+ .fetch_all(&self.db)
+ .await?;
+ for contact in &mut roster {
+ #[derive(sqlx::FromRow)]
+ struct Row {
+ group_name: String,
+ }
+ let groups: Vec<Row> =
+ sqlx::query_as("select group_name from groups_roster where contact_jid = ?")
+ .bind(&contact.user_jid)
+ .fetch_all(&self.db)
+ .await?;
+ contact.groups = HashSet::from_iter(groups.into_iter().map(|row| row.group_name));
+ }
+ Ok(roster)
+ }
+
+ pub(crate) async fn create_chat(&self, chat: Chat) -> Result<(), Error> {
+ let id = Uuid::new_v4();
+ let jid = chat.correspondent();
+ sqlx::query!(
+ "insert into chats (id, correspondent) values (?, ?)",
+ id,
+ jid
+ )
+ .execute(&self.db)
+ .await?;
+ Ok(())
+ }
+
+ // TODO: what happens if a correspondent changes from a user to a contact? maybe just have correspondent be a user, then have the client make the user show up as a contact in ui if they are in the loaded roster.
+
+ pub(crate) async fn read_chat(&self, chat: JID) -> Result<Chat, Error> {
+ // check if the chat correponding with the jid exists
+ let chat: Chat = sqlx::query_as("select correspondent from chats where correspondent = ?")
+ .bind(chat)
+ .fetch_one(&self.db)
+ .await?;
+ Ok(chat)
+ }
+
+ pub(crate) async fn update_chat_correspondent(
+ &self,
+ old_chat: Chat,
+ new_correspondent: JID,
+ ) -> Result<Chat, Error> {
+ // TODO: update other chat data if it differs (for now there is only correspondent so doesn't matter)
+ let new_jid = &new_correspondent;
+ let old_jid = old_chat.correspondent();
+ sqlx::query!(
+ "update chats set correspondent = ? where correspondent = ?",
+ new_jid,
+ old_jid,
+ )
+ .execute(&self.db)
+ .await?;
+ let chat = self.read_chat(new_correspondent).await?;
+ Ok(chat)
+ }
+
+ // pub(crate) async fn update_chat
+
+ pub(crate) async fn delete_chat(&self, chat: JID) -> Result<(), Error> {
+ sqlx::query!("delete from chats where correspondent = ?", chat)
+ .execute(&self.db)
+ .await?;
+ Ok(())
+ }
+
+ /// TODO: sorting and filtering (for now there is no sorting)
+ pub(crate) async fn read_chats(&self) -> Result<Vec<Chat>, Error> {
+ let chats: Vec<Chat> = sqlx::query_as("select * from chats")
+ .fetch_all(&self.db)
+ .await?;
+ Ok(chats)
+ }
+
+ /// chats ordered by date of last message
+ // greatest-n-per-group
+ pub(crate) async fn read_chats_ordered(&self) -> Result<Vec<Chat>, Error> {
+ let chats = sqlx::query_as("select c.*, m.* from chats c join (select chat_id, max(timestamp) max_timestamp from messages group by chat_id) max_timestamps on c.id = max_timestamps.chat_id join messages m on max_timestamps.chat_id = m.chat_id and max_timestamps.max_timestamp = m.timestamp order by m.timestamp desc")
+ .fetch_all(&self.db)
+ .await?;
+ Ok(chats)
+ }
+
+ /// chats ordered by date of last message
+ // greatest-n-per-group
+ pub(crate) async fn read_chats_ordered_with_latest_messages(
+ &self,
+ ) -> Result<Vec<(Chat, Message)>, Error> {
+ #[derive(sqlx::FromRow)]
+ pub struct ChatWithMessage {
+ #[sqlx(flatten)]
+ pub chat: Chat,
+ #[sqlx(flatten)]
+ pub message: Message,
+ }
+
+ // TODO: i don't know if this will assign the right uuid to the latest message or the chat's id. should probably check but i don't think it matters as nothing ever gets called with the id of the latest message in the chats list
+ let chats: Vec<ChatWithMessage> = sqlx::query_as("select c.*, m.* from chats c join (select chat_id, max(timestamp) max_timestamp from messages group by chat_id) max_timestamps on c.id = max_timestamps.chat_id join messages m on max_timestamps.chat_id = m.chat_id and max_timestamps.max_timestamp = m.timestamp order by m.timestamp desc")
+ .fetch_all(&self.db)
+ .await?;
+
+ let chats = chats
+ .into_iter()
+ .map(|chat_with_message| (chat_with_message.chat, chat_with_message.message))
+ .collect();
+
+ Ok(chats)
+ }
+
+ async fn read_chat_id(&self, chat: JID) -> Result<Uuid, Error> {
+ #[derive(sqlx::FromRow)]
+ struct Row {
+ id: Uuid,
+ }
+ let chat = chat.as_bare();
+ let chat_id: Row = sqlx::query_as("select id from chats where correspondent = ?")
+ .bind(chat)
+ .fetch_one(&self.db)
+ .await?;
+ let chat_id = chat_id.id;
+ Ok(chat_id)
+ }
+
+ async fn read_chat_id_opt(&self, chat: JID) -> Result<Option<Uuid>, Error> {
+ #[derive(sqlx::FromRow)]
+ struct Row {
+ id: Uuid,
+ }
+ let chat_id: Option<Row> = sqlx::query_as("select id from chats where correspondent = ?")
+ .bind(chat)
+ .fetch_optional(&self.db)
+ .await?;
+ let chat_id = chat_id.map(|row| row.id);
+ Ok(chat_id)
+ }
+
+ /// if the chat doesn't already exist, it must be created by calling create_chat() before running this function.
+ pub(crate) async fn create_message(&self, message: Message, chat: JID) -> Result<(), Error> {
+ // TODO: one query
+ let bare_jid = message.from.as_bare();
+ let resource = message.from.resourcepart;
+ let chat_id = self.read_chat_id(chat).await?;
+ sqlx::query!("insert into messages (id, body, chat_id, from_jid, from_resource, timestamp) values (?, ?, ?, ?, ?, ?)", message.id, message.body.body, chat_id, bare_jid, resource, message.timestamp).execute(&self.db).await?;
+ Ok(())
+ }
+
+ pub(crate) async fn create_message_with_self_resource_and_chat(
+ &self,
+ message: Message,
+ chat: JID,
+ ) -> Result<(), Error> {
+ let from_jid = message.from.as_bare();
+ let resource = &message.from.resourcepart;
+ let bare_chat = chat.as_bare();
+ sqlx::query!(
+ "insert into users (jid) values (?) on conflict do nothing",
+ from_jid
+ )
+ .execute(&self.db)
+ .await?;
+ let id = Uuid::new_v4();
+ sqlx::query!(
+ "insert into chats (id, correspondent) values (?, ?) on conflict do nothing",
+ id,
+ bare_chat
+ )
+ .execute(&self.db)
+ .await?;
+ if let Some(resource) = resource {
+ sqlx::query!(
+ "insert into resources (bare_jid, resource) values (?, ?) on conflict do nothing",
+ from_jid,
+ resource
+ )
+ .execute(&self.db)
+ .await?;
+ }
+ self.create_message(message, chat).await?;
+ Ok(())
+ }
+
+ // create direct message from incoming
+ pub(crate) async fn create_message_with_user_resource_and_chat(
+ &self,
+ message: Message,
+ chat: JID,
+ ) -> Result<(), Error> {
+ let bare_chat = chat.as_bare();
+ let resource = &chat.resourcepart;
+ sqlx::query!(
+ "insert into users (jid) values (?) on conflict do nothing",
+ bare_chat
+ )
+ .execute(&self.db)
+ .await?;
+ let id = Uuid::new_v4();
+ sqlx::query!(
+ "insert into chats (id, correspondent) values (?, ?) on conflict do nothing",
+ id,
+ bare_chat
+ )
+ .execute(&self.db)
+ .await?;
+ if let Some(resource) = resource {
+ sqlx::query!(
+ "insert into resources (bare_jid, resource) values (?, ?) on conflict do nothing",
+ bare_chat,
+ resource
+ )
+ .execute(&self.db)
+ .await?;
+ }
+ self.create_message(message, chat).await?;
+ Ok(())
+ }
+
+ pub(crate) async fn read_message(&self, message: Uuid) -> Result<Message, Error> {
+ let message: Message = sqlx::query_as("select * from messages where id = ?")
+ .bind(message)
+ .fetch_one(&self.db)
+ .await?;
+ Ok(message)
+ }
+
+ // TODO: message updates/edits pub(crate) async fn update_message(&self, message: Message) -> Result<(), Error> {}
+
+ pub(crate) async fn delete_message(&self, message: Uuid) -> Result<(), Error> {
+ sqlx::query!("delete from messages where id = ?", message)
+ .execute(&self.db)
+ .await?;
+ Ok(())
+ }
+
+ // TODO: paging
+ pub(crate) async fn read_message_history(&self, chat: JID) -> Result<Vec<Message>, Error> {
+ let chat_id = self.read_chat_id(chat).await?;
+ let messages: Vec<Message> =
+ sqlx::query_as("select * from messages where chat_id = ? order by timestamp asc")
+ .bind(chat_id)
+ .fetch_all(&self.db)
+ .await?;
+ Ok(messages)
+ }
+
+ pub(crate) async fn read_cached_status(&self) -> Result<Online, Error> {
+ let online: Online = sqlx::query_as("select * from cached_status where id = 0")
+ .fetch_one(&self.db)
+ .await?;
+ Ok(online)
+ }
+
+ pub(crate) async fn upsert_cached_status(&self, status: Online) -> Result<(), Error> {
+ sqlx::query!(
+ "insert into cached_status (id, show, message) values (0, ?, ?) on conflict do update set show = ?, message = ?",
+ status.show,
+ status.status,
+ status.show,
+ status.status
+ ).execute(&self.db).await?;
+ Ok(())
+ }
+
+ pub(crate) async fn delete_cached_status(&self) -> Result<(), Error> {
+ sqlx::query!("update cached_status set show = null, message = null where id = 0")
+ .execute(&self.db)
+ .await?;
+ Ok(())
+ }
+}