diff options
| author | 2025-04-17 11:03:51 +0100 | |
|---|---|---|
| committer | 2025-04-17 11:03:51 +0100 | |
| commit | b9d75f38743113c054be3d97af36bdd2a7dd0d69 (patch) | |
| tree | 537623664010d26d5f2574e2d51d03f8c25e08ac | |
| parent | cf51dcf052af89f8742d887bde2c93d735309bdd (diff) | |
| download | luz-b9d75f38743113c054be3d97af36bdd2a7dd0d69.tar.gz luz-b9d75f38743113c054be3d97af36bdd2a7dd0d69.tar.bz2 luz-b9d75f38743113c054be3d97af36bdd2a7dd0d69.zip | |
feat(filamento): compiles on wasm
Diffstat (limited to '')
| -rw-r--r-- | Cargo.toml | 3 | ||||
| -rw-r--r-- | filamento/Cargo.toml | 18 | ||||
| -rw-r--r-- | filamento/examples/example.rs | 224 | ||||
| -rw-r--r-- | filamento/src/caps.rs | 4 | ||||
| -rw-r--r-- | filamento/src/db.rs | 1595 | ||||
| -rw-r--r-- | filamento/src/files.rs | 41 | ||||
| -rw-r--r-- | filamento/src/lib.rs | 93 | ||||
| -rw-r--r-- | jid/Cargo.toml | 8 | ||||
| -rw-r--r-- | lampada/Cargo.toml | 6 | ||||
| -rw-r--r-- | lampada/src/connection/mod.rs | 27 | ||||
| -rw-r--r-- | lampada/src/connection/read.rs | 55 | ||||
| -rw-r--r-- | lampada/src/connection/write.rs | 11 | ||||
| -rw-r--r-- | lampada/src/error.rs | 11 | ||||
| -rw-r--r-- | lampada/src/lib.rs | 74 | ||||
| -rw-r--r-- | luz/Cargo.toml | 6 | 
15 files changed, 1337 insertions, 839 deletions
| @@ -6,3 +6,6 @@ members = [      "lampada",      "stanza", "jid", "filamento",  ] + +[workspace.dependencies] +tokio = { version = "1.42.0" } diff --git a/filamento/Cargo.toml b/filamento/Cargo.toml index 91b7e91..4c36c95 100644 --- a/filamento/Cargo.toml +++ b/filamento/Cargo.toml @@ -6,10 +6,10 @@ edition = "2024"  [dependencies]  futures = "0.3.31"  lampada = { version = "0.1.0", path = "../lampada" } -tokio = "1.42.0" +tokio = { workspace = true } +# tokio = "1.42.0"  thiserror = "2.0.11"  stanza = { version = "0.1.0", path = "../stanza", features = ["rfc_6121", "xep_0203", "xep_0030", "xep_0060", "xep_0172", "xep_0390", "xep_0128", "xep_0115", "xep_0084"] } -sqlx = { version = "0.8.3", features = ["sqlite", "runtime-tokio", "uuid", "chrono"] }  # TODO: re-export jid?  jid = { version = "0.1.0", path = "../jid", features = ["sqlx"] }  uuid = { version = "1.13.1", features = ["v4"] } @@ -22,9 +22,23 @@ sha1 = "0.10.6"  image = "0.25.6"  hex = "0.4.3" +[target.'cfg(not(target_arch = "wasm32"))'.dependencies] +tokio = { workspace = true, features = ["sync", "time", "rt", "fs"] } +sqlx = { path = "../../remote/sqlx", features = ["sqlite", "runtime-tokio", "uuid", "chrono"] } + +[target.'cfg(target_arch = "wasm32")'.dependencies] +tokio = { workspace = true, features = ["sync", "time", "rt"] } +# tokio = { version = "1.44.3", features = [] } +sqlx = { path = "../../remote/sqlx", features = ["sqlite-precompiled-wasm", "runtime-tokio", "uuid", "chrono"] } +tokio_with_wasm = { version = "0.8.2", features = ["sync", "time", "rt"] } +# wasm-bindgen-futures = "0.4" +  [dev-dependencies]  tracing-subscriber = "0.3.19"  peanuts = { version = "0.1.0", git = "https://bunny.garden/peanuts" } +tracing-wasm = "0.2.1" +tokio_with_wasm = { version = "0.8.2", features = ["sync", "time", "rt"] } +wasm-bindgen-test = "0.3.0"  [[example]]  name = "example" diff --git a/filamento/examples/example.rs b/filamento/examples/example.rs index 8119743..65fe166 100644 --- a/filamento/examples/example.rs +++ b/filamento/examples/example.rs @@ -1,120 +1,122 @@ -use std::{path::Path, str::FromStr, sync::Arc, time::Duration}; +// use std::{path::Path, str::FromStr, sync::Arc, time::Duration}; -use filamento::{Client, db::Db, files::FileStore}; -use jid::JID; -use tokio::io::{self, AsyncReadExt}; -use tracing::info; +// use filamento::{Client, db::Db, files::FileStore}; +// use jid::JID; +// use tokio::io::{self, AsyncReadExt}; +// use tracing::info; -#[derive(Clone, Debug)] -pub struct Files; +// #[derive(Clone, Debug)] +// pub struct Files; -impl FileStore for Files { -    type Err = Arc<io::Error>; +// impl FileStore for Files { +//     type Err = Arc<io::Error>; -    async fn is_stored(&self, name: &str) -> Result<bool, Self::Err> { -        tracing::debug!("checking if {} is stored", name); -        let res = tokio::fs::try_exists(format!("files/{}", name)) -            .await -            .map_err(|err| Arc::new(err)); -        tracing::debug!("file check res: {:?}", res); -        res -    } +//     async fn is_stored(&self, name: &str) -> Result<bool, Self::Err> { +//         tracing::debug!("checking if {} is stored", name); +//         let res = tokio::fs::try_exists(format!("files/{}", name)) +//             .await +//             .map_err(|err| Arc::new(err)); +//         tracing::debug!("file check res: {:?}", res); +//         res +//     } -    async fn store(&self, name: &str, data: &[u8]) -> Result<(), Self::Err> { -        tracing::debug!("storing {} is stored", name); -        let res = tokio::fs::write(format!("files/{}", name), data) -            .await -            .map_err(|err| Arc::new(err)); -        tracing::debug!("file store res: {:?}", res); -        res -    } +//     async fn store(&self, name: &str, data: &[u8]) -> Result<(), Self::Err> { +//         tracing::debug!("storing {} is stored", name); +//         let res = tokio::fs::write(format!("files/{}", name), data) +//             .await +//             .map_err(|err| Arc::new(err)); +//         tracing::debug!("file store res: {:?}", res); +//         res +//     } -    async fn delete(&self, name: &str) -> Result<(), Self::Err> { -        tracing::debug!("deleting {}", name); -        let res = tokio::fs::remove_file(format!("files/{}", name)) -            .await -            .map_err(|err| Arc::new(err)); -        tracing::debug!("file delete res: {:?}", res); -        res -    } -} +//     async fn delete(&self, name: &str) -> Result<(), Self::Err> { +//         tracing::debug!("deleting {}", name); +//         let res = tokio::fs::remove_file(format!("files/{}", name)) +//             .await +//             .map_err(|err| Arc::new(err)); +//         tracing::debug!("file delete res: {:?}", res); +//         res +//     } +// } -#[tokio::main] -async fn main() { -    tracing_subscriber::fmt::init(); -    let db = Db::create_connect_and_migrate(Path::new("./filamento.db")) -        .await -        .unwrap(); -    let (client, mut recv) = Client::new( -        "test@blos.sm/testing2".try_into().unwrap(), -        "slayed".to_string(), -        db, -        Files, -    ); +// #[tokio::main] +// async fn main() { +//     tracing_subscriber::fmt::init(); +//     let db = Db::create_connect_and_migrate(Path::new("./filamento.db")) +//         .await +//         .unwrap(); +//     let (client, mut recv) = Client::new( +//         "test@blos.sm/testing2".try_into().unwrap(), +//         "slayed".to_string(), +//         db, +//         Files, +//     ); -    tokio::spawn(async move { -        while let Some(msg) = recv.recv().await { -            info!("{:#?}", msg) -        } -    }); +//     tokio::spawn(async move { +//         while let Some(msg) = recv.recv().await { +//             info!("{:#?}", msg) +//         } +//     }); -    client.connect().await.unwrap(); -    tokio::time::sleep(Duration::from_secs(5)).await; -    info!("changing nick"); -    client -        .change_nick(Some("britney".to_string())) -        .await -        .unwrap(); -    let mut profile_pic = tokio::fs::File::open("files/britney_starbies.jpg") -        .await -        .unwrap(); -    let mut data = Vec::new(); -    profile_pic.read_to_end(&mut data).await.unwrap(); -    client.change_avatar(Some(data)).await.unwrap(); -    info!("sending message"); -    client -        .send_message( -            JID::from_str("cel@blos.sm").unwrap(), -            filamento::chat::Body { -                body: "hallo!!!".to_string(), -            }, -        ) -        .await -        .unwrap(); -    info!("sent message"); -    client -        .send_message( -            JID::from_str("cel@blos.sm").unwrap(), -            filamento::chat::Body { -                body: "hallo 2".to_string(), -            }, -        ) -        .await -        .unwrap(); -    tokio::time::sleep(Duration::from_secs(15)).await; -    // info!("sending disco query"); -    // let info = client.disco_info(None, None).await.unwrap(); -    // info!("got disco result: {:#?}", info); -    // let items = client.disco_items(None, None).await.unwrap(); -    // info!("got disco result: {:#?}", items); -    // let info = client -    //     .disco_info(Some("blos.sm".parse().unwrap()), None) -    //     .await -    //     .unwrap(); -    // info!("got disco result: {:#?}", info); -    // let items = client -    //     .disco_items(Some("blos.sm".parse().unwrap()), None) -    //     .await -    //     .unwrap(); -    // info!("got disco result: {:#?}", items); -    // let info = client -    //     .disco_info(Some("pubsub.blos.sm".parse().unwrap()), None) -    //     .await -    //     .unwrap(); -    // info!("got disco result: {:#?}", info); -    // let items = client -    //     .disco_items(Some("pubsub.blos.sm".parse().unwrap()), None) -    //     .await -    //     .unwrap(); -    // info!("got disco result: {:#?}", items); -} +//     client.connect().await.unwrap(); +//     tokio::time::sleep(Duration::from_secs(5)).await; +//     info!("changing nick"); +//     client +//         .change_nick(Some("britney".to_string())) +//         .await +//         .unwrap(); +//     let mut profile_pic = tokio::fs::File::open("files/britney_starbies.jpg") +//         .await +//         .unwrap(); +//     let mut data = Vec::new(); +//     profile_pic.read_to_end(&mut data).await.unwrap(); +//     client.change_avatar(Some(data)).await.unwrap(); +//     info!("sending message"); +//     client +//         .send_message( +//             JID::from_str("cel@blos.sm").unwrap(), +//             filamento::chat::Body { +//                 body: "hallo!!!".to_string(), +//             }, +//         ) +//         .await +//         .unwrap(); +//     info!("sent message"); +//     client +//         .send_message( +//             JID::from_str("cel@blos.sm").unwrap(), +//             filamento::chat::Body { +//                 body: "hallo 2".to_string(), +//             }, +//         ) +//         .await +//         .unwrap(); +//     tokio::time::sleep(Duration::from_secs(15)).await; +//     // info!("sending disco query"); +//     // let info = client.disco_info(None, None).await.unwrap(); +//     // info!("got disco result: {:#?}", info); +//     // let items = client.disco_items(None, None).await.unwrap(); +//     // info!("got disco result: {:#?}", items); +//     // let info = client +//     //     .disco_info(Some("blos.sm".parse().unwrap()), None) +//     //     .await +//     //     .unwrap(); +//     // info!("got disco result: {:#?}", info); +//     // let items = client +//     //     .disco_items(Some("blos.sm".parse().unwrap()), None) +//     //     .await +//     //     .unwrap(); +//     // info!("got disco result: {:#?}", items); +//     // let info = client +//     //     .disco_info(Some("pubsub.blos.sm".parse().unwrap()), None) +//     //     .await +//     //     .unwrap(); +//     // info!("got disco result: {:#?}", info); +//     // let items = client +//     //     .disco_items(Some("pubsub.blos.sm".parse().unwrap()), None) +//     //     .await +//     //     .unwrap(); +//     // info!("got disco result: {:#?}", items); +// } + +fn main() {} diff --git a/filamento/src/caps.rs b/filamento/src/caps.rs index 819e669..46cd903 100644 --- a/filamento/src/caps.rs +++ b/filamento/src/caps.rs @@ -336,7 +336,7 @@ pub fn node_to_hash(node: String) -> Result<Hash, HashNodeConversionError> {  #[cfg(test)]  mod tests { -    use peanuts::{Writer, element::IntoElement}; +    use peanuts::{Writer, element::IntoElement, loggable::Loggable};      use stanza::{          xep_0004::{Field, FieldType, Value, X, XType},          xep_0030::info::{Feature, Identity}, @@ -344,6 +344,7 @@ mod tests {      use super::*; +    #[cfg(not(target_arch = "wasm32"))]      #[tokio::test]      async fn test_caps() {          tracing_subscriber::fmt().init(); @@ -448,6 +449,7 @@ mod tests {          writer.write(&test_caps).await.unwrap();      } +    #[cfg(not(target_arch = "wasm32"))]      #[tokio::test]      pub async fn test_gen_client_caps() {          let stdout = tokio::io::stdout(); diff --git a/filamento/src/db.rs b/filamento/src/db.rs index d9206cc..e3bfdac 100644 --- a/filamento/src/db.rs +++ b/filamento/src/db.rs @@ -15,11 +15,12 @@ use crate::{  #[derive(Clone)]  pub struct Db { -    db: SqlitePool, +    // db: SqlitePool,  }  // TODO: turn into trait  impl Db { +    #[cfg(not(target_arch = "wasm32"))]      pub async fn create_connect_and_migrate(          path: impl AsRef<Path>,      ) -> Result<Self, DatabaseOpenError> { @@ -42,76 +43,45 @@ impl Db {          );          let db = SqlitePool::connect(&url).await?;          migrate!().run(&db).await?; -        Ok(Self { db }) +        // Ok(Self { db }) +        Ok(Self {}) +    } + +    #[cfg(target_arch = "wasm32")] +    pub async fn create_connect_and_migrate( +        path: impl AsRef<Path>, +    ) -> Result<Self, DatabaseOpenError> { +        // let url = "mem.db"; +        // let db = SqlitePool::connect(&url).await?; +        // // migrate!().run(&db).await?; +        Ok(Self {})      }      pub(crate) fn new(db: SqlitePool) -> Self { -        Self { db } +        // Self { db } +        Self {}      }      pub(crate) async fn create_user(&self, user: User) -> Result<(), Error> { -        sqlx::query!( -            "insert into users ( jid, nick ) values ( ?, ? )", -            user.jid, -            user.nick, -        ) -        .execute(&self.db) -        .await?;          Ok(())      }      pub(crate) async fn read_user(&self, user: JID) -> Result<User, Error> { -        sqlx::query!( -            "insert into users ( jid ) values ( ? ) on conflict do nothing", -            user -        ) -        .execute(&self.db) -        .await?; -        let user: User = sqlx::query_as("select * from users where jid = ?") -            .bind(user) -            .fetch_one(&self.db) -            .await?; -        Ok(user) +        Ok(User { +            jid: user, +            nick: None, +            avatar: None, +        })      }      /// returns whether or not the nickname was updated      pub(crate) async fn delete_user_nick(&self, jid: JID) -> Result<bool, Error> { -        if sqlx::query!( -            "insert into users (jid, nick) values (?, ?) on conflict do update set nick = ? where nick is not ?", -            jid, -            None::<String>, -            None::<String>, -            None::<String>, -        ) -        .execute(&self.db) -        .await? -        .rows_affected() -            > 0 -        { -            Ok(true) -        } else { -            Ok(false) -        } +        Ok(true)      }      /// returns whether or not the nickname was updated      pub(crate) async fn upsert_user_nick(&self, jid: JID, nick: String) -> Result<bool, Error> { -        let rows_affected = sqlx::query!( -            "insert into users (jid, nick) values (?, ?) on conflict do update set nick = ? where nick is not ?", -            jid, -            nick, -            nick, -            nick -        ) -        .execute(&self.db) -        .await? -        .rows_affected(); -        tracing::debug!("rows affected: {}", rows_affected); -        if rows_affected > 0 { -            Ok(true) -        } else { -            Ok(false) -        } +        Ok(true)      }      /// returns whether or not the avatar was updated, and the file to delete if there existed an old avatar @@ -119,32 +89,7 @@ impl Db {          &self,          jid: JID,      ) -> Result<(bool, Option<String>), Error> { -        #[derive(sqlx::FromRow)] -        struct AvatarRow { -            avatar: Option<String>, -        } -        let old_avatar: Option<String> = sqlx::query_as("select avatar from users where jid = ?") -            .bind(jid.clone()) -            .fetch_optional(&self.db) -            .await? -            .map(|row: AvatarRow| row.avatar) -            .unwrap_or(None); -        if sqlx::query!( -            "insert into users (jid, avatar) values (?, ?) on conflict do update set avatar = ? where avatar is not ?", -            jid, -            None::<String>, -            None::<String>, -            None::<String>, -        ) -        .execute(&self.db) -        .await? -        .rows_affected() -            > 0 -        { -            Ok((true, old_avatar)) -        } else { -            Ok((false, old_avatar)) -        } +        Ok((true, None))      }      /// returns whether or not the avatar was updated, and the file to delete if there existed an old avatar @@ -153,42 +98,10 @@ impl Db {          jid: JID,          avatar: String,      ) -> Result<(bool, Option<String>), Error> { -        #[derive(sqlx::FromRow)] -        struct AvatarRow { -            avatar: Option<String>, -        } -        let old_avatar: Option<String> = sqlx::query_as("select avatar from users where jid = ?") -            .bind(jid.clone()) -            .fetch_optional(&self.db) -            .await? -            .map(|row: AvatarRow| row.avatar) -            .unwrap_or(None); -        if sqlx::query!( -            "insert into users (jid, avatar) values (?, ?) on conflict do update set avatar = ? where avatar is not ?", -            jid, -            avatar, -            avatar, -            avatar, -        ) -        .execute(&self.db) -        .await? -        .rows_affected() -            > 0 -        { -            Ok((true, old_avatar)) -        } else { -            Ok((false, old_avatar)) -        } +        Ok((true, None))      }      pub(crate) async fn update_user(&self, user: User) -> Result<(), Error> { -        sqlx::query!( -            "update users set nick = ? where jid = ?", -            user.nick, -            user.jid -        ) -        .execute(&self.db) -        .await?;          Ok(())      } @@ -197,252 +110,63 @@ impl Db {      /// does not create the underlying user, if underlying user does not exist, create_user() must be called separately      pub(crate) async fn create_contact(&self, contact: Contact) -> Result<(), Error> { -        sqlx::query!( -            "insert into roster ( user_jid, name, subscription ) values ( ?, ?, ? )", -            contact.user_jid, -            contact.name, -            contact.subscription -        ) -        .execute(&self.db) -        .await?; -        // TODO: abstract this out in to add_to_group() function ? -        for group in contact.groups { -            sqlx::query!( -                "insert into groups (group_name) values (?) on conflict do nothing", -                group -            ) -            .execute(&self.db) -            .await?; -            sqlx::query!( -                "insert into groups_roster (group_name, contact_jid) values (?, ?)", -                group, -                contact.user_jid -            ) -            .execute(&self.db) -            .await?; -        }          Ok(())      }      pub(crate) async fn read_contact(&self, contact: JID) -> Result<Contact, Error> { -        let mut contact: Contact = sqlx::query_as("select * from roster where user_jid = ?") -            .bind(contact) -            .fetch_one(&self.db) -            .await?; -        #[derive(sqlx::FromRow)] -        struct Row { -            group_name: String, -        } -        let groups: Vec<Row> = -            sqlx::query_as("select group_name from groups_roster where contact_jid = ?") -                .bind(&contact.user_jid) -                .fetch_all(&self.db) -                .await?; -        contact.groups = HashSet::from_iter(groups.into_iter().map(|row| row.group_name)); -        Ok(contact) +        Ok(Contact { +            user_jid: contact, +            subscription: crate::roster::Subscription::None, +            name: None, +            groups: HashSet::new(), +        })      }      pub(crate) async fn read_contact_opt(&self, contact: &JID) -> Result<Option<Contact>, Error> { -        let contact: Option<Contact> = -            sqlx::query_as("select * from roster join users on jid = user_jid where jid = ?") -                .bind(contact) -                .fetch_optional(&self.db) -                .await?; -        if let Some(mut contact) = contact { -            #[derive(sqlx::FromRow)] -            struct Row { -                group_name: String, -            } -            let groups: Vec<Row> = -                sqlx::query_as("select group_name from groups_roster where contact_jid = ?") -                    .bind(&contact.user_jid) -                    .fetch_all(&self.db) -                    .await?; -            contact.groups = HashSet::from_iter(groups.into_iter().map(|row| row.group_name)); -            Ok(Some(contact)) -        } else { -            Ok(None) -        } +        Ok(None)      }      /// does not update the underlying user, to update user, update_user() must be called separately      pub(crate) async fn update_contact(&self, contact: Contact) -> Result<(), Error> { -        sqlx::query!( -            "update roster set name = ?, subscription = ? where user_jid = ?", -            contact.name, -            contact.subscription, -            contact.user_jid -        ) -        .execute(&self.db) -        .await?; -        sqlx::query!( -            "delete from groups_roster where contact_jid = ?", -            contact.user_jid -        ) -        .execute(&self.db) -        .await?; -        // TODO: delete orphaned groups from groups table -        for group in contact.groups { -            sqlx::query!( -                "insert into groups (group_name) values (?) on conflict do nothing", -                group -            ) -            .execute(&self.db) -            .await?; -            sqlx::query!( -                "insert into groups_roster (group_name, contact_jid) values (?, ?)", -                group, -                contact.user_jid -            ) -            .execute(&self.db) -            .await?; -        }          Ok(())      }      pub(crate) async fn upsert_contact(&self, contact: Contact) -> Result<(), Error> { -        sqlx::query!( -            "insert into users ( jid ) values ( ? ) on conflict do nothing", -            contact.user_jid, -        ) -        .execute(&self.db) -        .await?; -        sqlx::query!( -            "insert into roster ( user_jid, name, subscription ) values ( ?, ?, ? ) on conflict do update set name = ?, subscription = ?", -            contact.user_jid, -            contact.name, -            contact.subscription, -            contact.name, -            contact.subscription -        ) -        .execute(&self.db) -        .await?; -        sqlx::query!( -            "delete from groups_roster where contact_jid = ?", -            contact.user_jid -        ) -        .execute(&self.db) -        .await?; -        // TODO: delete orphaned groups from groups table -        for group in contact.groups { -            sqlx::query!( -                "insert into groups (group_name) values (?) on conflict do nothing", -                group -            ) -            .execute(&self.db) -            .await?; -            sqlx::query!( -                "insert into groups_roster (group_name, contact_jid) values (?, ?)", -                group, -                contact.user_jid -            ) -            .execute(&self.db) -            .await?; -        }          Ok(())      }      pub(crate) async fn delete_contact(&self, contact: JID) -> Result<(), Error> { -        sqlx::query!("delete from roster where user_jid = ?", contact) -            .execute(&self.db) -            .await?; -        // TODO: delete orphaned groups from groups table          Ok(())      }      pub(crate) async fn replace_cached_roster(&self, roster: Vec<Contact>) -> Result<(), Error> { -        sqlx::query!("delete from roster").execute(&self.db).await?; -        for contact in roster { -            self.upsert_contact(contact).await?; -        }          Ok(())      }      pub(crate) async fn read_cached_roster(&self) -> Result<Vec<Contact>, Error> { -        let mut roster: Vec<Contact> = sqlx::query_as("select * from roster") -            .fetch_all(&self.db) -            .await?; -        for contact in &mut roster { -            #[derive(sqlx::FromRow)] -            struct Row { -                group_name: String, -            } -            let groups: Vec<Row> = -                sqlx::query_as("select group_name from groups_roster where contact_jid = ?") -                    .bind(&contact.user_jid) -                    .fetch_all(&self.db) -                    .await?; -            contact.groups = HashSet::from_iter(groups.into_iter().map(|row| row.group_name)); -        } -        Ok(roster) +        Ok(Vec::new())      }      pub(crate) async fn read_cached_roster_with_users(          &self,      ) -> Result<Vec<(Contact, User)>, Error> { -        #[derive(sqlx::FromRow)] -        struct Row { -            #[sqlx(flatten)] -            contact: Contact, -            #[sqlx(flatten)] -            user: User, -        } -        let mut roster: Vec<Row> = -            sqlx::query_as("select * from roster join users on jid = user_jid") -                .fetch_all(&self.db) -                .await?; -        for row in &mut roster { -            #[derive(sqlx::FromRow)] -            struct Row { -                group_name: String, -            } -            let groups: Vec<Row> = -                sqlx::query_as("select group_name from groups_roster where contact_jid = ?") -                    .bind(&row.contact.user_jid) -                    .fetch_all(&self.db) -                    .await?; -            row.contact.groups = HashSet::from_iter(groups.into_iter().map(|row| row.group_name)); -        } -        let roster = roster -            .into_iter() -            .map(|row| (row.contact, row.user)) -            .collect(); -        Ok(roster) +        Ok(Vec::new())      }      pub(crate) async fn create_chat(&self, chat: Chat) -> Result<(), Error> { -        let id = Uuid::new_v4(); -        let jid = chat.correspondent(); -        sqlx::query!( -            "insert into chats (id, correspondent, have_chatted) values (?, ?, ?)", -            id, -            jid, -            chat.have_chatted, -        ) -        .execute(&self.db) -        .await?;          Ok(())      }      // TODO: what happens if a correspondent changes from a user to a contact? maybe just have correspondent be a user, then have the client make the user show up as a contact in ui if they are in the loaded roster.      pub(crate) async fn read_chat(&self, chat: JID) -> Result<Chat, Error> { -        // check if the chat correponding with the jid exists -        let chat: Chat = sqlx::query_as("select correspondent from chats where correspondent = ?") -            .bind(chat) -            .fetch_one(&self.db) -            .await?; -        Ok(chat) +        Ok(Chat { +            correspondent: chat, +            have_chatted: false, +        })      }      pub(crate) async fn mark_chat_as_chatted(&self, chat: JID) -> Result<(), Error> { -        let jid = chat.as_bare(); -        sqlx::query!( -            "update chats set have_chatted = true where correspondent = ?", -            jid -        ) -        .execute(&self.db) -        .await?;          Ok(())      } @@ -451,44 +175,27 @@ impl Db {          old_chat: Chat,          new_correspondent: JID,      ) -> Result<Chat, Error> { -        // TODO: update other chat data if it differs (for now there is only correspondent so doesn't matter) -        let new_jid = &new_correspondent; -        let old_jid = old_chat.correspondent(); -        sqlx::query!( -            "update chats set correspondent = ? where correspondent = ?", -            new_jid, -            old_jid, -        ) -        .execute(&self.db) -        .await?; -        let chat = self.read_chat(new_correspondent).await?; -        Ok(chat) +        Ok(Chat { +            correspondent: new_correspondent, +            have_chatted: false, +        })      }      // pub(crate) async fn update_chat      pub(crate) async fn delete_chat(&self, chat: JID) -> Result<(), Error> { -        sqlx::query!("delete from chats where correspondent = ?", chat) -            .execute(&self.db) -            .await?;          Ok(())      }      /// TODO: sorting and filtering (for now there is no sorting)      pub(crate) async fn read_chats(&self) -> Result<Vec<Chat>, Error> { -        let chats: Vec<Chat> = sqlx::query_as("select * from chats") -            .fetch_all(&self.db) -            .await?; -        Ok(chats) +        Ok(Vec::new())      }      /// chats ordered by date of last message      // greatest-n-per-group      pub(crate) async fn read_chats_ordered(&self) -> Result<Vec<Chat>, Error> { -        let chats = sqlx::query_as("select c.*, m.* from chats c join (select chat_id, max(timestamp) max_timestamp from messages group by chat_id) max_timestamps on c.id = max_timestamps.chat_id join messages m on max_timestamps.chat_id = m.chat_id and max_timestamps.max_timestamp = m.timestamp order by m.timestamp desc") -            .fetch_all(&self.db) -            .await?; -        Ok(chats) +        Ok(Vec::new())      }      /// chats ordered by date of last message @@ -496,77 +203,7 @@ impl Db {      pub(crate) async fn read_chats_ordered_with_latest_messages(          &self,      ) -> Result<Vec<(Chat, Message)>, Error> { -        #[derive(sqlx::FromRow)] -        pub struct RowChat { -            chat_correspondent: JID, -            chat_have_chatted: bool, -        } -        impl From<RowChat> for Chat { -            fn from(value: RowChat) -> Self { -                Self { -                    correspondent: value.chat_correspondent, -                    have_chatted: value.chat_have_chatted, -                } -            } -        } -        #[derive(sqlx::FromRow)] -        pub struct RowMessage { -            message_id: Uuid, -            message_body: String, -            message_delivery: Option<Delivery>, -            message_timestamp: DateTime<Utc>, -            message_from_jid: JID, -        } -        impl From<RowMessage> for Message { -            fn from(value: RowMessage) -> Self { -                Self { -                    id: value.message_id, -                    from: value.message_from_jid, -                    delivery: value.message_delivery, -                    timestamp: value.message_timestamp, -                    body: Body { -                        body: value.message_body, -                    }, -                } -            } -        } - -        #[derive(sqlx::FromRow)] -        pub struct ChatWithMessageRow { -            #[sqlx(flatten)] -            pub chat: RowChat, -            #[sqlx(flatten)] -            pub message: RowMessage, -        } - -        pub struct ChatWithMessage { -            chat: Chat, -            message: Message, -        } - -        impl From<ChatWithMessageRow> for ChatWithMessage { -            fn from(value: ChatWithMessageRow) -> Self { -                Self { -                    chat: value.chat.into(), -                    message: value.message.into(), -                } -            } -        } - -        // TODO: sometimes chats have no messages. -        let chats: Vec<ChatWithMessageRow> = sqlx::query_as("select c.*, m.* from chats c join (select chat_id, max(timestamp) max_timestamp from messages group by chat_id) max_timestamps on c.id = max_timestamps.chat_id join messages m on max_timestamps.chat_id = m.chat_id and max_timestamps.max_timestamp = m.timestamp order by m.timestamp desc") -            .fetch_all(&self.db) -            .await?; - -        let chats = chats -            .into_iter() -            .map(|chat_with_message_row| { -                let chat_with_message: ChatWithMessage = chat_with_message_row.into(); -                (chat_with_message.chat, chat_with_message.message) -            }) -            .collect(); - -        Ok(chats) +        Ok(Vec::new())      }      /// chats ordered by date of last message @@ -574,150 +211,15 @@ impl Db {      pub(crate) async fn read_chats_ordered_with_latest_messages_and_users(          &self,      ) -> Result<Vec<((Chat, User), (Message, User))>, Error> { -        #[derive(sqlx::FromRow)] -        pub struct RowChat { -            chat_correspondent: JID, -            chat_have_chatted: bool, -        } -        impl From<RowChat> for Chat { -            fn from(value: RowChat) -> Self { -                Self { -                    correspondent: value.chat_correspondent, -                    have_chatted: value.chat_have_chatted, -                } -            } -        } -        #[derive(sqlx::FromRow)] -        pub struct RowMessage { -            message_id: Uuid, -            message_body: String, -            message_delivery: Option<Delivery>, -            message_timestamp: DateTime<Utc>, -            message_from_jid: JID, -        } -        impl From<RowMessage> for Message { -            fn from(value: RowMessage) -> Self { -                Self { -                    id: value.message_id, -                    from: value.message_from_jid, -                    delivery: value.message_delivery, -                    timestamp: value.message_timestamp, -                    body: Body { -                        body: value.message_body, -                    }, -                } -            } -        } -        #[derive(sqlx::FromRow)] -        pub struct RowChatUser { -            chat_user_jid: JID, -            chat_user_nick: Option<String>, -            chat_user_avatar: Option<String>, -        } -        impl From<RowChatUser> for User { -            fn from(value: RowChatUser) -> Self { -                Self { -                    jid: value.chat_user_jid, -                    nick: value.chat_user_nick, -                    avatar: value.chat_user_avatar, -                } -            } -        } -        #[derive(sqlx::FromRow)] -        pub struct RowMessageUser { -            message_user_jid: JID, -            message_user_nick: Option<String>, -            message_user_avatar: Option<String>, -        } -        impl From<RowMessageUser> for User { -            fn from(value: RowMessageUser) -> Self { -                Self { -                    jid: value.message_user_jid, -                    nick: value.message_user_nick, -                    avatar: value.message_user_avatar, -                } -            } -        } -        #[derive(sqlx::FromRow)] -        pub struct ChatWithMessageAndUsersRow { -            #[sqlx(flatten)] -            pub chat: RowChat, -            #[sqlx(flatten)] -            pub chat_user: RowChatUser, -            #[sqlx(flatten)] -            pub message: RowMessage, -            #[sqlx(flatten)] -            pub message_user: RowMessageUser, -        } - -        impl From<ChatWithMessageAndUsersRow> for ChatWithMessageAndUsers { -            fn from(value: ChatWithMessageAndUsersRow) -> Self { -                Self { -                    chat: value.chat.into(), -                    chat_user: value.chat_user.into(), -                    message: value.message.into(), -                    message_user: value.message_user.into(), -                } -            } -        } - -        pub struct ChatWithMessageAndUsers { -            chat: Chat, -            chat_user: User, -            message: Message, -            message_user: User, -        } - -        let chats: Vec<ChatWithMessageAndUsersRow> = sqlx::query_as("select c.id as chat_id, c.correspondent as chat_correspondent, c.have_chatted as chat_have_chatted, m.id as message_id, m.body as message_body, m.delivery as message_delivery, m.timestamp as message_timestamp, m.from_jid as message_from_jid, cu.jid as chat_user_jid, cu.nick as chat_user_nick, cu.avatar as chat_user_avatar, mu.jid as message_user_jid, mu.nick as message_user_nick, mu.avatar as message_user_avatar from chats c join (select chat_id, max(timestamp) max_timestamp from messages group by chat_id) max_timestamps on c.id = max_timestamps.chat_id join messages m on max_timestamps.chat_id = m.chat_id and max_timestamps.max_timestamp = m.timestamp join users as cu on cu.jid = c.correspondent join users as mu on mu.jid = m.from_jid order by m.timestamp desc") -            .fetch_all(&self.db) -            .await?; - -        let chats = chats -            .into_iter() -            .map(|chat_with_message_and_users_row| { -                let chat_with_message_and_users: ChatWithMessageAndUsers = -                    chat_with_message_and_users_row.into(); -                ( -                    ( -                        chat_with_message_and_users.chat, -                        chat_with_message_and_users.chat_user, -                    ), -                    ( -                        chat_with_message_and_users.message, -                        chat_with_message_and_users.message_user, -                    ), -                ) -            }) -            .collect(); - -        Ok(chats) +        Ok(Vec::new())      }      async fn read_chat_id(&self, chat: JID) -> Result<Uuid, Error> { -        #[derive(sqlx::FromRow)] -        struct Row { -            id: Uuid, -        } -        let chat = chat.as_bare(); -        let chat_id: Row = sqlx::query_as("select id from chats where correspondent = ?") -            .bind(chat) -            .fetch_one(&self.db) -            .await?; -        let chat_id = chat_id.id; -        Ok(chat_id) +        Ok(Uuid::new_v4())      }      async fn read_chat_id_opt(&self, chat: JID) -> Result<Option<Uuid>, Error> { -        #[derive(sqlx::FromRow)] -        struct Row { -            id: Uuid, -        } -        let chat_id: Option<Row> = sqlx::query_as("select id from chats where correspondent = ?") -            .bind(chat) -            .fetch_optional(&self.db) -            .await?; -        let chat_id = chat_id.map(|row| row.id); -        Ok(chat_id) +        Ok(None)      }      /// if the chat doesn't already exist, it must be created by calling create_chat() before running this function. @@ -727,31 +229,11 @@ impl Db {          chat: JID,          from: JID,      ) -> Result<(), Error> { -        // TODO: one query -        let from_jid = from.as_bare(); -        let chat_id = self.read_chat_id(chat).await?; -        sqlx::query!("insert into messages (id, body, chat_id, from_jid, from_resource, timestamp) values (?, ?, ?, ?, ?, ?)", message.id, message.body.body, chat_id, from_jid, from.resourcepart, message.timestamp).execute(&self.db).await?;          Ok(())      }      pub(crate) async fn upsert_chat_and_user(&self, chat: &JID) -> Result<bool, Error> { -        let bare_chat = chat.as_bare(); -        sqlx::query!( -            "insert into users (jid) values (?) on conflict do nothing", -            bare_chat, -        ) -        .execute(&self.db) -        .await?; -        let id = Uuid::new_v4(); -        let chat: Chat = sqlx::query_as("insert into chats (id, correspondent, have_chatted) values (?, ?, ?) on conflict do nothing; select * from chats where correspondent = ?") -            .bind(id) -            .bind(bare_chat.clone()) -            .bind(false) -            .bind(bare_chat) -            .fetch_one(&self.db) -            .await?; -        tracing::debug!("CHECKING chat: {:?}", chat); -        Ok(chat.have_chatted) +        Ok(false)      }      /// MUST upsert chat beforehand @@ -762,17 +244,6 @@ impl Db {          // full jid          from: JID,      ) -> Result<(), Error> { -        let from_jid = from.as_bare(); -        if let Some(resource) = &from.resourcepart { -            sqlx::query!( -                "insert into resources (bare_jid, resource) values (?, ?) on conflict do nothing", -                from_jid, -                resource -            ) -            .execute(&self.db) -            .await?; -        } -        self.create_message(message, chat, from).await?;          Ok(())      } @@ -784,123 +255,51 @@ impl Db {          // full jid          from: JID,      ) -> Result<(), Error> { -        let bare_chat = chat.as_bare(); -        let resource = &chat.resourcepart; -        // sqlx::query!( -        //     "insert into users (jid) values (?) on conflict do nothing", -        //     bare_chat -        // ) -        // .execute(&self.db) -        // .await?; -        // let id = Uuid::new_v4(); -        // sqlx::query!( -        //     "insert into chats (id, correspondent) values (?, ?) on conflict do nothing", -        //     id, -        //     bare_chat -        // ) -        // .execute(&self.db) -        // .await?; -        if let Some(resource) = resource { -            sqlx::query!( -                "insert into resources (bare_jid, resource) values (?, ?) on conflict do nothing", -                bare_chat, -                resource -            ) -            .execute(&self.db) -            .await?; -        } -        self.create_message(message, chat, from).await?;          Ok(())      } -    pub(crate) async fn read_message(&self, message: Uuid) -> Result<Message, Error> { -        let message: Message = sqlx::query_as("select * from messages where id = ?") -            .bind(message) -            .fetch_one(&self.db) -            .await?; -        Ok(message) -    } +    // pub(crate) async fn read_message(&self, message: Uuid) -> Result<Message, Error> { +    //     Ok(Message { +    //         id: Uuid, +    //         from: todo!(), +    //         delivery: todo!(), +    //         timestamp: todo!(), +    //         body: todo!(), +    //     }) +    // }      // TODO: message updates/edits pub(crate) async fn update_message(&self, message: Message) -> Result<(), Error> {}      pub(crate) async fn delete_message(&self, message: Uuid) -> Result<(), Error> { -        sqlx::query!("delete from messages where id = ?", message) -            .execute(&self.db) -            .await?;          Ok(())      }      // TODO: paging      pub(crate) async fn read_message_history(&self, chat: JID) -> Result<Vec<Message>, Error> { -        let chat_id = self.read_chat_id(chat).await?; -        let messages: Vec<Message> = -            sqlx::query_as("select * from messages where chat_id = ? order by timestamp asc") -                .bind(chat_id) -                .fetch_all(&self.db) -                .await?; -        Ok(messages) +        Ok(Vec::new())      }      pub(crate) async fn read_message_history_with_users(          &self,          chat: JID,      ) -> Result<Vec<(Message, User)>, Error> { -        let chat_id = self.read_chat_id(chat).await?; -        #[derive(sqlx::FromRow)] -        pub struct Row { -            #[sqlx(flatten)] -            user: User, -            #[sqlx(flatten)] -            message: Message, -        } -        let messages: Vec<Row> = -            sqlx::query_as("select * from messages join users on jid = from_jid where chat_id = ? order by timestamp asc") -                .bind(chat_id) -                .fetch_all(&self.db) -                .await?; -        let messages = messages -            .into_iter() -            .map(|row| (row.message, row.user)) -            .collect(); -        Ok(messages) +        Ok(Vec::new())      }      pub(crate) async fn read_cached_status(&self) -> Result<Online, Error> { -        let online: Online = sqlx::query_as("select * from cached_status where id = 0") -            .fetch_one(&self.db) -            .await?; -        Ok(online) +        Ok(Online::default())      }      pub(crate) async fn upsert_cached_status(&self, status: Online) -> Result<(), Error> { -        sqlx::query!( -            "insert into cached_status (id, show, message) values (0, ?, ?) on conflict do update set show = ?, message = ?", -            status.show, -            status.status, -            status.show, -            status.status -        ).execute(&self.db).await?;          Ok(())      }      pub(crate) async fn delete_cached_status(&self) -> Result<(), Error> { -        sqlx::query!("update cached_status set show = null, message = null where id = 0") -            .execute(&self.db) -            .await?;          Ok(())      }      pub(crate) async fn read_capabilities(&self, node: &str) -> Result<String, Error> { -        #[derive(sqlx::FromRow)] -        struct Row { -            capabilities: String, -        } -        let row: Row = -            sqlx::query_as("select capabilities from capability_hash_nodes where node = ?") -                .bind(node) -                .fetch_one(&self.db) -                .await?; -        Ok(row.capabilities) +        Ok("aHR0cDovL2phYmJlci5vcmcvcHJvdG9jb2wvY2Fwcx9odHRwOi8vamFiYmVyLm9yZy9wcm90b2NvbC9kaXNjbyNpbmZvH2h0dHA6Ly9qYWJiZXIub3JnL3Byb3RvY29sL2Rpc2NvI2l0ZW1zH2h0dHA6Ly9qYWJiZXIub3JnL3Byb3RvY29sL25pY2sfaHR0cDovL2phYmJlci5vcmcvcHJvdG9jb2wvbmljaytub3RpZnkfHGNsaWVudB9wYx8fZmlsYW1lbnRvIDAuMS4wHx4cHA==".to_string())      }      pub(crate) async fn upsert_capabilities( @@ -908,10 +307,872 @@ impl Db {          node: &str,          capabilities: &str,      ) -> Result<(), Error> { -        let now = Utc::now(); -        sqlx::query!( -            "insert into capability_hash_nodes (node, timestamp, capabilities) values (?, ?, ?) on conflict do update set timestamp = ?, capabilities = ?", node, now, capabilities, now, capabilities -        ).execute(&self.db).await?;          Ok(())      } + +    // pub(crate) async fn create_user(&self, user: User) -> Result<(), Error> { +    //     sqlx::query!( +    //         "insert into users ( jid, nick ) values ( ?, ? )", +    //         user.jid, +    //         user.nick, +    //     ) +    //     .execute(&self.db) +    //     .await?; +    //     Ok(()) +    // } + +    // pub(crate) async fn read_user(&self, user: JID) -> Result<User, Error> { +    //     sqlx::query!( +    //         "insert into users ( jid ) values ( ? ) on conflict do nothing", +    //         user +    //     ) +    //     .execute(&self.db) +    //     .await?; +    //     let user: User = sqlx::query_as("select * from users where jid = ?") +    //         .bind(user) +    //         .fetch_one(&self.db) +    //         .await?; +    //     Ok(user) +    // } + +    // /// returns whether or not the nickname was updated +    // pub(crate) async fn delete_user_nick(&self, jid: JID) -> Result<bool, Error> { +    //     if sqlx::query!( +    //         "insert into users (jid, nick) values (?, ?) on conflict do update set nick = ? where nick is not ?", +    //         jid, +    //         None::<String>, +    //         None::<String>, +    //         None::<String>, +    //     ) +    //     .execute(&self.db) +    //     .await? +    //     .rows_affected() +    //         > 0 +    //     { +    //         Ok(true) +    //     } else { +    //         Ok(false) +    //     } +    // } + +    // /// returns whether or not the nickname was updated +    // pub(crate) async fn upsert_user_nick(&self, jid: JID, nick: String) -> Result<bool, Error> { +    //     let rows_affected = sqlx::query!( +    //         "insert into users (jid, nick) values (?, ?) on conflict do update set nick = ? where nick is not ?", +    //         jid, +    //         nick, +    //         nick, +    //         nick +    //     ) +    //     .execute(&self.db) +    //     .await? +    //     .rows_affected(); +    //     tracing::debug!("rows affected: {}", rows_affected); +    //     if rows_affected > 0 { +    //         Ok(true) +    //     } else { +    //         Ok(false) +    //     } +    // } + +    // /// returns whether or not the avatar was updated, and the file to delete if there existed an old avatar +    // pub(crate) async fn delete_user_avatar( +    //     &self, +    //     jid: JID, +    // ) -> Result<(bool, Option<String>), Error> { +    //     #[derive(sqlx::FromRow)] +    //     struct AvatarRow { +    //         avatar: Option<String>, +    //     } +    //     let old_avatar: Option<String> = sqlx::query_as("select avatar from users where jid = ?") +    //         .bind(jid.clone()) +    //         .fetch_optional(&self.db) +    //         .await? +    //         .map(|row: AvatarRow| row.avatar) +    //         .unwrap_or(None); +    //     if sqlx::query!( +    //         "insert into users (jid, avatar) values (?, ?) on conflict do update set avatar = ? where avatar is not ?", +    //         jid, +    //         None::<String>, +    //         None::<String>, +    //         None::<String>, +    //     ) +    //     .execute(&self.db) +    //     .await? +    //     .rows_affected() +    //         > 0 +    //     { +    //         Ok((true, old_avatar)) +    //     } else { +    //         Ok((false, old_avatar)) +    //     } +    // } + +    // /// returns whether or not the avatar was updated, and the file to delete if there existed an old avatar +    // pub(crate) async fn upsert_user_avatar( +    //     &self, +    //     jid: JID, +    //     avatar: String, +    // ) -> Result<(bool, Option<String>), Error> { +    //     #[derive(sqlx::FromRow)] +    //     struct AvatarRow { +    //         avatar: Option<String>, +    //     } +    //     let old_avatar: Option<String> = sqlx::query_as("select avatar from users where jid = ?") +    //         .bind(jid.clone()) +    //         .fetch_optional(&self.db) +    //         .await? +    //         .map(|row: AvatarRow| row.avatar) +    //         .unwrap_or(None); +    //     if sqlx::query!( +    //         "insert into users (jid, avatar) values (?, ?) on conflict do update set avatar = ? where avatar is not ?", +    //         jid, +    //         avatar, +    //         avatar, +    //         avatar, +    //     ) +    //     .execute(&self.db) +    //     .await? +    //     .rows_affected() +    //         > 0 +    //     { +    //         Ok((true, old_avatar)) +    //     } else { +    //         Ok((false, old_avatar)) +    //     } +    // } + +    // pub(crate) async fn update_user(&self, user: User) -> Result<(), Error> { +    //     sqlx::query!( +    //         "update users set nick = ? where jid = ?", +    //         user.nick, +    //         user.jid +    //     ) +    //     .execute(&self.db) +    //     .await?; +    //     Ok(()) +    // } + +    // // TODO: should this be allowed? messages need to reference users. should probably only allow delete if every other thing referencing it has been deleted, or if you make clear to the user deleting a user will delete all messages associated with them. +    // // pub(crate) async fn delete_user(&self, user: JID) -> Result<(), Error> {} + +    // /// does not create the underlying user, if underlying user does not exist, create_user() must be called separately +    // pub(crate) async fn create_contact(&self, contact: Contact) -> Result<(), Error> { +    //     sqlx::query!( +    //         "insert into roster ( user_jid, name, subscription ) values ( ?, ?, ? )", +    //         contact.user_jid, +    //         contact.name, +    //         contact.subscription +    //     ) +    //     .execute(&self.db) +    //     .await?; +    //     // TODO: abstract this out in to add_to_group() function ? +    //     for group in contact.groups { +    //         sqlx::query!( +    //             "insert into groups (group_name) values (?) on conflict do nothing", +    //             group +    //         ) +    //         .execute(&self.db) +    //         .await?; +    //         sqlx::query!( +    //             "insert into groups_roster (group_name, contact_jid) values (?, ?)", +    //             group, +    //             contact.user_jid +    //         ) +    //         .execute(&self.db) +    //         .await?; +    //     } +    //     Ok(()) +    // } + +    // pub(crate) async fn read_contact(&self, contact: JID) -> Result<Contact, Error> { +    //     let mut contact: Contact = sqlx::query_as("select * from roster where user_jid = ?") +    //         .bind(contact) +    //         .fetch_one(&self.db) +    //         .await?; +    //     #[derive(sqlx::FromRow)] +    //     struct Row { +    //         group_name: String, +    //     } +    //     let groups: Vec<Row> = +    //         sqlx::query_as("select group_name from groups_roster where contact_jid = ?") +    //             .bind(&contact.user_jid) +    //             .fetch_all(&self.db) +    //             .await?; +    //     contact.groups = HashSet::from_iter(groups.into_iter().map(|row| row.group_name)); +    //     Ok(contact) +    // } + +    // pub(crate) async fn read_contact_opt(&self, contact: &JID) -> Result<Option<Contact>, Error> { +    //     let contact: Option<Contact> = +    //         sqlx::query_as("select * from roster join users on jid = user_jid where jid = ?") +    //             .bind(contact) +    //             .fetch_optional(&self.db) +    //             .await?; +    //     if let Some(mut contact) = contact { +    //         #[derive(sqlx::FromRow)] +    //         struct Row { +    //             group_name: String, +    //         } +    //         let groups: Vec<Row> = +    //             sqlx::query_as("select group_name from groups_roster where contact_jid = ?") +    //                 .bind(&contact.user_jid) +    //                 .fetch_all(&self.db) +    //                 .await?; +    //         contact.groups = HashSet::from_iter(groups.into_iter().map(|row| row.group_name)); +    //         Ok(Some(contact)) +    //     } else { +    //         Ok(None) +    //     } +    // } + +    // /// does not update the underlying user, to update user, update_user() must be called separately +    // pub(crate) async fn update_contact(&self, contact: Contact) -> Result<(), Error> { +    //     sqlx::query!( +    //         "update roster set name = ?, subscription = ? where user_jid = ?", +    //         contact.name, +    //         contact.subscription, +    //         contact.user_jid +    //     ) +    //     .execute(&self.db) +    //     .await?; +    //     sqlx::query!( +    //         "delete from groups_roster where contact_jid = ?", +    //         contact.user_jid +    //     ) +    //     .execute(&self.db) +    //     .await?; +    //     // TODO: delete orphaned groups from groups table +    //     for group in contact.groups { +    //         sqlx::query!( +    //             "insert into groups (group_name) values (?) on conflict do nothing", +    //             group +    //         ) +    //         .execute(&self.db) +    //         .await?; +    //         sqlx::query!( +    //             "insert into groups_roster (group_name, contact_jid) values (?, ?)", +    //             group, +    //             contact.user_jid +    //         ) +    //         .execute(&self.db) +    //         .await?; +    //     } +    //     Ok(()) +    // } + +    // pub(crate) async fn upsert_contact(&self, contact: Contact) -> Result<(), Error> { +    //     sqlx::query!( +    //         "insert into users ( jid ) values ( ? ) on conflict do nothing", +    //         contact.user_jid, +    //     ) +    //     .execute(&self.db) +    //     .await?; +    //     sqlx::query!( +    //         "insert into roster ( user_jid, name, subscription ) values ( ?, ?, ? ) on conflict do update set name = ?, subscription = ?", +    //         contact.user_jid, +    //         contact.name, +    //         contact.subscription, +    //         contact.name, +    //         contact.subscription +    //     ) +    //     .execute(&self.db) +    //     .await?; +    //     sqlx::query!( +    //         "delete from groups_roster where contact_jid = ?", +    //         contact.user_jid +    //     ) +    //     .execute(&self.db) +    //     .await?; +    //     // TODO: delete orphaned groups from groups table +    //     for group in contact.groups { +    //         sqlx::query!( +    //             "insert into groups (group_name) values (?) on conflict do nothing", +    //             group +    //         ) +    //         .execute(&self.db) +    //         .await?; +    //         sqlx::query!( +    //             "insert into groups_roster (group_name, contact_jid) values (?, ?)", +    //             group, +    //             contact.user_jid +    //         ) +    //         .execute(&self.db) +    //         .await?; +    //     } +    //     Ok(()) +    // } + +    // pub(crate) async fn delete_contact(&self, contact: JID) -> Result<(), Error> { +    //     sqlx::query!("delete from roster where user_jid = ?", contact) +    //         .execute(&self.db) +    //         .await?; +    //     // TODO: delete orphaned groups from groups table +    //     Ok(()) +    // } + +    // pub(crate) async fn replace_cached_roster(&self, roster: Vec<Contact>) -> Result<(), Error> { +    //     sqlx::query!("delete from roster").execute(&self.db).await?; +    //     for contact in roster { +    //         self.upsert_contact(contact).await?; +    //     } +    //     Ok(()) +    // } + +    // pub(crate) async fn read_cached_roster(&self) -> Result<Vec<Contact>, Error> { +    //     let mut roster: Vec<Contact> = sqlx::query_as("select * from roster") +    //         .fetch_all(&self.db) +    //         .await?; +    //     for contact in &mut roster { +    //         #[derive(sqlx::FromRow)] +    //         struct Row { +    //             group_name: String, +    //         } +    //         let groups: Vec<Row> = +    //             sqlx::query_as("select group_name from groups_roster where contact_jid = ?") +    //                 .bind(&contact.user_jid) +    //                 .fetch_all(&self.db) +    //                 .await?; +    //         contact.groups = HashSet::from_iter(groups.into_iter().map(|row| row.group_name)); +    //     } +    //     Ok(roster) +    // } + +    // pub(crate) async fn read_cached_roster_with_users( +    //     &self, +    // ) -> Result<Vec<(Contact, User)>, Error> { +    //     #[derive(sqlx::FromRow)] +    //     struct Row { +    //         #[sqlx(flatten)] +    //         contact: Contact, +    //         #[sqlx(flatten)] +    //         user: User, +    //     } +    //     let mut roster: Vec<Row> = +    //         sqlx::query_as("select * from roster join users on jid = user_jid") +    //             .fetch_all(&self.db) +    //             .await?; +    //     for row in &mut roster { +    //         #[derive(sqlx::FromRow)] +    //         struct Row { +    //             group_name: String, +    //         } +    //         let groups: Vec<Row> = +    //             sqlx::query_as("select group_name from groups_roster where contact_jid = ?") +    //                 .bind(&row.contact.user_jid) +    //                 .fetch_all(&self.db) +    //                 .await?; +    //         row.contact.groups = HashSet::from_iter(groups.into_iter().map(|row| row.group_name)); +    //     } +    //     let roster = roster +    //         .into_iter() +    //         .map(|row| (row.contact, row.user)) +    //         .collect(); +    //     Ok(roster) +    // } + +    // pub(crate) async fn create_chat(&self, chat: Chat) -> Result<(), Error> { +    //     let id = Uuid::new_v4(); +    //     let jid = chat.correspondent(); +    //     sqlx::query!( +    //         "insert into chats (id, correspondent, have_chatted) values (?, ?, ?)", +    //         id, +    //         jid, +    //         chat.have_chatted, +    //     ) +    //     .execute(&self.db) +    //     .await?; +    //     Ok(()) +    // } + +    // // TODO: what happens if a correspondent changes from a user to a contact? maybe just have correspondent be a user, then have the client make the user show up as a contact in ui if they are in the loaded roster. + +    // pub(crate) async fn read_chat(&self, chat: JID) -> Result<Chat, Error> { +    //     // check if the chat correponding with the jid exists +    //     let chat: Chat = sqlx::query_as("select correspondent from chats where correspondent = ?") +    //         .bind(chat) +    //         .fetch_one(&self.db) +    //         .await?; +    //     Ok(chat) +    // } + +    // pub(crate) async fn mark_chat_as_chatted(&self, chat: JID) -> Result<(), Error> { +    //     let jid = chat.as_bare(); +    //     sqlx::query!( +    //         "update chats set have_chatted = true where correspondent = ?", +    //         jid +    //     ) +    //     .execute(&self.db) +    //     .await?; +    //     Ok(()) +    // } + +    // pub(crate) async fn update_chat_correspondent( +    //     &self, +    //     old_chat: Chat, +    //     new_correspondent: JID, +    // ) -> Result<Chat, Error> { +    //     // TODO: update other chat data if it differs (for now there is only correspondent so doesn't matter) +    //     let new_jid = &new_correspondent; +    //     let old_jid = old_chat.correspondent(); +    //     sqlx::query!( +    //         "update chats set correspondent = ? where correspondent = ?", +    //         new_jid, +    //         old_jid, +    //     ) +    //     .execute(&self.db) +    //     .await?; +    //     let chat = self.read_chat(new_correspondent).await?; +    //     Ok(chat) +    // } + +    // // pub(crate) async fn update_chat + +    // pub(crate) async fn delete_chat(&self, chat: JID) -> Result<(), Error> { +    //     sqlx::query!("delete from chats where correspondent = ?", chat) +    //         .execute(&self.db) +    //         .await?; +    //     Ok(()) +    // } + +    // /// TODO: sorting and filtering (for now there is no sorting) +    // pub(crate) async fn read_chats(&self) -> Result<Vec<Chat>, Error> { +    //     let chats: Vec<Chat> = sqlx::query_as("select * from chats") +    //         .fetch_all(&self.db) +    //         .await?; +    //     Ok(chats) +    // } + +    // /// chats ordered by date of last message +    // // greatest-n-per-group +    // pub(crate) async fn read_chats_ordered(&self) -> Result<Vec<Chat>, Error> { +    //     let chats = sqlx::query_as("select c.*, m.* from chats c join (select chat_id, max(timestamp) max_timestamp from messages group by chat_id) max_timestamps on c.id = max_timestamps.chat_id join messages m on max_timestamps.chat_id = m.chat_id and max_timestamps.max_timestamp = m.timestamp order by m.timestamp desc") +    //         .fetch_all(&self.db) +    //         .await?; +    //     Ok(chats) +    // } + +    // /// chats ordered by date of last message +    // // greatest-n-per-group +    // pub(crate) async fn read_chats_ordered_with_latest_messages( +    //     &self, +    // ) -> Result<Vec<(Chat, Message)>, Error> { +    //     #[derive(sqlx::FromRow)] +    //     pub struct RowChat { +    //         chat_correspondent: JID, +    //         chat_have_chatted: bool, +    //     } +    //     impl From<RowChat> for Chat { +    //         fn from(value: RowChat) -> Self { +    //             Self { +    //                 correspondent: value.chat_correspondent, +    //                 have_chatted: value.chat_have_chatted, +    //             } +    //         } +    //     } +    //     #[derive(sqlx::FromRow)] +    //     pub struct RowMessage { +    //         message_id: Uuid, +    //         message_body: String, +    //         message_delivery: Option<Delivery>, +    //         message_timestamp: DateTime<Utc>, +    //         message_from_jid: JID, +    //     } +    //     impl From<RowMessage> for Message { +    //         fn from(value: RowMessage) -> Self { +    //             Self { +    //                 id: value.message_id, +    //                 from: value.message_from_jid, +    //                 delivery: value.message_delivery, +    //                 timestamp: value.message_timestamp, +    //                 body: Body { +    //                     body: value.message_body, +    //                 }, +    //             } +    //         } +    //     } + +    //     #[derive(sqlx::FromRow)] +    //     pub struct ChatWithMessageRow { +    //         #[sqlx(flatten)] +    //         pub chat: RowChat, +    //         #[sqlx(flatten)] +    //         pub message: RowMessage, +    //     } + +    //     pub struct ChatWithMessage { +    //         chat: Chat, +    //         message: Message, +    //     } + +    //     impl From<ChatWithMessageRow> for ChatWithMessage { +    //         fn from(value: ChatWithMessageRow) -> Self { +    //             Self { +    //                 chat: value.chat.into(), +    //                 message: value.message.into(), +    //             } +    //         } +    //     } + +    //     // TODO: sometimes chats have no messages. +    //     let chats: Vec<ChatWithMessageRow> = sqlx::query_as("select c.*, m.* from chats c join (select chat_id, max(timestamp) max_timestamp from messages group by chat_id) max_timestamps on c.id = max_timestamps.chat_id join messages m on max_timestamps.chat_id = m.chat_id and max_timestamps.max_timestamp = m.timestamp order by m.timestamp desc") +    //         .fetch_all(&self.db) +    //         .await?; + +    //     let chats = chats +    //         .into_iter() +    //         .map(|chat_with_message_row| { +    //             let chat_with_message: ChatWithMessage = chat_with_message_row.into(); +    //             (chat_with_message.chat, chat_with_message.message) +    //         }) +    //         .collect(); + +    //     Ok(chats) +    // } + +    // /// chats ordered by date of last message +    // // greatest-n-per-group +    // pub(crate) async fn read_chats_ordered_with_latest_messages_and_users( +    //     &self, +    // ) -> Result<Vec<((Chat, User), (Message, User))>, Error> { +    //     #[derive(sqlx::FromRow)] +    //     pub struct RowChat { +    //         chat_correspondent: JID, +    //         chat_have_chatted: bool, +    //     } +    //     impl From<RowChat> for Chat { +    //         fn from(value: RowChat) -> Self { +    //             Self { +    //                 correspondent: value.chat_correspondent, +    //                 have_chatted: value.chat_have_chatted, +    //             } +    //         } +    //     } +    //     #[derive(sqlx::FromRow)] +    //     pub struct RowMessage { +    //         message_id: Uuid, +    //         message_body: String, +    //         message_delivery: Option<Delivery>, +    //         message_timestamp: DateTime<Utc>, +    //         message_from_jid: JID, +    //     } +    //     impl From<RowMessage> for Message { +    //         fn from(value: RowMessage) -> Self { +    //             Self { +    //                 id: value.message_id, +    //                 from: value.message_from_jid, +    //                 delivery: value.message_delivery, +    //                 timestamp: value.message_timestamp, +    //                 body: Body { +    //                     body: value.message_body, +    //                 }, +    //             } +    //         } +    //     } +    //     #[derive(sqlx::FromRow)] +    //     pub struct RowChatUser { +    //         chat_user_jid: JID, +    //         chat_user_nick: Option<String>, +    //         chat_user_avatar: Option<String>, +    //     } +    //     impl From<RowChatUser> for User { +    //         fn from(value: RowChatUser) -> Self { +    //             Self { +    //                 jid: value.chat_user_jid, +    //                 nick: value.chat_user_nick, +    //                 avatar: value.chat_user_avatar, +    //             } +    //         } +    //     } +    //     #[derive(sqlx::FromRow)] +    //     pub struct RowMessageUser { +    //         message_user_jid: JID, +    //         message_user_nick: Option<String>, +    //         message_user_avatar: Option<String>, +    //     } +    //     impl From<RowMessageUser> for User { +    //         fn from(value: RowMessageUser) -> Self { +    //             Self { +    //                 jid: value.message_user_jid, +    //                 nick: value.message_user_nick, +    //                 avatar: value.message_user_avatar, +    //             } +    //         } +    //     } +    //     #[derive(sqlx::FromRow)] +    //     pub struct ChatWithMessageAndUsersRow { +    //         #[sqlx(flatten)] +    //         pub chat: RowChat, +    //         #[sqlx(flatten)] +    //         pub chat_user: RowChatUser, +    //         #[sqlx(flatten)] +    //         pub message: RowMessage, +    //         #[sqlx(flatten)] +    //         pub message_user: RowMessageUser, +    //     } + +    //     impl From<ChatWithMessageAndUsersRow> for ChatWithMessageAndUsers { +    //         fn from(value: ChatWithMessageAndUsersRow) -> Self { +    //             Self { +    //                 chat: value.chat.into(), +    //                 chat_user: value.chat_user.into(), +    //                 message: value.message.into(), +    //                 message_user: value.message_user.into(), +    //             } +    //         } +    //     } + +    //     pub struct ChatWithMessageAndUsers { +    //         chat: Chat, +    //         chat_user: User, +    //         message: Message, +    //         message_user: User, +    //     } + +    //     let chats: Vec<ChatWithMessageAndUsersRow> = sqlx::query_as("select c.id as chat_id, c.correspondent as chat_correspondent, c.have_chatted as chat_have_chatted, m.id as message_id, m.body as message_body, m.delivery as message_delivery, m.timestamp as message_timestamp, m.from_jid as message_from_jid, cu.jid as chat_user_jid, cu.nick as chat_user_nick, cu.avatar as chat_user_avatar, mu.jid as message_user_jid, mu.nick as message_user_nick, mu.avatar as message_user_avatar from chats c join (select chat_id, max(timestamp) max_timestamp from messages group by chat_id) max_timestamps on c.id = max_timestamps.chat_id join messages m on max_timestamps.chat_id = m.chat_id and max_timestamps.max_timestamp = m.timestamp join users as cu on cu.jid = c.correspondent join users as mu on mu.jid = m.from_jid order by m.timestamp desc") +    //         .fetch_all(&self.db) +    //         .await?; + +    //     let chats = chats +    //         .into_iter() +    //         .map(|chat_with_message_and_users_row| { +    //             let chat_with_message_and_users: ChatWithMessageAndUsers = +    //                 chat_with_message_and_users_row.into(); +    //             ( +    //                 ( +    //                     chat_with_message_and_users.chat, +    //                     chat_with_message_and_users.chat_user, +    //                 ), +    //                 ( +    //                     chat_with_message_and_users.message, +    //                     chat_with_message_and_users.message_user, +    //                 ), +    //             ) +    //         }) +    //         .collect(); + +    //     Ok(chats) +    // } + +    // async fn read_chat_id(&self, chat: JID) -> Result<Uuid, Error> { +    //     #[derive(sqlx::FromRow)] +    //     struct Row { +    //         id: Uuid, +    //     } +    //     let chat = chat.as_bare(); +    //     let chat_id: Row = sqlx::query_as("select id from chats where correspondent = ?") +    //         .bind(chat) +    //         .fetch_one(&self.db) +    //         .await?; +    //     let chat_id = chat_id.id; +    //     Ok(chat_id) +    // } + +    // async fn read_chat_id_opt(&self, chat: JID) -> Result<Option<Uuid>, Error> { +    //     #[derive(sqlx::FromRow)] +    //     struct Row { +    //         id: Uuid, +    //     } +    //     let chat_id: Option<Row> = sqlx::query_as("select id from chats where correspondent = ?") +    //         .bind(chat) +    //         .fetch_optional(&self.db) +    //         .await?; +    //     let chat_id = chat_id.map(|row| row.id); +    //     Ok(chat_id) +    // } + +    // /// if the chat doesn't already exist, it must be created by calling create_chat() before running this function. +    // pub(crate) async fn create_message( +    //     &self, +    //     message: Message, +    //     chat: JID, +    //     from: JID, +    // ) -> Result<(), Error> { +    //     // TODO: one query +    //     let from_jid = from.as_bare(); +    //     let chat_id = self.read_chat_id(chat).await?; +    //     sqlx::query!("insert into messages (id, body, chat_id, from_jid, from_resource, timestamp) values (?, ?, ?, ?, ?, ?)", message.id, message.body.body, chat_id, from_jid, from.resourcepart, message.timestamp).execute(&self.db).await?; +    //     Ok(()) +    // } + +    // pub(crate) async fn upsert_chat_and_user(&self, chat: &JID) -> Result<bool, Error> { +    //     let bare_chat = chat.as_bare(); +    //     sqlx::query!( +    //         "insert into users (jid) values (?) on conflict do nothing", +    //         bare_chat, +    //     ) +    //     .execute(&self.db) +    //     .await?; +    //     let id = Uuid::new_v4(); +    //     let chat: Chat = sqlx::query_as("insert into chats (id, correspondent, have_chatted) values (?, ?, ?) on conflict do nothing; select * from chats where correspondent = ?") +    //         .bind(id) +    //         .bind(bare_chat.clone()) +    //         .bind(false) +    //         .bind(bare_chat) +    //         .fetch_one(&self.db) +    //         .await?; +    //     tracing::debug!("CHECKING chat: {:?}", chat); +    //     Ok(chat.have_chatted) +    // } + +    // /// MUST upsert chat beforehand +    // pub(crate) async fn create_message_with_self_resource( +    //     &self, +    //     message: Message, +    //     chat: JID, +    //     // full jid +    //     from: JID, +    // ) -> Result<(), Error> { +    //     let from_jid = from.as_bare(); +    //     if let Some(resource) = &from.resourcepart { +    //         sqlx::query!( +    //             "insert into resources (bare_jid, resource) values (?, ?) on conflict do nothing", +    //             from_jid, +    //             resource +    //         ) +    //         .execute(&self.db) +    //         .await?; +    //     } +    //     self.create_message(message, chat, from).await?; +    //     Ok(()) +    // } + +    // /// create direct message from incoming. MUST upsert chat and user +    // pub(crate) async fn create_message_with_user_resource( +    //     &self, +    //     message: Message, +    //     chat: JID, +    //     // full jid +    //     from: JID, +    // ) -> Result<(), Error> { +    //     let bare_chat = chat.as_bare(); +    //     let resource = &chat.resourcepart; +    //     // sqlx::query!( +    //     //     "insert into users (jid) values (?) on conflict do nothing", +    //     //     bare_chat +    //     // ) +    //     // .execute(&self.db) +    //     // .await?; +    //     // let id = Uuid::new_v4(); +    //     // sqlx::query!( +    //     //     "insert into chats (id, correspondent) values (?, ?) on conflict do nothing", +    //     //     id, +    //     //     bare_chat +    //     // ) +    //     // .execute(&self.db) +    //     // .await?; +    //     if let Some(resource) = resource { +    //         sqlx::query!( +    //             "insert into resources (bare_jid, resource) values (?, ?) on conflict do nothing", +    //             bare_chat, +    //             resource +    //         ) +    //         .execute(&self.db) +    //         .await?; +    //     } +    //     self.create_message(message, chat, from).await?; +    //     Ok(()) +    // } + +    // pub(crate) async fn read_message(&self, message: Uuid) -> Result<Message, Error> { +    //     let message: Message = sqlx::query_as("select * from messages where id = ?") +    //         .bind(message) +    //         .fetch_one(&self.db) +    //         .await?; +    //     Ok(message) +    // } + +    // // TODO: message updates/edits pub(crate) async fn update_message(&self, message: Message) -> Result<(), Error> {} + +    // pub(crate) async fn delete_message(&self, message: Uuid) -> Result<(), Error> { +    //     sqlx::query!("delete from messages where id = ?", message) +    //         .execute(&self.db) +    //         .await?; +    //     Ok(()) +    // } + +    // // TODO: paging +    // pub(crate) async fn read_message_history(&self, chat: JID) -> Result<Vec<Message>, Error> { +    //     let chat_id = self.read_chat_id(chat).await?; +    //     let messages: Vec<Message> = +    //         sqlx::query_as("select * from messages where chat_id = ? order by timestamp asc") +    //             .bind(chat_id) +    //             .fetch_all(&self.db) +    //             .await?; +    //     Ok(messages) +    // } + +    // pub(crate) async fn read_message_history_with_users( +    //     &self, +    //     chat: JID, +    // ) -> Result<Vec<(Message, User)>, Error> { +    //     let chat_id = self.read_chat_id(chat).await?; +    //     #[derive(sqlx::FromRow)] +    //     pub struct Row { +    //         #[sqlx(flatten)] +    //         user: User, +    //         #[sqlx(flatten)] +    //         message: Message, +    //     } +    //     let messages: Vec<Row> = +    //         sqlx::query_as("select * from messages join users on jid = from_jid where chat_id = ? order by timestamp asc") +    //             .bind(chat_id) +    //             .fetch_all(&self.db) +    //             .await?; +    //     let messages = messages +    //         .into_iter() +    //         .map(|row| (row.message, row.user)) +    //         .collect(); +    //     Ok(messages) +    // } + +    // pub(crate) async fn read_cached_status(&self) -> Result<Online, Error> { +    //     let online: Online = sqlx::query_as("select * from cached_status where id = 0") +    //         .fetch_one(&self.db) +    //         .await?; +    //     Ok(online) +    // } + +    // pub(crate) async fn upsert_cached_status(&self, status: Online) -> Result<(), Error> { +    //     sqlx::query!( +    //         "insert into cached_status (id, show, message) values (0, ?, ?) on conflict do update set show = ?, message = ?", +    //         status.show, +    //         status.status, +    //         status.show, +    //         status.status +    //     ).execute(&self.db).await?; +    //     Ok(()) +    // } + +    // pub(crate) async fn delete_cached_status(&self) -> Result<(), Error> { +    //     sqlx::query!("update cached_status set show = null, message = null where id = 0") +    //         .execute(&self.db) +    //         .await?; +    //     Ok(()) +    // } + +    // pub(crate) async fn read_capabilities(&self, node: &str) -> Result<String, Error> { +    //     #[derive(sqlx::FromRow)] +    //     struct Row { +    //         capabilities: String, +    //     } +    //     let row: Row = +    //         sqlx::query_as("select capabilities from capability_hash_nodes where node = ?") +    //             .bind(node) +    //             .fetch_one(&self.db) +    //             .await?; +    //     Ok(row.capabilities) +    // } + +    // pub(crate) async fn upsert_capabilities( +    //     &self, +    //     node: &str, +    //     capabilities: &str, +    // ) -> Result<(), Error> { +    //     let now = Utc::now(); +    //     sqlx::query!( +    //         "insert into capability_hash_nodes (node, timestamp, capabilities) values (?, ?, ?) on conflict do update set timestamp = ?, capabilities = ?", node, now, capabilities, now, capabilities +    //     ).execute(&self.db).await?; +    //     Ok(()) +    // }  } diff --git a/filamento/src/files.rs b/filamento/src/files.rs index 3acc871..644f883 100644 --- a/filamento/src/files.rs +++ b/filamento/src/files.rs @@ -1,10 +1,13 @@  use std::{ +    collections::HashMap, +    convert::Infallible,      error::Error,      path::{Path, PathBuf},      sync::Arc,  };  use tokio::io; +use tokio::sync::Mutex;  pub trait FileStore {      type Err: Clone + Send + Error; @@ -25,10 +28,47 @@ pub trait FileStore {  }  #[derive(Clone, Debug)] +pub struct FilesMem { +    files: Arc<Mutex<HashMap<String, Vec<u8>>>>, +} + +impl FilesMem { +    pub fn new() -> Self { +        Self { +            files: Arc::new(Mutex::new(HashMap::new())), +        } +    } +} + +impl FileStore for FilesMem { +    type Err = Infallible; + +    async fn is_stored(&self, name: &str) -> Result<bool, Self::Err> { +        Ok(self.files.lock().await.contains_key(name)) +    } + +    async fn store(&self, name: &str, data: &[u8]) -> Result<(), Self::Err> { +        self.files +            .lock() +            .await +            .insert(name.to_string(), data.to_owned()); + +        Ok(()) +    } + +    async fn delete(&self, name: &str) -> Result<(), Self::Err> { +        self.files.lock().await.remove(name); +        Ok(()) +    } +} + +#[cfg(not(target_arch = "wasm32"))] +#[derive(Clone, Debug)]  pub struct Files {      root: PathBuf,  } +#[cfg(not(target_arch = "wasm32"))]  impl Files {      pub fn new(root: impl AsRef<Path>) -> Self {          let root = root.as_ref(); @@ -41,6 +81,7 @@ impl Files {      }  } +#[cfg(not(target_arch = "wasm32"))]  impl FileStore for Files {      type Err = Arc<io::Error>; diff --git a/filamento/src/lib.rs b/filamento/src/lib.rs index 14b0cae..e06f7c6 100644 --- a/filamento/src/lib.rs +++ b/filamento/src/lib.rs @@ -32,6 +32,8 @@ use tokio::{      sync::{Mutex, mpsc, oneshot},      time::timeout,  }; +#[cfg(target_arch = "wasm32")] +use tokio_with_wasm::alias as tokio;  use tracing::{debug, info};  use user::User;  use uuid::Uuid; @@ -258,6 +260,7 @@ impl<Fs: FileStore + Clone + Send + Sync + 'static> Client<Fs> {          let actor: CoreClient<ClientLogic<Fs>> =              CoreClient::new(jid, password, command_receiver, None, sup_recv, logic); +          tokio::spawn(async move { actor.run().await });          (client, update_recv) @@ -728,3 +731,93 @@ impl<Fs: FileStore> From<Command<Fs>> for CoreClientCommand<Command<Fs>> {          CoreClientCommand::Command(value)      }  } + +#[cfg(test)] +mod tests { +    use wasm_bindgen_test::*; + +    use super::*; + +    wasm_bindgen_test_configure!(run_in_browser); + +    use crate::chat; +    use crate::files::FilesMem; +    use std::path::Path; +    use tokio_with_wasm::alias as tokio; + +    #[wasm_bindgen_test] +    async fn login() -> () { +        tracing_wasm::set_as_global_default(); +        let db = Db::create_connect_and_migrate(Path::new("./filamento.db")) +            .await +            .unwrap(); +        let (client, mut recv) = Client::new( +            "test@blos.sm/testing2".try_into().unwrap(), +            "slayed".to_string(), +            db, +            FilesMem::new(), +        ); + +        tokio::spawn(async move { +            while let Some(msg) = recv.recv().await { +                info!("{:#?}", msg) +            } +        }); + +        client.connect().await.unwrap(); +        // tokio::time::sleep(Duration::from_secs(5)).await; +        info!("changing nick"); +        client +            .change_nick(Some("britney".to_string())) +            .await +            .unwrap(); +        let mut data = include_bytes!("../files/britney_starbies.jpg"); +        client.change_avatar(Some(data.to_vec())).await.unwrap(); +        info!("sending message"); +        client +            .send_message( +                JID::from_str("cel@blos.sm").unwrap(), +                chat::Body { +                    body: "hallo!!!".to_string(), +                }, +            ) +            .await +            .unwrap(); +        info!("sent message"); +        client +            .send_message( +                JID::from_str("cel@blos.sm").unwrap(), +                chat::Body { +                    body: "hallo 2".to_string(), +                }, +            ) +            .await +            .unwrap(); +        // tokio::time::sleep(Duration::from_secs(15)).await; +        // info!("sending disco query"); +        // let info = client.disco_info(None, None).await.unwrap(); +        // info!("got disco result: {:#?}", info); +        // let items = client.disco_items(None, None).await.unwrap(); +        // info!("got disco result: {:#?}", items); +        // let info = client +        //     .disco_info(Some("blos.sm".parse().unwrap()), None) +        //     .await +        //     .unwrap(); +        // info!("got disco result: {:#?}", info); +        // let items = client +        //     .disco_items(Some("blos.sm".parse().unwrap()), None) +        //     .await +        //     .unwrap(); +        // info!("got disco result: {:#?}", items); +        // let info = client +        //     .disco_info(Some("pubsub.blos.sm".parse().unwrap()), None) +        //     .await +        //     .unwrap(); +        // info!("got disco result: {:#?}", info); +        // let items = client +        //     .disco_items(Some("pubsub.blos.sm".parse().unwrap()), None) +        //     .await +        //     .unwrap(); +        // info!("got disco result: {:#?}", items);        let mut jid: JID = "test@blos.sm".try_into().unwrap(); +    } +} diff --git a/jid/Cargo.toml b/jid/Cargo.toml index d5983ea..57421ef 100644 --- a/jid/Cargo.toml +++ b/jid/Cargo.toml @@ -6,5 +6,9 @@ edition = "2021"  [features]  sqlx = ["dep:sqlx"] -[dependencies] -sqlx = { version = "0.8.3", features = ["sqlite"], optional = true } +[target.'cfg(not(target_arch = "wasm32"))'.dependencies] +# sqlx = { version = "0.8.3", features = ["sqlite"], optional = true } +sqlx = { path = "../../remote/sqlx", features = ["sqlite"], optional = true } + +[target.'cfg(target_arch = "wasm32")'.dependencies] +sqlx = { path = "../../remote/sqlx", features = ["sqlite-precompiled-wasm"], optional = true } diff --git a/lampada/Cargo.toml b/lampada/Cargo.toml index 22f133d..090aa9f 100644 --- a/lampada/Cargo.toml +++ b/lampada/Cargo.toml @@ -9,13 +9,13 @@ luz = { version = "0.1.0", path = "../luz" }  peanuts = { version = "0.1.0", git = "https://bunny.garden/peanuts" }  jid = { version = "0.1.0", path = "../jid" }  stanza = { version = "0.1.0", path = "../stanza", features = ["xep_0203"] } -tokio = { version = "1.42.0", features = ["macros"] } +tokio = { workspace = true, features = ["macros", "sync"] }  tracing = "0.1.41"  thiserror = "2.0.11"  [target.'cfg(target_arch = "wasm32")'.dependencies] -tokio = { version = "1.42.0", features = ["macros", "rt", "time"] } -wasm-bindgen-futures = "0.4" +tokio = { workspace = true, features = ["macros", "rt", "time", "sync"] } +tokio_with_wasm = { version = "0.8.2", features = ["macros", "rt", "time", "sync"] }  # [target.'cfg(not(target_arch = "wasm32"))'.dependencies]  # tokio = { version = "1.42.0", features = ["rt-multi-thread"] } diff --git a/lampada/src/connection/mod.rs b/lampada/src/connection/mod.rs index a3dde16..3a3187f 100644 --- a/lampada/src/connection/mod.rs +++ b/lampada/src/connection/mod.rs @@ -15,6 +15,8 @@ use tokio::{      sync::{mpsc, oneshot, Mutex},      task::{JoinHandle, JoinSet},  }; +#[cfg(target_arch = "wasm32")] +use tokio_with_wasm::alias as tokio;  use tracing::info;  use write::{WriteControl, WriteControlHandle, WriteHandle, WriteMessage, WriteState}; @@ -46,7 +48,12 @@ pub enum SupervisorCommand {      Reconnect(ReadState),  } -impl<Lgc: Logic + Clone + Send + 'static> Supervisor<Lgc> { +impl<Lgc> Supervisor<Lgc> +where +    Lgc: Logic + Clone + 'static, +    #[cfg(not(target_arch = "wasm32"))] +    Lgc: Send, +{      fn new(          command_recv: mpsc::Receiver<SupervisorCommand>,          reader_crash: oneshot::Receiver<(Option<StreamError>, ReadState)>, @@ -129,7 +136,7 @@ impl<Lgc: Logic + Clone + Send + 'static> Supervisor<Lgc> {                                      self.reader_crash = recv;                                      self.read_control_handle = ReadControlHandle::reconnect(                                          read, -                                        read_state.tasks, +                                        // read_state.tasks,                                          self.connected.clone(),                                          self.logic.clone(),                                          read_state.supervisor_control, @@ -177,7 +184,7 @@ impl<Lgc: Logic + Clone + Send + 'static> Supervisor<Lgc> {                              self.reader_crash = recv;                              self.read_control_handle = ReadControlHandle::reconnect(                                  read, -                                read_state.tasks, +                                // read_state.tasks,                                  self.connected.clone(),                                  self.logic.clone(),                                  read_state.supervisor_control, @@ -225,7 +232,7 @@ impl<Lgc: Logic + Clone + Send + 'static> Supervisor<Lgc> {                              self.reader_crash = recv;                              self.read_control_handle = ReadControlHandle::reconnect(                                  read, -                                read_state.tasks, +                                // read_state.tasks,                                  self.connected.clone(),                                  self.logic.clone(),                                  read_state.supervisor_control, @@ -295,14 +302,19 @@ impl DerefMut for SupervisorSender {  }  impl SupervisorHandle { -    pub fn new<Lgc: Logic + Clone + Send + 'static>( +    pub fn new<Lgc>(          streams: BoundJabberStream,          on_crash: oneshot::Sender<()>,          jid: JID,          server: JID,          password: Arc<String>,          logic: Lgc, -    ) -> (WriteHandle, Self) { +    ) -> (WriteHandle, Self) +    where +        Lgc: Logic + Clone + 'static, +        #[cfg(not(target_arch = "wasm32"))] +        Lgc: Send, +    {          let (command_send, command_recv) = mpsc::channel(20);          let (writer_crash_send, writer_crash_recv) = oneshot::channel();          let (reader_crash_send, reader_crash_recv) = oneshot::channel(); @@ -342,9 +354,6 @@ impl SupervisorHandle {              logic,          ); -        #[cfg(target_arch = "wasm32")] -        wasm_bindgen_futures::spawn_local(async move { actor.run().await }); -        #[cfg(not(target_arch = "wasm32"))]          tokio::spawn(async move { actor.run().await });          ( diff --git a/lampada/src/connection/read.rs b/lampada/src/connection/read.rs index 4451d99..591a2cb 100644 --- a/lampada/src/connection/read.rs +++ b/lampada/src/connection/read.rs @@ -16,6 +16,8 @@ use tokio::{      sync::{mpsc, oneshot, Mutex},      task::{JoinHandle, JoinSet},  }; +#[cfg(target_arch = "wasm32")] +use tokio_with_wasm::alias as tokio;  use tracing::info;  use crate::{Connected, Logic, WriteMessage}; @@ -29,7 +31,7 @@ pub struct Read<Lgc> {      disconnect_timedout: Fuse<oneshot::Receiver<()>>,      // all the threads spawned by the current connection session -    tasks: JoinSet<()>, +    // tasks: JoinSet<()>,      // for handling incoming stanzas      // jabber server must be able to both terminate the connection from error, and ask for data from the client (such as supported XEPs) @@ -46,13 +48,13 @@ pub struct Read<Lgc> {  pub struct ReadState {      pub supervisor_control: SupervisorSender,      // TODO: when a stream dies, the iq gets from the server should not be replied to on the new stream -    pub tasks: JoinSet<()>, +    // pub tasks: JoinSet<()>,  }  impl<Lgc> Read<Lgc> {      fn new(          stream: BoundJabberReader, -        tasks: JoinSet<()>, +        // tasks: JoinSet<()>,          connected: Connected,          logic: Lgc,          supervisor_control: SupervisorSender, @@ -64,7 +66,7 @@ impl<Lgc> Read<Lgc> {              stream,              disconnecting: false,              disconnect_timedout: recv.fuse(), -            tasks, +            // tasks,              connected,              logic,              supervisor_control, @@ -74,7 +76,12 @@ impl<Lgc> Read<Lgc> {      }  } -impl<Lgc: Clone + Logic + Send + 'static> Read<Lgc> { +impl<Lgc> Read<Lgc> +where +    Lgc: Logic + Clone + 'static, +    #[cfg(not(target_arch = "wasm32"))] +    Lgc: Send, +{      async fn run(mut self) {          println!("started read thread");          // let stanza = self.stream.read::<Stanza>().await; @@ -100,7 +107,7 @@ impl<Lgc: Clone + Logic + Send + 'static> Read<Lgc> {                              })                          },                          ReadControl::Abort(sender) => { -                            let _ = sender.send(ReadState { supervisor_control: self.supervisor_control, tasks: self.tasks }); +                            let _ = sender.send(ReadState { supervisor_control: self.supervisor_control });                              break;                          },                      }; @@ -112,11 +119,11 @@ impl<Lgc: Clone + Logic + Send + 'static> Read<Lgc> {                              match s {                                  Stanza::Error(error) => {                                      self.logic.clone().handle_stream_error(error).await; -                                    self.supervisor_control.send(SupervisorCommand::Reconnect(ReadState { supervisor_control: self.supervisor_control.clone(), tasks: self.tasks })).await; +                                    self.supervisor_control.send(SupervisorCommand::Reconnect(ReadState { supervisor_control: self.supervisor_control.clone() })).await;                                      break;                                  },                                  _ => { -                                    self.tasks.spawn(self.logic.clone().handle_stanza(s, self.connected.clone())); +                                    tokio::spawn(self.logic.clone().handle_stanza(s, self.connected.clone()));                                  }                              };                          }, @@ -143,7 +150,7 @@ impl<Lgc: Clone + Logic + Send + 'static> Read<Lgc> {                                      _ => None,                                  }; -                                let _ = self.on_crash.send((stream_error, ReadState { supervisor_control: self.supervisor_control, tasks: self.tasks })); +                                let _ = self.on_crash.send((stream_error, ReadState { supervisor_control: self.supervisor_control }));                              }                              break;                          }, @@ -189,27 +196,29 @@ impl DerefMut for ReadControlHandle {  }  impl ReadControlHandle { -    pub fn new<Lgc: Clone + Logic + Send + 'static>( +    pub fn new<Lgc>(          stream: BoundJabberReader,          connected: Connected,          logic: Lgc,          supervisor_control: SupervisorSender,          on_crash: oneshot::Sender<(Option<StreamError>, ReadState)>, -    ) -> Self { +    ) -> Self +    where +        Lgc: Logic + Clone + 'static, +        #[cfg(not(target_arch = "wasm32"))] +        Lgc: Send, +    {          let (control_sender, control_receiver) = mpsc::channel(20);          let actor = Read::new(              stream, -            JoinSet::new(), +            // JoinSet::new(),              connected,              logic,              supervisor_control,              control_receiver,              on_crash,          ); -        #[cfg(target_arch = "wasm32")] -        wasm_bindgen_futures::spawn_local(async move { actor.run().await }); -        #[cfg(not(target_arch = "wasm32"))]          tokio::spawn(async move { actor.run().await });          Self { @@ -217,28 +226,30 @@ impl ReadControlHandle {          }      } -    pub fn reconnect<Lgc: Clone + Logic + Send + 'static>( +    pub fn reconnect<Lgc>(          stream: BoundJabberReader, -        tasks: JoinSet<()>, +        // tasks: JoinSet<()>,          connected: Connected,          logic: Lgc,          supervisor_control: SupervisorSender,          on_crash: oneshot::Sender<(Option<StreamError>, ReadState)>, -    ) -> Self { +    ) -> Self +    where +        Lgc: Logic + Clone + 'static, +        #[cfg(not(target_arch = "wasm32"))] +        Lgc: Send, +    {          let (control_sender, control_receiver) = mpsc::channel(20);          let actor = Read::new(              stream, -            tasks, +            // tasks,              connected,              logic,              supervisor_control,              control_receiver,              on_crash,          ); -        #[cfg(target_arch = "wasm32")] -        wasm_bindgen_futures::spawn_local(async move { actor.run().await }); -        #[cfg(not(target_arch = "wasm32"))]          tokio::spawn(async move { actor.run().await });          Self { diff --git a/lampada/src/connection/write.rs b/lampada/src/connection/write.rs index 4c6ed24..b982eea 100644 --- a/lampada/src/connection/write.rs +++ b/lampada/src/connection/write.rs @@ -8,6 +8,8 @@ use tokio::{      sync::{mpsc, oneshot},      task::JoinHandle,  }; +#[cfg(target_arch = "wasm32")] +use tokio_with_wasm::alias as tokio;  use crate::error::WriteError; @@ -218,9 +220,6 @@ impl WriteControlHandle {          let (stanza_sender, stanza_receiver) = mpsc::channel(20);          let actor = Write::new(stream, stanza_receiver, control_receiver, on_crash); -        #[cfg(target_arch = "wasm32")] -        wasm_bindgen_futures::spawn_local(async move { actor.run().await }); -        #[cfg(not(target_arch = "wasm32"))]          tokio::spawn(async move { actor.run().await });          ( @@ -242,9 +241,6 @@ impl WriteControlHandle {          let (control_sender, control_receiver) = mpsc::channel(20);          let actor = Write::new(stream, stanza_receiver, control_receiver, on_crash); -        #[cfg(target_arch = "wasm32")] -        wasm_bindgen_futures::spawn_local(async move { actor.run_reconnected(retry_msg).await }); -        #[cfg(not(target_arch = "wasm32"))]          tokio::spawn(async move { actor.run_reconnected(retry_msg).await });          Self { @@ -261,9 +257,6 @@ impl WriteControlHandle {          let actor = Write::new(stream, stanza_receiver, control_receiver, on_crash); -        #[cfg(target_arch = "wasm32")] -        wasm_bindgen_futures::spawn_local(async move { actor.run().await }); -        #[cfg(not(target_arch = "wasm32"))]          tokio::spawn(async move { actor.run().await });          Self { diff --git a/lampada/src/error.rs b/lampada/src/error.rs index 8104155..40be012 100644 --- a/lampada/src/error.rs +++ b/lampada/src/error.rs @@ -1,11 +1,14 @@  use std::sync::Arc; +#[cfg(not(target_arch = "wasm32"))] +use ::tokio::time::error::Elapsed;  use stanza::client::Stanza;  use thiserror::Error; -use tokio::{ -    sync::{mpsc::error::SendError, oneshot::error::RecvError}, -    time::error::Elapsed, -}; +use tokio::sync::{mpsc::error::SendError, oneshot::error::RecvError}; +#[cfg(target_arch = "wasm32")] +use tokio::time::Elapsed; +#[cfg(target_arch = "wasm32")] +use tokio_with_wasm::alias as tokio;  #[derive(Debug, Error, Clone)]  pub enum ConnectionError { diff --git a/lampada/src/lib.rs b/lampada/src/lib.rs index 7346c42..dacc56d 100644 --- a/lampada/src/lib.rs +++ b/lampada/src/lib.rs @@ -1,3 +1,5 @@ +#![feature(where_clause_attrs)] +  use std::{      collections::HashMap,      ops::{Deref, DerefMut}, @@ -21,6 +23,8 @@ use tokio::{      task::JoinSet,      time::timeout,  }; +#[cfg(target_arch = "wasm32")] +use tokio_with_wasm::alias as tokio;  use tracing::{debug, info};  use crate::connection::write::WriteHandle; @@ -58,20 +62,24 @@ pub trait Logic {      type Cmd;      /// run after binding to the stream (e.g. for a chat client, ) +    #[cfg(not(target_arch = "wasm32"))]      fn handle_connect(self, connection: Connected) -> impl std::future::Future<Output = ()> + Send;      /// run before closing the stream (e.g. send unavailable presence in a chat client) +    #[cfg(not(target_arch = "wasm32"))]      fn handle_disconnect(          self,          connection: Connected,      ) -> impl std::future::Future<Output = ()> + Send; +    #[cfg(not(target_arch = "wasm32"))]      fn handle_stream_error(          self,          stream_error: StreamError,      ) -> impl std::future::Future<Output = ()> + Send;      /// run to handle an incoming xmpp stanza +    #[cfg(not(target_arch = "wasm32"))]      fn handle_stanza(          self,          stanza: Stanza, @@ -79,6 +87,7 @@ pub trait Logic {      ) -> impl std::future::Future<Output = ()> + std::marker::Send;      /// run to handle a command message when a connection is currently established +    #[cfg(not(target_arch = "wasm32"))]      fn handle_online(          self,          command: Self::Cmd, @@ -86,21 +95,67 @@ pub trait Logic {      ) -> impl std::future::Future<Output = ()> + std::marker::Send;      /// run to handle a command message when disconnected +    #[cfg(not(target_arch = "wasm32"))]      fn handle_offline(          self,          command: Self::Cmd,      ) -> impl std::future::Future<Output = ()> + std::marker::Send;      /// run as cleanup after either an abort or a disconnect (e.g. reply to all pending requests with a disconnected error) +    #[cfg(not(target_arch = "wasm32"))]      fn on_abort(self) -> impl std::future::Future<Output = ()> + std::marker::Send;      /// handle connection errors from the core client logic +    #[cfg(not(target_arch = "wasm32"))]      fn handle_connection_error(          self,          error: ConnectionError,      ) -> impl std::future::Future<Output = ()> + std::marker::Send;      // async fn handle_stream_error(self, error) {} +    #[cfg(target_arch = "wasm32")] +    fn handle_connect(self, connection: Connected) -> impl std::future::Future<Output = ()>; + +    /// run before closing the stream (e.g. send unavailable presence in a chat client) +    #[cfg(target_arch = "wasm32")] +    fn handle_disconnect(self, connection: Connected) -> impl std::future::Future<Output = ()>; + +    #[cfg(target_arch = "wasm32")] +    fn handle_stream_error( +        self, +        stream_error: StreamError, +    ) -> impl std::future::Future<Output = ()>; + +    /// run to handle an incoming xmpp stanza +    #[cfg(target_arch = "wasm32")] +    fn handle_stanza( +        self, +        stanza: Stanza, +        connection: Connected, +    ) -> impl std::future::Future<Output = ()>; + +    /// run to handle a command message when a connection is currently established +    #[cfg(target_arch = "wasm32")] +    fn handle_online( +        self, +        command: Self::Cmd, +        connection: Connected, +    ) -> impl std::future::Future<Output = ()>; + +    /// run to handle a command message when disconnected +    #[cfg(target_arch = "wasm32")] +    fn handle_offline(self, command: Self::Cmd) -> impl std::future::Future<Output = ()>; + +    /// run as cleanup after either an abort or a disconnect (e.g. reply to all pending requests with a disconnected error) +    #[cfg(target_arch = "wasm32")] +    fn on_abort(self) -> impl std::future::Future<Output = ()>; + +    /// handle connection errors from the core client logic +    #[cfg(target_arch = "wasm32")] +    fn handle_connection_error( +        self, +        error: ConnectionError, +    ) -> impl std::future::Future<Output = ()>;  }  /// an actor that implements xmpp core (rfc6120), manages connection/stream status, and delegates any other logic to the generic which implements Logic, allowing different kinds of clients (e.g. chat, social, pubsub) to be built upon the same core @@ -117,10 +172,15 @@ pub struct CoreClient<Lgc: Logic> {      logic: Lgc,      // config: LampConfig,      // TODO: will grow forever at this point, maybe not required as tasks will naturally shut down anyway? -    tasks: JoinSet<()>, +    // tasks: JoinSet<()>,  } -impl<Lgc: Logic + Clone + Send + 'static> CoreClient<Lgc> { +impl<Lgc> CoreClient<Lgc> +where +    Lgc: Logic + Clone + 'static, +    #[cfg(not(target_arch = "wasm32"))] +    Lgc: Send, +{      /// create a new actor      pub fn new(          jid: JID, @@ -137,7 +197,7 @@ impl<Lgc: Logic + Clone + Send + 'static> CoreClient<Lgc> {              receiver,              connection_supervisor_shutdown,              logic, -            tasks: JoinSet::new(), +            // tasks: JoinSet::new(),          }      } @@ -240,10 +300,10 @@ impl<Lgc: Logic + Clone + Send + 'static> CoreClient<Lgc> {                  },                  CoreClientCommand::Command(command) => {                      match self.connected.as_ref() { -                        Some((w, s)) => self -                            .tasks -                            .spawn(self.logic.clone().handle_online(command, w.clone())), -                        None => self.tasks.spawn(self.logic.clone().handle_offline(command)), +                        Some((w, s)) => { +                            tokio::spawn(self.logic.clone().handle_online(command, w.clone())) +                        } +                        None => tokio::spawn(self.logic.clone().handle_offline(command)),                      };                  }              } diff --git a/luz/Cargo.toml b/luz/Cargo.toml index 709bc00..51c6936 100644 --- a/luz/Cargo.toml +++ b/luz/Cargo.toml @@ -21,7 +21,7 @@ rsasl = { version = "2.0.1", default_features = false, features = [  	"config_builder",  	"scram-sha-1",  ] } -tokio = { version = "1.28", features = [] } +tokio = { workspace = true, features = ["io-util"] }  tracing = "0.1.40"  try_map = "0.3.1"  stanza = { version = "0.1.0", path = "../stanza" } @@ -35,6 +35,7 @@ pin-project = "1.1.7"  thiserror = "2.0.11"  [target.'cfg(target_arch = "wasm32")'.dependencies] +tokio = { workspace = true, features = ["io-util", "sync"] }  uuid = { version = "1.13.1", features = ["js", "v4"] }  getrandom = { version = "0.2.15", features = ["js"] }  stanza = { version = "0.1.0", path = "../stanza", features = ["rfc_7395"] } @@ -42,13 +43,14 @@ web-sys = { version = "0.3", features = ["Request", "WebSocket"] }  wasm-bindgen = "0.2"  [target.'cfg(not(target_arch = "wasm32"))'.dependencies] +tokio = { workspace = true, features = ["io-util", "sync"] }  tokio-native-tls = "0.3.1"  trust-dns-resolver = "0.22.0"  [dev-dependencies]  tracing-wasm = "0.2.1"  wasm-bindgen-test = "0.3.0" -tokio = { version = "1.28", features = ["macros", "rt", "time"] } +tokio = { workspace = true, features = ["macros", "rt", "time"] }  test-log = { version = "0.2", features = ["trace"] }  env_logger = "*"  tracing-subscriber = { version = "0.3", default-features = false, features = [ | 
