diff options
| author | 2025-04-17 17:30:22 +0100 | |
|---|---|---|
| committer | 2025-04-17 17:30:22 +0100 | |
| commit | 61b755c890dcaa66daa35942ca87cc00269b0fe9 (patch) | |
| tree | 36190a844cfef474d5e11fe473db462bb4a4d135 /filamento | |
| parent | 26d0ee51e232b793bc83ba565c0e9ab820d8d0db (diff) | |
| download | luz-61b755c890dcaa66daa35942ca87cc00269b0fe9.tar.gz luz-61b755c890dcaa66daa35942ca87cc00269b0fe9.tar.bz2 luz-61b755c890dcaa66daa35942ca87cc00269b0fe9.zip | |
feat(filamento): full wasm support by switching to rusqlite
Diffstat (limited to 'filamento')
| -rw-r--r-- | filamento/Cargo.toml | 5 | ||||
| -rw-r--r-- | filamento/migrations/1.sql (renamed from filamento/migrations/20240113011930_luz.sql) | 34 | ||||
| -rw-r--r-- | filamento/src/chat.rs | 34 | ||||
| -rw-r--r-- | filamento/src/db.rs | 570 | ||||
| -rw-r--r-- | filamento/src/logic/offline.rs | 2 | ||||
| -rw-r--r-- | filamento/src/logic/online.rs | 2 | ||||
| -rw-r--r-- | filamento/src/presence.rs | 28 | ||||
| -rw-r--r-- | filamento/src/roster.rs | 44 | 
8 files changed, 629 insertions, 90 deletions
| diff --git a/filamento/Cargo.toml b/filamento/Cargo.toml index 66cbc31..1519f58 100644 --- a/filamento/Cargo.toml +++ b/filamento/Cargo.toml @@ -10,9 +10,9 @@ tokio = { workspace = true }  thiserror = "2.0.11"  stanza = { version = "0.1.0", path = "../stanza", features = ["rfc_6121", "xep_0203", "xep_0030", "xep_0060", "xep_0172", "xep_0390", "xep_0128", "xep_0115", "xep_0084"] }  # TODO: re-export jid? -jid = { version = "0.1.0", path = "../jid" } +jid = { version = "0.1.0", path = "../jid", features = ["rusqlite"] }  uuid = { version = "1.13.1", features = ["v4"] } -rusqlite = { git = "https://github.com/Spxg/rusqlite.git" } +rusqlite = { git = "https://github.com/Spxg/rusqlite.git", branch = "wasm-demo", features = ["uuid", "chrono"] }  tracing = "0.1.41"  chrono = "0.4.40"  sha2 = "0.10.8" @@ -26,6 +26,7 @@ hex = "0.4.3"  tokio = { workspace = true, features = ["sync", "time", "rt", "fs"] }  [target.'cfg(target_arch = "wasm32")'.dependencies] +rusqlite = { git = "https://github.com/Spxg/rusqlite.git", branch = "wasm-demo", features = ["uuid", "chrono", "precompiled-wasm"] }  tokio = { workspace = true, features = ["sync", "time", "rt"] }  tokio_with_wasm = { version = "0.8.2", features = ["sync", "time", "rt"] } diff --git a/filamento/migrations/20240113011930_luz.sql b/filamento/migrations/1.sql index c2f35dd..502c5a9 100644 --- a/filamento/migrations/20240113011930_luz.sql +++ b/filamento/migrations/1.sql @@ -2,7 +2,7 @@ PRAGMA foreign_keys = on;  -- a user jid will never change, only a chat user will change  -- TODO: avatar, nick, etc. -create table users( +create table if not exists users(      -- TODO: enforce bare jid      jid text primary key not null,      nick text, @@ -29,7 +29,7 @@ create table users(  --     primary key(activated timestamp, id, jid)  -- ); -create table resources( +create table if not exists resources(      bare_jid text not null,      resource text not null,      foreign key(bare_jid) references users(jid), @@ -37,14 +37,14 @@ create table resources(  );  -- enum for subscription state -create table subscription( +create table if not exists subscription(      state text primary key not null  ); -insert into subscription ( state ) values ('none'), ('pending-out'), ('pending-in'), ('pending-in-pending-out'), ('only-out'), ('only-in'), ('out-pending-in'), ('in-pending-out'), ('buddy'); +insert into subscription ( state ) values ('none'), ('pending-out'), ('pending-in'), ('pending-in-pending-out'), ('only-out'), ('only-in'), ('out-pending-in'), ('in-pending-out'), ('buddy') on conflict do nothing;  -- a roster contains users, with client-set nickname -CREATE TABLE roster(  +CREATE TABLE if not exists roster(       user_jid text primary key not null,      name TEXT,      subscription text not null, @@ -52,11 +52,11 @@ CREATE TABLE roster(      foreign key(user_jid) references users(jid)  ); -create table groups( +create table if not exists groups(      group_name text primary key not null  ); -create table groups_roster( +create table if not exists groups_roster(      group_name text not null,      contact_jid text not null,      foreign key(group_name) references groups(group_name), @@ -67,7 +67,7 @@ create table groups_roster(  -- chat includes reference to user jid chat is with  -- specifically for dms, groups should be different  -- can send chat message to user (creating a new chat if not already exists) -create table chats ( +create table if not exists chats (      id text primary key not null,      have_chatted bool not null,      correspondent text not null unique, @@ -75,14 +75,14 @@ create table chats (  );  -- enum for subscription state -create table delivery( +create table if not exists delivery(      state text primary key not null  ); -insert into delivery ( state ) values ('sending'), ('written'), ('sent'), ('delivered'), ('read'), ('failed'), ('queued'); +insert into delivery ( state ) values ('sending'), ('written'), ('sent'), ('delivered'), ('read'), ('failed'), ('queued') on conflict do nothing;  -- messages include reference to chat they are in, and who sent them. -create table messages ( +create table if not exists messages (      id text primary key not null,      body text,      -- delivery is nullable as only messages sent by the user are markable @@ -116,26 +116,26 @@ create table messages (  );  -- enum for subscription state -create table show ( +create table if not exists show (      state text primary key not null  ); -insert into show ( state ) values ('away'), ('chat'), ('do-not-disturb'), ('extended-away'); +insert into show ( state ) values ('away'), ('chat'), ('do-not-disturb'), ('extended-away') on conflict do nothing; -create table cached_status ( +create table if not exists cached_status (      id integer primary key not null,      show text,      message text,      foreign key(show) references show(state)  ); -insert into cached_status (id) values (0); +insert into cached_status (id) values (0) on conflict do nothing; -create table capability_hash_nodes ( +create table if not exists capability_hash_nodes (      node text primary key not null,      timestamp text,      -- TODO: normalization      capabilities text not null  ); -insert into capability_hash_nodes ( node, capabilities ) values ('https://bunny.garden/filamento#mSavc/SLnHm8zazs5RlcbD/iXoc=', 'aHR0cDovL2phYmJlci5vcmcvcHJvdG9jb2wvY2Fwcx9odHRwOi8vamFiYmVyLm9yZy9wcm90b2NvbC9kaXNjbyNpbmZvH2h0dHA6Ly9qYWJiZXIub3JnL3Byb3RvY29sL2Rpc2NvI2l0ZW1zH2h0dHA6Ly9qYWJiZXIub3JnL3Byb3RvY29sL25pY2sfaHR0cDovL2phYmJlci5vcmcvcHJvdG9jb2wvbmljaytub3RpZnkfHGNsaWVudB9wYx8fZmlsYW1lbnRvIDAuMS4wHx4cHA=='); +insert into capability_hash_nodes ( node, capabilities ) values ('https://bunny.garden/filamento#mSavc/SLnHm8zazs5RlcbD/iXoc=', 'aHR0cDovL2phYmJlci5vcmcvcHJvdG9jb2wvY2Fwcx9odHRwOi8vamFiYmVyLm9yZy9wcm90b2NvbC9kaXNjbyNpbmZvH2h0dHA6Ly9qYWJiZXIub3JnL3Byb3RvY29sL2Rpc2NvI2l0ZW1zH2h0dHA6Ly9qYWJiZXIub3JnL3Byb3RvY29sL25pY2sfaHR0cDovL2phYmJlci5vcmcvcHJvdG9jb2wvbmljaytub3RpZnkfHGNsaWVudB9wYx8fZmlsYW1lbnRvIDAuMS4wHx4cHA==') on conflict do nothing; diff --git a/filamento/src/chat.rs b/filamento/src/chat.rs index 557b42b..2aa2282 100644 --- a/filamento/src/chat.rs +++ b/filamento/src/chat.rs @@ -1,5 +1,9 @@  use chrono::{DateTime, Utc};  use jid::JID; +use rusqlite::{ +    ToSql, +    types::{FromSql, ToSqlOutput, Value}, +};  use uuid::Uuid;  #[derive(Debug, Clone)] @@ -27,6 +31,36 @@ pub enum Delivery {      Queued,  } +impl ToSql for Delivery { +    fn to_sql(&self) -> rusqlite::Result<rusqlite::types::ToSqlOutput<'_>> { +        Ok(match self { +            Delivery::Sending => ToSqlOutput::Owned(Value::Text("sending".to_string())), +            Delivery::Written => ToSqlOutput::Owned(Value::Text("written".to_string())), +            Delivery::Sent => ToSqlOutput::Owned(Value::Text("sent".to_string())), +            Delivery::Delivered => ToSqlOutput::Owned(Value::Text("delivered".to_string())), +            Delivery::Read => ToSqlOutput::Owned(Value::Text("read".to_string())), +            Delivery::Failed => ToSqlOutput::Owned(Value::Text("failed".to_string())), +            Delivery::Queued => ToSqlOutput::Owned(Value::Text("queued".to_string())), +        }) +    } +} + +impl FromSql for Delivery { +    fn column_result(value: rusqlite::types::ValueRef<'_>) -> rusqlite::types::FromSqlResult<Self> { +        Ok(match value.as_str()? { +            "sending" => Self::Sending, +            "written" => Self::Written, +            "sent" => Self::Sent, +            "delivered" => Self::Delivered, +            "read" => Self::Read, +            "failed" => Self::Failed, +            "queued" => Self::Queued, +            // TODO: don't have these lol +            value => panic!("unexpected subscription `{value}`"), +        }) +    } +} +  // TODO: user migrations  // pub enum Migrated {  //     Jabber(User), diff --git a/filamento/src/db.rs b/filamento/src/db.rs index 467030d..ce0b35a 100644 --- a/filamento/src/db.rs +++ b/filamento/src/db.rs @@ -1,7 +1,9 @@ -use std::{collections::HashSet, path::Path}; +use std::{collections::HashSet, path::Path, sync::Arc};  use chrono::{DateTime, Utc};  use jid::JID; +use rusqlite::{Connection, OptionalExtension}; +use tokio::sync::{Mutex, MutexGuard};  use uuid::Uuid;  use crate::{ @@ -14,7 +16,7 @@ use crate::{  #[derive(Clone)]  pub struct Db { -    // db: SqlitePool, +    db: Arc<Mutex<rusqlite::Connection>>,  }  // TODO: turn into trait @@ -23,6 +25,8 @@ impl Db {      pub async fn create_connect_and_migrate(          path: impl AsRef<Path>,      ) -> Result<Self, DatabaseOpenError> { +        use rusqlite::Connection; +          if let Some(dir) = path.as_ref().parent() {              if dir.is_dir() {              } else { @@ -43,44 +47,95 @@ impl Db {          // let db = SqlitePool::connect(&url).await?;          // migrate!().run(&db).await?;          // Ok(Self { db }) -        Ok(Self {}) +        let db = Connection::open(url)?; +        db.execute_batch(include_str!("../migrations/1.sql"))?; +        Ok(Self { +            db: Arc::new(Mutex::new(db)), +        })      }      #[cfg(target_arch = "wasm32")]      pub async fn create_connect_and_migrate(          path: impl AsRef<Path>,      ) -> Result<Self, DatabaseOpenError> { -        // let url = "mem.db"; -        // let db = SqlitePool::connect(&url).await?; -        // // migrate!().run(&db).await?; -        Ok(Self {}) +        let db = Connection::open(path)?; +        db.execute_batch(include_str!("../migrations/1.sql"))?; +        Ok(Self { +            db: Arc::new(Mutex::new(db)), +        })      }      // pub(crate) fn new(db: SqlitePool) -> Self {      //     // Self { db }      //     Self {}      // } +    // +    pub async fn db(&self) -> MutexGuard<'_, Connection> { +        self.db.lock().await +    }      pub(crate) async fn create_user(&self, user: User) -> Result<(), Error> { +        { +            self.db().await.execute( +                "insert into users ( jid, nick, avatar ) values ( ?1, ?2, ?3 )", +                (user.jid, user.nick, user.avatar), +            )?; +        }          Ok(())      }      pub(crate) async fn read_user(&self, user: JID) -> Result<User, Error> { -        Ok(User { -            jid: user, -            nick: None, -            avatar: None, -        }) +        let db = self.db().await; +        let user_opt = db +            .query_row( +                "select jid, nick, avatar from users where jid = ?1", +                [&user], +                |row| { +                    Ok(User { +                        jid: row.get(0)?, +                        nick: row.get(1)?, +                        avatar: row.get(2)?, +                    }) +                }, +            ) +            .optional()?; +        match user_opt { +            Some(user) => Ok(user), +            None => { +                db.execute("insert into users ( jid ) values ( ?1 )", [&user])?; +                Ok(User { +                    jid: user, +                    nick: None, +                    avatar: None, +                }) +            } +        }      }      /// returns whether or not the nickname was updated      pub(crate) async fn delete_user_nick(&self, jid: JID) -> Result<bool, Error> { -        Ok(true) +        let rows_affected; +        { +            rows_affected = self.db().await.execute("insert into users (jid, nick) values (?1, ?2) on conflict do update set nick = ?3 where nick is not ?4", (jid, None::<String>, None::<String>, None::<String>))?; +        } +        if rows_affected > 0 { +            Ok(true) +        } else { +            Ok(false) +        }      }      /// returns whether or not the nickname was updated      pub(crate) async fn upsert_user_nick(&self, jid: JID, nick: String) -> Result<bool, Error> { -        Ok(true) +        let rows_affected; +        { +            rows_affected = self.db().await.execute("insert into users (jid, nick) values (?1, ?2) on conflict do update set nick = ?3 where nick is not ?4", (jid, &nick, &nick, &nick))?; +        } +        if rows_affected > 0 { +            Ok(true) +        } else { +            Ok(false) +        }      }      /// returns whether or not the avatar was updated, and the file to delete if there existed an old avatar @@ -88,7 +143,21 @@ impl Db {          &self,          jid: JID,      ) -> Result<(bool, Option<String>), Error> { -        Ok((true, None)) +        let (old_avatar, rows_affected): (Option<String>, _); +        { +            let db = self.db().await; +            old_avatar = db +                .query_row("select avatar from users where jid = ?1", [&jid], |row| { +                    Ok(row.get(0)?) +                }) +                .optional()?; +            rows_affected = db.execute("insert into users (jid, avatar) values (?1, ?2) on conflict do update set avatar = ?3 where avatar is not ?4", (jid, None::<String>, None::<String>, None::<String>))?; +        } +        if rows_affected > 0 { +            Ok((true, old_avatar)) +        } else { +            Ok((false, old_avatar)) +        }      }      /// returns whether or not the avatar was updated, and the file to delete if there existed an old avatar @@ -97,10 +166,31 @@ impl Db {          jid: JID,          avatar: String,      ) -> Result<(bool, Option<String>), Error> { -        Ok((true, None)) +        let (old_avatar, rows_affected): (Option<String>, _); +        { +            let db = self.db().await; +            old_avatar = db +                .query_row("select avatar from users where jid = ?1", [&jid], |row| { +                    let avatar: Option<String> = row.get(0)?; +                    Ok(avatar) +                }) +                .optional()? +                .unwrap_or_default(); +            rows_affected = db.execute("insert into users (jid, avatar) values (?1, ?2) on conflict do update set avatar = ?3 where avatar is not ?4", (jid, &avatar, &avatar, &avatar))?; +        } +        if rows_affected > 0 { +            Ok((true, old_avatar)) +        } else { +            Ok((false, old_avatar)) +        }      } +    // TODO: use references everywhere      pub(crate) async fn update_user(&self, user: User) -> Result<(), Error> { +        self.db().await.execute( +            "update users set nick = ?1, avatar = ?2 where user_jid = ?1", +            (&user.nick, &user.avatar, &user.jid), +        )?;          Ok(())      } @@ -109,63 +199,226 @@ impl Db {      /// does not create the underlying user, if underlying user does not exist, create_user() must be called separately      pub(crate) async fn create_contact(&self, contact: Contact) -> Result<(), Error> { +        let db = self.db().await; +        db.execute( +            "insert into roster ( user_jid, name, subscription ) values ( ?1, ?2, ?3 )", +            (&contact.user_jid, &contact.name, contact.subscription), +        )?; +        for group in contact.groups { +            db.execute( +                "insert into groups (group_name) values (?1) on conflict do nothing", +                [&group], +            )?; +            db.execute( +                "insert into groups_roster (group_name, contact_jid) values (?1, ?2)", +                (group, &contact.user_jid), +            )?; +        }          Ok(())      }      pub(crate) async fn read_contact(&self, contact: JID) -> Result<Contact, Error> { -        Ok(Contact { -            user_jid: contact, -            subscription: crate::roster::Subscription::None, -            name: None, -            groups: HashSet::new(), -        }) +        let db = self.db().await; +        let mut contact_item = db.query_row( +            "select user_jid, name, subscription from roster where user_jid = ?1", +            [&contact], +            |row| { +                Ok(Contact { +                    user_jid: row.get(0)?, +                    name: row.get(1)?, +                    subscription: row.get(2)?, +                    groups: HashSet::new(), +                }) +            }, +        )?; +        let groups: Result<HashSet<String>, _> = db +            .prepare("select group_name from groups_roster where contact_jid = ?1")? +            .query_map([&contact], |row| Ok(row.get(0)?))? +            .collect(); +        contact_item.groups = groups?; +        Ok(contact_item)      }      pub(crate) async fn read_contact_opt(&self, contact: &JID) -> Result<Option<Contact>, Error> { -        Ok(None) +        let db = self.db().await; +        let contact_item = db +            .query_row( +                "select user_jid, name, subscription from roster where user_jid = ?1", +                [&contact], +                |row| { +                    Ok(Contact { +                        user_jid: row.get(0)?, +                        name: row.get(1)?, +                        subscription: row.get(2)?, +                        groups: HashSet::new(), +                    }) +                }, +            ) +            .optional()?; +        if let Some(mut contact_item) = contact_item { +            let groups: Result<HashSet<String>, _> = db +                .prepare("select group_name from groups_roster where contact_jid = ?1")? +                .query_map([&contact], |row| Ok(row.get(0)?))? +                .collect(); +            contact_item.groups = groups?; +            Ok(Some(contact_item)) +        } else { +            Ok(None) +        }      }      /// does not update the underlying user, to update user, update_user() must be called separately      pub(crate) async fn update_contact(&self, contact: Contact) -> Result<(), Error> { +        let db = self.db().await; +        db.execute( +            "update roster set name = ?1, subscription = ?2 where user_jid = ?3", +            (&contact.name, &contact.subscription, &contact.user_jid), +        )?; +        db.execute( +            "delete from groups_roster where contact_jid = ?1", +            [&contact.user_jid], +        )?; +        for group in contact.groups { +            db.execute( +                "insert into groups (group_name) values (?1) on conflict do nothing", +                [&group], +            )?; +            db.execute( +                "insert into groups_roster (group_name, contact_jid), values (?1, ?2)", +                (&group, &contact.user_jid), +            )?; +        } +        // TODO: delete orphaned groups from groups table, users etc.          Ok(())      }      pub(crate) async fn upsert_contact(&self, contact: Contact) -> Result<(), Error> { +        let db = self.db().await; +        db.execute( +            "insert into users (jid) values (?1) on conflict do nothing", +            [&contact.user_jid], +        )?; +        db.execute( +            "insert into roster ( user_jid, name, subscription ) values ( ?1, ?2, ?3 ) on conflict do update set name = ?4, subscription = ?5", +            (&contact.user_jid, &contact.name, &contact.subscription, &contact.name, &contact.subscription), +        )?; +        db.execute( +            "delete from groups_roster where contact_jid = ?1", +            [&contact.user_jid], +        )?; +        for group in contact.groups { +            db.execute( +                "insert into groups (group_name) values (?1) on conflict do nothing", +                [&group], +            )?; +            db.execute( +                "insert into groups_roster (group_name, contact_jid) values (?1, ?2)", +                (group, &contact.user_jid), +            )?; +        }          Ok(())      }      pub(crate) async fn delete_contact(&self, contact: JID) -> Result<(), Error> { +        self.db() +            .await +            .execute("delete from roster where user_jid = ?1", [&contact])?;          Ok(())      }      pub(crate) async fn replace_cached_roster(&self, roster: Vec<Contact>) -> Result<(), Error> { +        { +            self.db().await.execute("delete from roster", [])?; +        } +        for contact in roster { +            self.upsert_contact(contact).await?; +        }          Ok(())      }      pub(crate) async fn read_cached_roster(&self) -> Result<Vec<Contact>, Error> { -        Ok(Vec::new()) +        let db = self.db().await; +        let mut roster: Vec<_> = db +            .prepare("select user_jid, name, subscription from roster")? +            .query_map([], |row| { +                Ok(Contact { +                    user_jid: row.get(0)?, +                    name: row.get(1)?, +                    subscription: row.get(2)?, +                    groups: HashSet::new(), +                }) +            })? +            .collect::<Result<Vec<_>, _>>()?; +        for contact in &mut roster { +            let groups: Result<HashSet<String>, _> = db +                .prepare("select group_name from groups_roster where contact_jid = ?1")? +                .query_map([&contact.user_jid], |row| Ok(row.get(0)?))? +                .collect(); +            contact.groups = groups?; +        } +        Ok(roster)      }      pub(crate) async fn read_cached_roster_with_users(          &self,      ) -> Result<Vec<(Contact, User)>, Error> { -        Ok(Vec::new()) +        let db = self.db().await; +        let mut roster: Vec<(Contact, User)> = db.prepare("select user_jid, name, subscription, jid, nick, avatar from roster join users on jid = user_jid")?.query_map([], |row| { +            Ok(( +                Contact { +                    user_jid: row.get(0)?, +                    name: row.get(1)?, +                    subscription: row.get(2)?, +                    groups: HashSet::new(), +                }, +                User { +                    jid: row.get(3)?, +                    nick: row.get(4)?, +                    avatar: row.get(5)?, +                } +            )) +        })?.collect::<Result<Vec<_>, _>>()?; +        for (contact, _) in &mut roster { +            let groups: Result<HashSet<String>, _> = db +                .prepare("select group_name from groups_roster where contact_jid = ?1")? +                .query_map([&contact.user_jid], |row| Ok(row.get(0)?))? +                .collect(); +            contact.groups = groups?; +        } +        Ok(roster)      }      pub(crate) async fn create_chat(&self, chat: Chat) -> Result<(), Error> { +        let id = Uuid::new_v4(); +        let jid = chat.correspondent(); +        self.db().await.execute( +            "insert into chats (id, correspondent, have_chatted) values (?1, ?2, ?3)", +            (id, jid, chat.have_chatted), +        )?;          Ok(())      }      // TODO: what happens if a correspondent changes from a user to a contact? maybe just have correspondent be a user, then have the client make the user show up as a contact in ui if they are in the loaded roster.      pub(crate) async fn read_chat(&self, chat: JID) -> Result<Chat, Error> { -        Ok(Chat { -            correspondent: chat, -            have_chatted: false, -        }) +        let chat = self.db().await.query_row( +            "select correspondent, have_chatted from chats where correspondent = ?1", +            [&chat], +            |row| { +                Ok(Chat { +                    correspondent: row.get(0)?, +                    have_chatted: row.get(1)?, +                }) +            }, +        )?; +        Ok(chat)      }      pub(crate) async fn mark_chat_as_chatted(&self, chat: JID) -> Result<(), Error> { +        self.db().await.execute( +            "update chats set have_chatted = true where correspondent = ?1", +            [chat], +        )?;          Ok(())      } @@ -174,27 +427,59 @@ impl Db {          old_chat: Chat,          new_correspondent: JID,      ) -> Result<Chat, Error> { -        Ok(Chat { -            correspondent: new_correspondent, -            have_chatted: false, -        }) +        let new_jid = &new_correspondent; +        let old_jid = old_chat.correspondent(); +        let chat = self.db().await.query_row( +            "update chats set correspondent = ?1 where correspondent = ?2 returning correspondent, have_chatted", +            [new_jid, old_jid], +            |row| Ok(Chat { +                correspondent: row.get(0)?, +                have_chatted: row.get(1)?, +            }) +        )?; +        Ok(chat)      }      // pub(crate) async fn update_chat      pub(crate) async fn delete_chat(&self, chat: JID) -> Result<(), Error> { +        self.db() +            .await +            .execute("delete from chats where correspondent = ?1", [chat])?;          Ok(())      }      /// TODO: sorting and filtering (for now there is no sorting)      pub(crate) async fn read_chats(&self) -> Result<Vec<Chat>, Error> { -        Ok(Vec::new()) +        let chats = self +            .db() +            .await +            .prepare("select correspondent, have_chatted from chats")? +            .query_map([], |row| { +                Ok(Chat { +                    correspondent: row.get(0)?, +                    have_chatted: row.get(1)?, +                }) +            })? +            .collect::<Result<Vec<_>, _>>()?; +        Ok(chats)      }      /// chats ordered by date of last message      // greatest-n-per-group      pub(crate) async fn read_chats_ordered(&self) -> Result<Vec<Chat>, Error> { -        Ok(Vec::new()) +        let chats = self +            .db() +            .await +            .prepare("select c.correspondent, c.have_chatted, m.* from chats c join (select chat_id, max(timestamp) max_timestamp from messages group by chat_id) max_timestamps on c.id = max_timestamps.chat_id join messages m on max_timestamps.chat_id = m.chat_id and max_timestamps.max_timestamp = m.timestamp order by m.timestamp desc")? +            .query_map([], |row| { +                Ok(Chat { +                    correspondent: row.get(0)?, +                    have_chatted: row.get(1)?, +                }) +            })? +            .collect::<Result<Vec<_>, _>>()?; +        Ok(chats)      }      /// chats ordered by date of last message @@ -202,7 +487,29 @@ impl Db {      pub(crate) async fn read_chats_ordered_with_latest_messages(          &self,      ) -> Result<Vec<(Chat, Message)>, Error> { -        Ok(Vec::new()) +        let chats = self +            .db() +            .await +            .prepare("select c.correspondent, c.have_chatted, m.id, m.from_jid, m.delivery, m.timestamp, m.body from chats c join (select chat_id, max(timestamp) max_timestamp from messages group by chat_id) max_timestamps on c.id = max_timestamps.chat_id join messages m on max_timestamps.chat_id = m.chat_id and max_timestamps.max_timestamp = m.timestamp order by m.timestamp desc")? +            .query_map([], |row| { +                Ok(( +                    Chat { +                        correspondent: row.get(0)?, +                        have_chatted: row.get(1)?, +                    }, +                    Message { +                        id: row.get(2)?, +                        from: row.get(3)?, +                        delivery: row.get(4)?, +                        timestamp: row.get(5)?, +                        body: Body { +                            body: row.get(6)?, +                        }, +                    } +                )) +            })? +            .collect::<Result<Vec<_>, _>>()?; +        Ok(chats)      }      /// chats ordered by date of last message @@ -210,15 +517,65 @@ impl Db {      pub(crate) async fn read_chats_ordered_with_latest_messages_and_users(          &self,      ) -> Result<Vec<((Chat, User), (Message, User))>, Error> { -        Ok(Vec::new()) +        let chats = self +            .db() +            .await +            .prepare("select c.id as chat_id, c.correspondent as chat_correspondent, c.have_chatted as chat_have_chatted, m.id as message_id, m.body as message_body, m.delivery as message_delivery, m.timestamp as message_timestamp, m.from_jid as message_from_jid, cu.jid as chat_user_jid, cu.nick as chat_user_nick, cu.avatar as chat_user_avatar, mu.jid as message_user_jid, mu.nick as message_user_nick, mu.avatar as message_user_avatar from chats c join (select chat_id, max(timestamp) max_timestamp from messages group by chat_id) max_timestamps on c.id = max_timestamps.chat_id join messages m on max_timestamps.chat_id = m.chat_id and max_timestamps.max_timestamp = m.timestamp join users as cu on cu.jid = c.correspondent join users as mu on mu.jid = m.from_jid order by m.timestamp desc")? +            .query_map([], |row| { +                Ok(( +                    ( +                        Chat { +                            correspondent: row.get("chat_correspondent")?, +                            have_chatted: row.get("chat_have_chatted")?, +                        }, +                        User { +                            jid: row.get("chat_user_jid")?, +                            nick: row.get("chat_user_nick")?, +                            avatar: row.get("chat_user_avatar")?, +                        } +                    ), +                    ( +                        Message { +                            id: row.get("message_id")?, +                            from: row.get("message_from_jid")?, +                            delivery: row.get("message_delivery")?, +                            timestamp: row.get("message_timestamp")?, +                            body: Body { +                                body: row.get("message_body")?, +                            }, +                        }, +                        User { +                            jid: row.get("message_user_jid")?, +                            nick: row.get("message_user_nick")?, +                            avatar: row.get("message_user_avatar")?, +                        } +                    ), +                )) +            })? +            .collect::<Result<Vec<_>, _>>()?; +        Ok(chats)      }      async fn read_chat_id(&self, chat: JID) -> Result<Uuid, Error> { -        Ok(Uuid::new_v4()) +        let chat_id = self.db().await.query_row( +            "select id from chats where correspondent = ?1", +            [chat], +            |row| Ok(row.get(0)?), +        )?; +        Ok(chat_id)      }      async fn read_chat_id_opt(&self, chat: JID) -> Result<Option<Uuid>, Error> { -        Ok(None) +        let chat_id = self +            .db() +            .await +            .query_row( +                "select id from chats where correspondent = ?1", +                [chat], +                |row| Ok(row.get(0)?), +            ) +            .optional()?; +        Ok(chat_id)      }      /// if the chat doesn't already exist, it must be created by calling create_chat() before running this function. @@ -228,32 +585,52 @@ impl Db {          chat: JID,          from: JID,      ) -> Result<(), Error> { +        let from_jid = from.as_bare(); +        let chat_id = self.read_chat_id(chat).await?; +        self.db().await.execute("insert into messages (id, body, chat_id, from_jid, from_resource, timestamp, delivery) values (?1, ?2, ?3, ?4, ?5, ?6, ?7)", (&message.id, &message.body.body, &chat_id, &from_jid, &from.resourcepart, &message.timestamp, &message.delivery))?;          Ok(())      }      pub(crate) async fn upsert_chat_and_user(&self, chat: &JID) -> Result<bool, Error> { -        Ok(false) -    } - -    /// MUST upsert chat beforehand -    pub(crate) async fn create_message_with_self_resource( -        &self, -        message: Message, -        chat: JID, -        // full jid -        from: JID, -    ) -> Result<(), Error> { -        Ok(()) +        let bare_chat = chat.as_bare(); +        let db = self.db().await; +        db.execute( +            "insert into users (jid) values (?1) on conflict do nothing", +            [&chat], +        )?; +        let id = Uuid::new_v4(); +        db.execute("insert into chats (id, correspondent, have_chatted) values (?1, ?2, ?3) on conflict do nothing", (id, &bare_chat, false))?; +        let chat = db.query_row( +            "select correspondent, have_chatted from chats where correspondent = ?1", +            [&bare_chat], +            |row| { +                Ok(Chat { +                    correspondent: row.get(0)?, +                    have_chatted: row.get(1)?, +                }) +            }, +        )?; +        Ok(chat.have_chatted)      }      /// create direct message from incoming. MUST upsert chat and user      pub(crate) async fn create_message_with_user_resource(          &self,          message: Message, +        // TODO: enforce two kinds of jid. bare and full +        // must be bare jid          chat: JID,          // full jid          from: JID,      ) -> Result<(), Error> { +        let from_jid = from.as_bare(); +        if let Some(resource) = &from.resourcepart { +            self.db().await.execute( +                "insert into resources (bare_jid, resource) values (?1, ?2) on conflict do nothing", +                (&from_jid, resource), +            )?; +        } +        self.create_message(message, chat, from).await?;          Ok(())      } @@ -270,35 +647,102 @@ impl Db {      // TODO: message updates/edits pub(crate) async fn update_message(&self, message: Message) -> Result<(), Error> {}      pub(crate) async fn delete_message(&self, message: Uuid) -> Result<(), Error> { +        self.db() +            .await +            .execute("delete from messages where id = ?1", [message])?;          Ok(())      }      // TODO: paging      pub(crate) async fn read_message_history(&self, chat: JID) -> Result<Vec<Message>, Error> { -        Ok(Vec::new()) +        let chat_id = self.read_chat_id(chat).await?; +        let messages = self +            .db() +            .await +            .prepare( +                "select id, from_jid, delivery, timestamp, body from messages where chat_id = ?1", +            )? +            .query_map([chat_id], |row| { +                Ok(Message { +                    id: row.get(0)?, +                    // TODO: full from +                    from: row.get(1)?, +                    delivery: row.get(2)?, +                    timestamp: row.get(3)?, +                    body: Body { body: row.get(4)? }, +                }) +            })? +            .collect::<Result<Vec<_>, _>>()?; +        Ok(messages)      }      pub(crate) async fn read_message_history_with_users(          &self,          chat: JID,      ) -> Result<Vec<(Message, User)>, Error> { -        Ok(Vec::new()) +        let chat_id = self.read_chat_id(chat).await?; +        let messages = self +            .db() +            .await +            .prepare( +                "select id, from_jid, delivery, timestamp, body, jid, nick, avatar from messages join users on jid = from_jid where chat_id = ? order by timestamp asc", +            )? +            .query_map([chat_id], |row| { +                Ok(( +                    Message { +                        id: row.get(0)?, +                        // TODO: full from +                        from: row.get(1)?, +                        delivery: row.get(2)?, +                        timestamp: row.get(3)?, +                        body: Body { body: row.get(4)? }, +                    }, +                    User { +                        jid: row.get(5)?, +                        nick: row.get(6)?, +                        avatar: row.get(7)?, +                    } +                )) +            })? +            .collect::<Result<Vec<_>, _>>()?; +        Ok(messages)      }      pub(crate) async fn read_cached_status(&self) -> Result<Online, Error> { -        Ok(Online::default()) +        let status = self.db().await.query_row( +            "select show, message from cached_status where id = 0", +            [], +            |row| { +                Ok(Online { +                    show: row.get(0)?, +                    status: row.get(1)?, +                    priority: None, +                }) +            }, +        )?; +        Ok(status)      }      pub(crate) async fn upsert_cached_status(&self, status: Online) -> Result<(), Error> { +        self.db().await.execute("insert into cached_status (id, show, message) values (0, ?1, ?2) on conflict do update set show = ?3, message = ?4", (status.show, &status.status, status.show, &status.status))?;          Ok(())      }      pub(crate) async fn delete_cached_status(&self) -> Result<(), Error> { +        self.db().await.execute( +            "update cached_status set show = null, message = null where id = 0", +            [], +        )?;          Ok(())      }      pub(crate) async fn read_capabilities(&self, node: &str) -> Result<String, Error> { -        Ok("aHR0cDovL2phYmJlci5vcmcvcHJvdG9jb2wvY2Fwcx9odHRwOi8vamFiYmVyLm9yZy9wcm90b2NvbC9kaXNjbyNpbmZvH2h0dHA6Ly9qYWJiZXIub3JnL3Byb3RvY29sL2Rpc2NvI2l0ZW1zH2h0dHA6Ly9qYWJiZXIub3JnL3Byb3RvY29sL25pY2sfaHR0cDovL2phYmJlci5vcmcvcHJvdG9jb2wvbmljaytub3RpZnkfHGNsaWVudB9wYx8fZmlsYW1lbnRvIDAuMS4wHx4cHA==".to_string()) +        let capabilities = self.db().await.query_row( +            "select capabilities from capability_hash_nodes where node = ?1", +            [node], +            |row| Ok(row.get(0)?), +        )?; +        Ok(capabilities)      }      pub(crate) async fn upsert_capabilities( @@ -306,6 +750,8 @@ impl Db {          node: &str,          capabilities: &str,      ) -> Result<(), Error> { +        let now = Utc::now(); +        self.db().await.execute("insert into capability_hash_nodes (node, timestamp, capabilities) values (?1, ?2, ?3) on conflict do update set timestamp = ?, capabilities = ?", (node, now, capabilities, now, capabilities))?;          Ok(())      } @@ -1046,20 +1492,6 @@ impl Db {      // ) -> Result<(), Error> {      //     let bare_chat = chat.as_bare();      //     let resource = &chat.resourcepart; -    //     // sqlx::query!( -    //     //     "insert into users (jid) values (?) on conflict do nothing", -    //     //     bare_chat -    //     // ) -    //     // .execute(&self.db) -    //     // .await?; -    //     // let id = Uuid::new_v4(); -    //     // sqlx::query!( -    //     //     "insert into chats (id, correspondent) values (?, ?) on conflict do nothing", -    //     //     id, -    //     //     bare_chat -    //     // ) -    //     // .execute(&self.db) -    //     // .await?;      //     if let Some(resource) = resource {      //         sqlx::query!(      //             "insert into resources (bare_jid, resource) values (?, ?) on conflict do nothing", diff --git a/filamento/src/logic/offline.rs b/filamento/src/logic/offline.rs index b87484c..55e3d4a 100644 --- a/filamento/src/logic/offline.rs +++ b/filamento/src/logic/offline.rs @@ -159,7 +159,7 @@ pub async fn handle_offline_result<Fs: FileStore + Clone>(              // TODO: mark these as potentially failed upon client launch              if let Err(e) = logic                  .db() -                .create_message_with_self_resource( +                .create_message_with_user_resource(                      message.clone(),                      jid.clone(),                      // TODO: when message is queued and sent, the from must also be updated with the correct resource diff --git a/filamento/src/logic/online.rs b/filamento/src/logic/online.rs index 767f923..c0c3a7f 100644 --- a/filamento/src/logic/online.rs +++ b/filamento/src/logic/online.rs @@ -523,7 +523,7 @@ pub async fn handle_send_message<Fs: FileStore + Clone>(logic: &ClientLogic<Fs>,      // TODO: mark these as potentially failed upon client launch      if let Err(e) = logic          .db() -        .create_message_with_self_resource(message.clone(), jid.clone(), connection.jid().clone()) +        .create_message_with_user_resource(message.clone(), jid.clone(), connection.jid().clone())          .await      {          // TODO: should these really be handle_error or just the error macro? diff --git a/filamento/src/presence.rs b/filamento/src/presence.rs index e406cce..2184ad2 100644 --- a/filamento/src/presence.rs +++ b/filamento/src/presence.rs @@ -1,4 +1,8 @@  use chrono::{DateTime, Utc}; +use rusqlite::{ +    ToSql, +    types::{FromSql, ToSqlOutput, Value}, +};  use stanza::{client::presence::String1024, xep_0203::Delay};  use crate::caps; @@ -18,6 +22,30 @@ pub enum Show {      ExtendedAway,  } +impl ToSql for Show { +    fn to_sql(&self) -> rusqlite::Result<rusqlite::types::ToSqlOutput<'_>> { +        Ok(match self { +            Show::Away => ToSqlOutput::Owned(Value::Text("away".to_string())), +            Show::Chat => ToSqlOutput::Owned(Value::Text("chat".to_string())), +            Show::DoNotDisturb => ToSqlOutput::Owned(Value::Text("do-not-disturb".to_string())), +            Show::ExtendedAway => ToSqlOutput::Owned(Value::Text("extended-away".to_string())), +        }) +    } +} + +impl FromSql for Show { +    fn column_result(value: rusqlite::types::ValueRef<'_>) -> rusqlite::types::FromSqlResult<Self> { +        Ok(match value.as_str()? { +            "away" => Self::Away, +            "chat" => Self::Chat, +            "do-not-disturb" => Self::DoNotDisturb, +            "extended-away" => Self::ExtendedAway, +            // TODO: horrible +            value => panic!("unexpected {value}"), +        }) +    } +} +  #[derive(Debug, Default, Clone)]  pub struct Offline {      pub status: Option<String>, diff --git a/filamento/src/roster.rs b/filamento/src/roster.rs index ba5b3cd..99682b1 100644 --- a/filamento/src/roster.rs +++ b/filamento/src/roster.rs @@ -1,6 +1,10 @@  use std::collections::HashSet;  use jid::JID; +use rusqlite::{ +    ToSql, +    types::{FromSql, ToSqlOutput, Value}, +};  pub struct ContactUpdate {      pub name: Option<String>, @@ -35,6 +39,46 @@ pub enum Subscription {      // Remove,  } +impl ToSql for Subscription { +    fn to_sql(&self) -> rusqlite::Result<rusqlite::types::ToSqlOutput<'_>> { +        Ok(match self { +            Subscription::None => ToSqlOutput::Owned(Value::Text("none".to_string())), +            Subscription::PendingOut => ToSqlOutput::Owned(Value::Text("pending-out".to_string())), +            Subscription::PendingIn => ToSqlOutput::Owned(Value::Text("pending-in".to_string())), +            Subscription::PendingInPendingOut => { +                ToSqlOutput::Owned(Value::Text("pending-in-pending-out".to_string())) +            } +            Subscription::OnlyOut => ToSqlOutput::Owned(Value::Text("only-out".to_string())), +            Subscription::OnlyIn => ToSqlOutput::Owned(Value::Text("only-in".to_string())), +            Subscription::OutPendingIn => { +                ToSqlOutput::Owned(Value::Text("out-pending-in".to_string())) +            } +            Subscription::InPendingOut => { +                ToSqlOutput::Owned(Value::Text("in-pending-out".to_string())) +            } +            Subscription::Buddy => ToSqlOutput::Owned(Value::Text("buddy".to_string())), +        }) +    } +} + +impl FromSql for Subscription { +    fn column_result(value: rusqlite::types::ValueRef<'_>) -> rusqlite::types::FromSqlResult<Self> { +        Ok(match value.as_str()? { +            "none" => Self::None, +            "pending-out" => Self::PendingOut, +            "pending-in" => Self::PendingIn, +            "pending-in-pending-out" => Self::PendingInPendingOut, +            "only-out" => Self::OnlyOut, +            "only-in" => Self::OnlyIn, +            "out-pending-in" => Self::OutPendingIn, +            "in-pending-out" => Self::InPendingOut, +            "buddy" => Self::Buddy, +            // TODO: don't have these lol +            value => panic!("unexpected subscription `{value}`"), +        }) +    } +} +  // none  // >  // >> | 
