aboutsummaryrefslogtreecommitdiffstats
path: root/filamento
diff options
context:
space:
mode:
Diffstat (limited to 'filamento')
-rw-r--r--filamento/.cargo/config.toml19
-rw-r--r--filamento/.gitignore2
-rw-r--r--filamento/Cargo.toml77
-rw-r--r--filamento/examples/example.rs224
-rw-r--r--filamento/files/4ff6fc2961a36a9439eea239f01e05624f3afbe0bin0 -> 8623 bytes
-rw-r--r--filamento/files/8d7231c2d8c4618fa71979851687737e63ba6377bin0 -> 6364 bytes
-rw-r--r--filamento/files/britney_starbies.jpgbin0 -> 11674 bytes
-rw-r--r--filamento/migrations/1.sql (renamed from filamento/migrations/20240113011930_luz.sql)34
-rw-r--r--filamento/src/caps.rs141
-rw-r--r--filamento/src/chat.rs101
-rw-r--r--filamento/src/db.rs1965
-rw-r--r--filamento/src/error.rs269
-rw-r--r--filamento/src/files.rs70
-rw-r--r--filamento/src/files/opfs.rs128
-rw-r--r--filamento/src/lib.rs310
-rw-r--r--filamento/src/logic/connect.rs4
-rw-r--r--filamento/src/logic/local_only.rs40
-rw-r--r--filamento/src/logic/mod.rs10
-rw-r--r--filamento/src/logic/offline.rs62
-rw-r--r--filamento/src/logic/online.rs258
-rw-r--r--filamento/src/logic/process_stanza.rs203
-rw-r--r--filamento/src/presence.rs59
-rw-r--r--filamento/src/roster.rs120
-rw-r--r--filamento/src/user.rs10
24 files changed, 3071 insertions, 1035 deletions
diff --git a/filamento/.cargo/config.toml b/filamento/.cargo/config.toml
new file mode 100644
index 0000000..319101a
--- /dev/null
+++ b/filamento/.cargo/config.toml
@@ -0,0 +1,19 @@
+[build]
+rustflags = [
+ # LLD (shipped with the Rust toolchain) is used as the default linker
+ # "-C", "link-arg=-Tlink.x",
+
+ # if you run into problems with LLD switch to the GNU linker by commenting out
+ # this line
+ # "-C", "linker=arm-none-eabi-ld",
+
+ # if you need to link to pre-compiled C libraries provided by a C toolchain
+ # use GCC as the linker by commenting out both lines above and then
+ # uncommenting the three lines below
+ "-C",
+ "target-feature=+atomics,+bulk-memory,+mutable-globals",
+]
+
+[unstable]
+build-std = ["std", "panic_abort"]
+
diff --git a/filamento/.gitignore b/filamento/.gitignore
index 1ba9f2a..52acf71 100644
--- a/filamento/.gitignore
+++ b/filamento/.gitignore
@@ -1,3 +1,3 @@
filamento.db
-files/
+./files/
.sqlx/
diff --git a/filamento/Cargo.toml b/filamento/Cargo.toml
index 91b7e91..b89c577 100644
--- a/filamento/Cargo.toml
+++ b/filamento/Cargo.toml
@@ -3,28 +3,81 @@ name = "filamento"
version = "0.1.0"
edition = "2024"
+[features]
+serde = [
+ "dep:serde",
+ "jid/serde",
+ "uuid/serde",
+ "chrono/serde",
+ "lampada/serde",
+]
+opfs = ["dep:web-sys"]
+reactive_stores = ["dep:reactive_stores"]
+
[dependencies]
-futures = "0.3.31"
-lampada = { version = "0.1.0", path = "../lampada" }
-tokio = "1.42.0"
-thiserror = "2.0.11"
-stanza = { version = "0.1.0", path = "../stanza", features = ["rfc_6121", "xep_0203", "xep_0030", "xep_0060", "xep_0172", "xep_0390", "xep_0128", "xep_0115", "xep_0084"] }
-sqlx = { version = "0.8.3", features = ["sqlite", "runtime-tokio", "uuid", "chrono"] }
+futures = { workspace = true }
+lampada = { workspace = true }
+thiserror = { workspace = true }
+stanza = { workspace = true, features = [
+ "rfc_6121",
+ "xep_0203",
+ "xep_0030",
+ "xep_0060",
+ "xep_0172",
+ "xep_0390",
+ "xep_0128",
+ "xep_0115",
+ "xep_0084",
+] }
# TODO: re-export jid?
-jid = { version = "0.1.0", path = "../jid", features = ["sqlx"] }
-uuid = { version = "1.13.1", features = ["v4"] }
-tracing = "0.1.41"
-chrono = "0.4.40"
+jid = { workspace = true, features = ["rusqlite"] }
+uuid = { workspace = true, features = ["v4"] }
+rusqlite = { git = "https://github.com/Spxg/rusqlite.git", branch = "wasm-demo", features = [
+ "uuid",
+ "chrono",
+] }
+tracing = { workspace = true }
+chrono = { workspace = true }
+serde = { workspace = true, features = ["derive"], optional = true }
sha2 = "0.10.8"
sha3 = "0.10.8"
base64 = "0.22.1"
sha1 = "0.10.6"
image = "0.25.6"
hex = "0.4.3"
+reactive_stores = { version = "0.2.2", optional = true }
+
+[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
+tokio = { workspace = true, features = ["sync", "time", "rt", "fs", "io-std"] }
+
+[target.'cfg(target_arch = "wasm32")'.dependencies]
+tokio = { workspace = true, features = ["sync", "time", "rt"] }
+js-sys.workspace = true
+web-sys = { workspace = true, features = [
+ "FileSystemDirectoryHandle",
+ "FileSystemWritableFileStream",
+ "FileSystemGetDirectoryOptions",
+ "FileSystemFileHandle",
+ "StorageManager",
+ "File",
+ "Url",
+ "Window",
+ "Navigator",
+], optional = true }
+wasm-bindgen = { workspace = true }
+wasm-bindgen-futures = { workspace = true }
+rusqlite = { git = "https://github.com/Spxg/rusqlite.git", branch = "wasm-demo", features = [
+ "uuid",
+ "chrono",
+ "precompiled-wasm",
+] }
+tokio_with_wasm = { workspace = true, features = ["sync", "time", "rt"] }
[dev-dependencies]
-tracing-subscriber = "0.3.19"
-peanuts = { version = "0.1.0", git = "https://bunny.garden/peanuts" }
+tracing-subscriber = { workspace = true }
+peanuts = { workspace = true }
+tracing-wasm = { workspace = true }
+wasm-bindgen-test = { workspace = true }
[[example]]
name = "example"
diff --git a/filamento/examples/example.rs b/filamento/examples/example.rs
index 8119743..65fe166 100644
--- a/filamento/examples/example.rs
+++ b/filamento/examples/example.rs
@@ -1,120 +1,122 @@
-use std::{path::Path, str::FromStr, sync::Arc, time::Duration};
+// use std::{path::Path, str::FromStr, sync::Arc, time::Duration};
-use filamento::{Client, db::Db, files::FileStore};
-use jid::JID;
-use tokio::io::{self, AsyncReadExt};
-use tracing::info;
+// use filamento::{Client, db::Db, files::FileStore};
+// use jid::JID;
+// use tokio::io::{self, AsyncReadExt};
+// use tracing::info;
-#[derive(Clone, Debug)]
-pub struct Files;
+// #[derive(Clone, Debug)]
+// pub struct Files;
-impl FileStore for Files {
- type Err = Arc<io::Error>;
+// impl FileStore for Files {
+// type Err = Arc<io::Error>;
- async fn is_stored(&self, name: &str) -> Result<bool, Self::Err> {
- tracing::debug!("checking if {} is stored", name);
- let res = tokio::fs::try_exists(format!("files/{}", name))
- .await
- .map_err(|err| Arc::new(err));
- tracing::debug!("file check res: {:?}", res);
- res
- }
+// async fn is_stored(&self, name: &str) -> Result<bool, Self::Err> {
+// tracing::debug!("checking if {} is stored", name);
+// let res = tokio::fs::try_exists(format!("files/{}", name))
+// .await
+// .map_err(|err| Arc::new(err));
+// tracing::debug!("file check res: {:?}", res);
+// res
+// }
- async fn store(&self, name: &str, data: &[u8]) -> Result<(), Self::Err> {
- tracing::debug!("storing {} is stored", name);
- let res = tokio::fs::write(format!("files/{}", name), data)
- .await
- .map_err(|err| Arc::new(err));
- tracing::debug!("file store res: {:?}", res);
- res
- }
+// async fn store(&self, name: &str, data: &[u8]) -> Result<(), Self::Err> {
+// tracing::debug!("storing {} is stored", name);
+// let res = tokio::fs::write(format!("files/{}", name), data)
+// .await
+// .map_err(|err| Arc::new(err));
+// tracing::debug!("file store res: {:?}", res);
+// res
+// }
- async fn delete(&self, name: &str) -> Result<(), Self::Err> {
- tracing::debug!("deleting {}", name);
- let res = tokio::fs::remove_file(format!("files/{}", name))
- .await
- .map_err(|err| Arc::new(err));
- tracing::debug!("file delete res: {:?}", res);
- res
- }
-}
+// async fn delete(&self, name: &str) -> Result<(), Self::Err> {
+// tracing::debug!("deleting {}", name);
+// let res = tokio::fs::remove_file(format!("files/{}", name))
+// .await
+// .map_err(|err| Arc::new(err));
+// tracing::debug!("file delete res: {:?}", res);
+// res
+// }
+// }
-#[tokio::main]
-async fn main() {
- tracing_subscriber::fmt::init();
- let db = Db::create_connect_and_migrate(Path::new("./filamento.db"))
- .await
- .unwrap();
- let (client, mut recv) = Client::new(
- "test@blos.sm/testing2".try_into().unwrap(),
- "slayed".to_string(),
- db,
- Files,
- );
+// #[tokio::main]
+// async fn main() {
+// tracing_subscriber::fmt::init();
+// let db = Db::create_connect_and_migrate(Path::new("./filamento.db"))
+// .await
+// .unwrap();
+// let (client, mut recv) = Client::new(
+// "test@blos.sm/testing2".try_into().unwrap(),
+// "slayed".to_string(),
+// db,
+// Files,
+// );
- tokio::spawn(async move {
- while let Some(msg) = recv.recv().await {
- info!("{:#?}", msg)
- }
- });
+// tokio::spawn(async move {
+// while let Some(msg) = recv.recv().await {
+// info!("{:#?}", msg)
+// }
+// });
- client.connect().await.unwrap();
- tokio::time::sleep(Duration::from_secs(5)).await;
- info!("changing nick");
- client
- .change_nick(Some("britney".to_string()))
- .await
- .unwrap();
- let mut profile_pic = tokio::fs::File::open("files/britney_starbies.jpg")
- .await
- .unwrap();
- let mut data = Vec::new();
- profile_pic.read_to_end(&mut data).await.unwrap();
- client.change_avatar(Some(data)).await.unwrap();
- info!("sending message");
- client
- .send_message(
- JID::from_str("cel@blos.sm").unwrap(),
- filamento::chat::Body {
- body: "hallo!!!".to_string(),
- },
- )
- .await
- .unwrap();
- info!("sent message");
- client
- .send_message(
- JID::from_str("cel@blos.sm").unwrap(),
- filamento::chat::Body {
- body: "hallo 2".to_string(),
- },
- )
- .await
- .unwrap();
- tokio::time::sleep(Duration::from_secs(15)).await;
- // info!("sending disco query");
- // let info = client.disco_info(None, None).await.unwrap();
- // info!("got disco result: {:#?}", info);
- // let items = client.disco_items(None, None).await.unwrap();
- // info!("got disco result: {:#?}", items);
- // let info = client
- // .disco_info(Some("blos.sm".parse().unwrap()), None)
- // .await
- // .unwrap();
- // info!("got disco result: {:#?}", info);
- // let items = client
- // .disco_items(Some("blos.sm".parse().unwrap()), None)
- // .await
- // .unwrap();
- // info!("got disco result: {:#?}", items);
- // let info = client
- // .disco_info(Some("pubsub.blos.sm".parse().unwrap()), None)
- // .await
- // .unwrap();
- // info!("got disco result: {:#?}", info);
- // let items = client
- // .disco_items(Some("pubsub.blos.sm".parse().unwrap()), None)
- // .await
- // .unwrap();
- // info!("got disco result: {:#?}", items);
-}
+// client.connect().await.unwrap();
+// tokio::time::sleep(Duration::from_secs(5)).await;
+// info!("changing nick");
+// client
+// .change_nick(Some("britney".to_string()))
+// .await
+// .unwrap();
+// let mut profile_pic = tokio::fs::File::open("files/britney_starbies.jpg")
+// .await
+// .unwrap();
+// let mut data = Vec::new();
+// profile_pic.read_to_end(&mut data).await.unwrap();
+// client.change_avatar(Some(data)).await.unwrap();
+// info!("sending message");
+// client
+// .send_message(
+// JID::from_str("cel@blos.sm").unwrap(),
+// filamento::chat::Body {
+// body: "hallo!!!".to_string(),
+// },
+// )
+// .await
+// .unwrap();
+// info!("sent message");
+// client
+// .send_message(
+// JID::from_str("cel@blos.sm").unwrap(),
+// filamento::chat::Body {
+// body: "hallo 2".to_string(),
+// },
+// )
+// .await
+// .unwrap();
+// tokio::time::sleep(Duration::from_secs(15)).await;
+// // info!("sending disco query");
+// // let info = client.disco_info(None, None).await.unwrap();
+// // info!("got disco result: {:#?}", info);
+// // let items = client.disco_items(None, None).await.unwrap();
+// // info!("got disco result: {:#?}", items);
+// // let info = client
+// // .disco_info(Some("blos.sm".parse().unwrap()), None)
+// // .await
+// // .unwrap();
+// // info!("got disco result: {:#?}", info);
+// // let items = client
+// // .disco_items(Some("blos.sm".parse().unwrap()), None)
+// // .await
+// // .unwrap();
+// // info!("got disco result: {:#?}", items);
+// // let info = client
+// // .disco_info(Some("pubsub.blos.sm".parse().unwrap()), None)
+// // .await
+// // .unwrap();
+// // info!("got disco result: {:#?}", info);
+// // let items = client
+// // .disco_items(Some("pubsub.blos.sm".parse().unwrap()), None)
+// // .await
+// // .unwrap();
+// // info!("got disco result: {:#?}", items);
+// }
+
+fn main() {}
diff --git a/filamento/files/4ff6fc2961a36a9439eea239f01e05624f3afbe0 b/filamento/files/4ff6fc2961a36a9439eea239f01e05624f3afbe0
new file mode 100644
index 0000000..0b27b91
--- /dev/null
+++ b/filamento/files/4ff6fc2961a36a9439eea239f01e05624f3afbe0
Binary files differ
diff --git a/filamento/files/8d7231c2d8c4618fa71979851687737e63ba6377 b/filamento/files/8d7231c2d8c4618fa71979851687737e63ba6377
new file mode 100644
index 0000000..4284eb6
--- /dev/null
+++ b/filamento/files/8d7231c2d8c4618fa71979851687737e63ba6377
Binary files differ
diff --git a/filamento/files/britney_starbies.jpg b/filamento/files/britney_starbies.jpg
new file mode 100644
index 0000000..2e81677
--- /dev/null
+++ b/filamento/files/britney_starbies.jpg
Binary files differ
diff --git a/filamento/migrations/20240113011930_luz.sql b/filamento/migrations/1.sql
index c2f35dd..502c5a9 100644
--- a/filamento/migrations/20240113011930_luz.sql
+++ b/filamento/migrations/1.sql
@@ -2,7 +2,7 @@ PRAGMA foreign_keys = on;
-- a user jid will never change, only a chat user will change
-- TODO: avatar, nick, etc.
-create table users(
+create table if not exists users(
-- TODO: enforce bare jid
jid text primary key not null,
nick text,
@@ -29,7 +29,7 @@ create table users(
-- primary key(activated timestamp, id, jid)
-- );
-create table resources(
+create table if not exists resources(
bare_jid text not null,
resource text not null,
foreign key(bare_jid) references users(jid),
@@ -37,14 +37,14 @@ create table resources(
);
-- enum for subscription state
-create table subscription(
+create table if not exists subscription(
state text primary key not null
);
-insert into subscription ( state ) values ('none'), ('pending-out'), ('pending-in'), ('pending-in-pending-out'), ('only-out'), ('only-in'), ('out-pending-in'), ('in-pending-out'), ('buddy');
+insert into subscription ( state ) values ('none'), ('pending-out'), ('pending-in'), ('pending-in-pending-out'), ('only-out'), ('only-in'), ('out-pending-in'), ('in-pending-out'), ('buddy') on conflict do nothing;
-- a roster contains users, with client-set nickname
-CREATE TABLE roster(
+CREATE TABLE if not exists roster(
user_jid text primary key not null,
name TEXT,
subscription text not null,
@@ -52,11 +52,11 @@ CREATE TABLE roster(
foreign key(user_jid) references users(jid)
);
-create table groups(
+create table if not exists groups(
group_name text primary key not null
);
-create table groups_roster(
+create table if not exists groups_roster(
group_name text not null,
contact_jid text not null,
foreign key(group_name) references groups(group_name),
@@ -67,7 +67,7 @@ create table groups_roster(
-- chat includes reference to user jid chat is with
-- specifically for dms, groups should be different
-- can send chat message to user (creating a new chat if not already exists)
-create table chats (
+create table if not exists chats (
id text primary key not null,
have_chatted bool not null,
correspondent text not null unique,
@@ -75,14 +75,14 @@ create table chats (
);
-- enum for subscription state
-create table delivery(
+create table if not exists delivery(
state text primary key not null
);
-insert into delivery ( state ) values ('sending'), ('written'), ('sent'), ('delivered'), ('read'), ('failed'), ('queued');
+insert into delivery ( state ) values ('sending'), ('written'), ('sent'), ('delivered'), ('read'), ('failed'), ('queued') on conflict do nothing;
-- messages include reference to chat they are in, and who sent them.
-create table messages (
+create table if not exists messages (
id text primary key not null,
body text,
-- delivery is nullable as only messages sent by the user are markable
@@ -116,26 +116,26 @@ create table messages (
);
-- enum for subscription state
-create table show (
+create table if not exists show (
state text primary key not null
);
-insert into show ( state ) values ('away'), ('chat'), ('do-not-disturb'), ('extended-away');
+insert into show ( state ) values ('away'), ('chat'), ('do-not-disturb'), ('extended-away') on conflict do nothing;
-create table cached_status (
+create table if not exists cached_status (
id integer primary key not null,
show text,
message text,
foreign key(show) references show(state)
);
-insert into cached_status (id) values (0);
+insert into cached_status (id) values (0) on conflict do nothing;
-create table capability_hash_nodes (
+create table if not exists capability_hash_nodes (
node text primary key not null,
timestamp text,
-- TODO: normalization
capabilities text not null
);
-insert into capability_hash_nodes ( node, capabilities ) values ('https://bunny.garden/filamento#mSavc/SLnHm8zazs5RlcbD/iXoc=', 'aHR0cDovL2phYmJlci5vcmcvcHJvdG9jb2wvY2Fwcx9odHRwOi8vamFiYmVyLm9yZy9wcm90b2NvbC9kaXNjbyNpbmZvH2h0dHA6Ly9qYWJiZXIub3JnL3Byb3RvY29sL2Rpc2NvI2l0ZW1zH2h0dHA6Ly9qYWJiZXIub3JnL3Byb3RvY29sL25pY2sfaHR0cDovL2phYmJlci5vcmcvcHJvdG9jb2wvbmljaytub3RpZnkfHGNsaWVudB9wYx8fZmlsYW1lbnRvIDAuMS4wHx4cHA==');
+insert into capability_hash_nodes ( node, capabilities ) values ('https://bunny.garden/filamento#mSavc/SLnHm8zazs5RlcbD/iXoc=', 'aHR0cDovL2phYmJlci5vcmcvcHJvdG9jb2wvY2Fwcx9odHRwOi8vamFiYmVyLm9yZy9wcm90b2NvbC9kaXNjbyNpbmZvH2h0dHA6Ly9qYWJiZXIub3JnL3Byb3RvY29sL2Rpc2NvI2l0ZW1zH2h0dHA6Ly9qYWJiZXIub3JnL3Byb3RvY29sL25pY2sfaHR0cDovL2phYmJlci5vcmcvcHJvdG9jb2wvbmljaytub3RpZnkfHGNsaWVudB9wYx8fZmlsYW1lbnRvIDAuMS4wHx4cHA==') on conflict do nothing;
diff --git a/filamento/src/caps.rs b/filamento/src/caps.rs
index 819e669..e0587ff 100644
--- a/filamento/src/caps.rs
+++ b/filamento/src/caps.rs
@@ -5,7 +5,8 @@ use sha1::Sha1;
use sha2::{Digest, Sha256};
use sha3::Sha3_256;
use stanza::{
- xep_0030::info,
+ xep_0004,
+ xep_0030::{self, info},
xep_0115::{self, C},
xep_0300::{self, Algo, Hash},
xep_0390,
@@ -43,7 +44,11 @@ pub fn client_info() -> Info {
],
// "http://jabber.org/protocol/nick".to_string(),
identities: vec![Identity {
- name: Some("filamento 0.1.0".to_string()),
+ name: Some(format!(
+ "{pkg} {version}",
+ pkg = env!("CARGO_PKG_NAME"),
+ version = env!("CARGO_PKG_VERSION"),
+ )),
category: Category::Client(identity::Client::PC),
}],
}
@@ -53,17 +58,26 @@ pub fn caps(node: String, query: info::Query) -> Result<xep_0115::C, CapsEncodeE
let mut string = String::new();
// identities string
- let mut identities = Vec::new();
+ let mut identities = Vec::with_capacity(query.identities.len());
for identity in query.identities {
- let mut string = String::new();
- string.push_str(&identity.category);
+ let (category, r#type, lang, name) = (
+ identity.category,
+ identity.r#type,
+ identity.lang.unwrap_or_default(),
+ identity.name.unwrap_or_default(),
+ );
+
+ let mut string =
+ String::with_capacity(category.len() + r#type.len() + lang.len() + name.len() + 4);
+ string.push_str(&category);
string.push('/');
- string.push_str(&identity.r#type);
+ string.push_str(&r#type);
string.push('/');
- string.push_str(&identity.lang.unwrap_or_default());
+ string.push_str(&lang);
string.push('/');
- string.push_str(&identity.name.unwrap_or_default());
+ string.push_str(&name);
string.push('<');
+
identities.push(string);
}
identities.sort();
@@ -71,11 +85,12 @@ pub fn caps(node: String, query: info::Query) -> Result<xep_0115::C, CapsEncodeE
string.push_str(&identities_string);
// features string
- let mut features = Vec::new();
- for feature in query.features {
- let mut string = String::new();
- string.push_str(&feature.var);
+ let mut features = Vec::with_capacity(query.features.len());
+ for xep_0030::info::Feature { var, .. } in query.features {
+ let mut string = String::with_capacity(var.len() + 1);
+ string.push_str(&var);
string.push('<');
+
features.push(string);
}
features.sort();
@@ -83,45 +98,53 @@ pub fn caps(node: String, query: info::Query) -> Result<xep_0115::C, CapsEncodeE
string.push_str(&features_string);
// extensions string
- let mut extensions = Vec::new();
+ let mut extensions = Vec::with_capacity(query.extensions.len());
for extension in query.extensions {
- let mut string = String::new();
let form_type = extension
.fields
.iter()
.find(|field| field.var.as_deref() == Some("FORM_TYPE"))
.ok_or(CapsEncodeError::InvalidDataForm)?
.values
- .clone()
- .into_iter()
- .map(|value| value.0)
- .collect::<Vec<String>>()
+ .iter()
+ .map(|value| value.0.as_str())
+ .collect::<Vec<&str>>()
.concat();
- string.push_str(&form_type);
- string.push('<');
- let mut fields = Vec::new();
+
+ let mut fields = Vec::with_capacity(extension.fields.len());
for field in extension.fields {
if field.var.as_deref() == Some("FORM_TYPE") {
continue;
}
- let mut string = String::new();
- string.push_str(&field.var.unwrap_or_default());
- string.push('<');
- let mut values = Vec::new();
- for value in field.values {
- let mut string = String::new();
- string.push_str(&value.0);
+
+ let var = field.var.unwrap_or_default();
+
+ let mut values = Vec::with_capacity(field.values.len());
+ for xep_0004::Value(value) in field.values {
+ let mut string = String::with_capacity(value.len() + 1);
+ string.push_str(&value);
string.push('<');
+
values.push(string);
}
values.sort();
let values_string = values.concat();
+
+ let mut string = String::with_capacity(var.len() + values_string.len() + 1);
+ string.push_str(&var);
+ string.push('<');
string.push_str(&values_string);
+
fields.push(string);
}
fields.sort();
let fields_string = fields.concat();
+
+ let mut string = String::with_capacity(form_type.len() + fields_string.len() + 1);
+ string.push_str(&form_type);
+ string.push('<');
string.push_str(&fields_string);
+
extensions.push(string);
}
extensions.sort();
@@ -149,11 +172,12 @@ pub fn encode_caps2(query: info::Query) -> String {
let mut string = String::new();
// features string
- let mut features = Vec::new();
- for feature in query.features {
- let mut string = String::new();
- string.push_str(&feature.var);
+ let mut features = Vec::with_capacity(query.features.len());
+ for xep_0030::info::Feature { var, .. } in query.features {
+ let mut string = String::with_capacity(var.len() + 1);
+ string.push_str(&var);
string.push('\x1f');
+
features.push(string);
}
features.sort();
@@ -162,18 +186,27 @@ pub fn encode_caps2(query: info::Query) -> String {
string.push('\x1c');
// identities string
- let mut identities = Vec::new();
+ let mut identities = Vec::with_capacity(query.identities.len());
for identity in query.identities {
- let mut string = String::new();
- string.push_str(&identity.category);
+ let (category, r#type, lang, name) = (
+ identity.category,
+ identity.r#type,
+ identity.lang.unwrap_or_default(),
+ identity.name.unwrap_or_default(),
+ );
+
+ let mut string =
+ String::with_capacity(category.len() + r#type.len() + lang.len() + name.len() + 5);
+ string.push_str(&category);
string.push('\x1f');
- string.push_str(&identity.r#type);
+ string.push_str(&r#type);
string.push('\x1f');
- string.push_str(&identity.lang.unwrap_or_default());
+ string.push_str(&lang);
string.push('\x1f');
- string.push_str(&identity.name.unwrap_or_default());
+ string.push_str(&name);
string.push('\x1f');
string.push('\x1e');
+
identities.push(string);
}
identities.sort();
@@ -182,37 +215,45 @@ pub fn encode_caps2(query: info::Query) -> String {
string.push('\x1c');
// extensions string
- let mut extensions = Vec::new();
+ let mut extensions = Vec::with_capacity(query.extensions.len());
for extension in query.extensions {
- let mut string = String::new();
- let mut fields = Vec::new();
+ let mut fields = Vec::with_capacity(extension.fields.len());
for field in extension.fields {
- let mut string = String::new();
- string.push_str(&field.var.unwrap_or_default());
- string.push('\x1f');
- let mut values = Vec::new();
- for value in field.values {
- let mut string = String::new();
- string.push_str(&value.0);
+ let var = field.var.unwrap_or_default();
+
+ let mut values = Vec::with_capacity(field.values.len());
+ for xep_0004::Value(value) in field.values {
+ let mut string = String::with_capacity(value.len() + 1);
+ string.push_str(&value);
string.push('\x1f');
+
values.push(string);
}
values.sort();
let values_string = values.concat();
+
+ let mut string = String::with_capacity(var.len() + values_string.len() + 2);
+ string.push_str(&var);
+ string.push('\x1f');
string.push_str(&values_string);
string.push('\x1e');
+
fields.push(string);
}
fields.sort();
let fields_string = fields.concat();
+
+ let mut string = String::with_capacity(fields_string.len() + 1);
string.push_str(&fields_string);
string.push('\x1d');
+
extensions.push(string);
}
extensions.sort();
let extensions_string = extensions.concat();
string.push_str(&extensions_string);
string.push('\x1c');
+
string
}
@@ -336,7 +377,7 @@ pub fn node_to_hash(node: String) -> Result<Hash, HashNodeConversionError> {
#[cfg(test)]
mod tests {
- use peanuts::{Writer, element::IntoElement};
+ use peanuts::Writer;
use stanza::{
xep_0004::{Field, FieldType, Value, X, XType},
xep_0030::info::{Feature, Identity},
@@ -344,6 +385,7 @@ mod tests {
use super::*;
+ #[cfg(not(target_arch = "wasm32"))]
#[tokio::test]
async fn test_caps() {
tracing_subscriber::fmt().init();
@@ -448,6 +490,7 @@ mod tests {
writer.write(&test_caps).await.unwrap();
}
+ #[cfg(not(target_arch = "wasm32"))]
#[tokio::test]
pub async fn test_gen_client_caps() {
let stdout = tokio::io::stdout();
diff --git a/filamento/src/chat.rs b/filamento/src/chat.rs
index 147c7f7..c02654f 100644
--- a/filamento/src/chat.rs
+++ b/filamento/src/chat.rs
@@ -1,25 +1,31 @@
+use std::fmt::{Display, Write};
+
use chrono::{DateTime, Utc};
-use jid::JID;
-use sqlx::Sqlite;
+use jid::{BareJID, JID};
+use rusqlite::{
+ ToSql,
+ types::{FromSql, ToSqlOutput, Value},
+};
use uuid::Uuid;
-#[derive(Debug, sqlx::FromRow, Clone)]
+#[derive(Debug, Clone, PartialEq, Eq)]
+#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
+#[cfg_attr(feature = "reactive_stores", derive(reactive_stores::Store))]
pub struct Message {
pub id: Uuid,
// does not contain full user information
- #[sqlx(rename = "from_jid")]
// bare jid (for now)
- pub from: JID,
+ pub from: BareJID,
pub delivery: Option<Delivery>,
pub timestamp: DateTime<Utc>,
// TODO: originally_from
// TODO: message edits
// TODO: message timestamp
- #[sqlx(flatten)]
pub body: Body,
}
-#[derive(Debug, Clone, Copy)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
pub enum Delivery {
Sending,
Written,
@@ -30,45 +36,47 @@ pub enum Delivery {
Queued,
}
-impl sqlx::Type<Sqlite> for Delivery {
- fn type_info() -> <Sqlite as sqlx::Database>::TypeInfo {
- <&str as sqlx::Type<Sqlite>>::type_info()
+impl Display for Delivery {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ match self {
+ Delivery::Sending => f.write_str("sending"),
+ Delivery::Written => f.write_str("written"),
+ Delivery::Sent => f.write_str("sent"),
+ Delivery::Delivered => f.write_str("delivered"),
+ Delivery::Read => f.write_str("read"),
+ Delivery::Failed => f.write_str("failed"),
+ Delivery::Queued => f.write_str("queued"),
+ }
}
}
-impl sqlx::Decode<'_, Sqlite> for Delivery {
- fn decode(
- value: <Sqlite as sqlx::Database>::ValueRef<'_>,
- ) -> Result<Self, sqlx::error::BoxDynError> {
- let value = <&str as sqlx::Decode<Sqlite>>::decode(value)?;
- match value {
- "sending" => Ok(Self::Sending),
- "written" => Ok(Self::Written),
- "sent" => Ok(Self::Sent),
- "delivered" => Ok(Self::Delivered),
- "read" => Ok(Self::Read),
- "failed" => Ok(Self::Failed),
- "queued" => Ok(Self::Queued),
- _ => unreachable!(),
- }
+impl ToSql for Delivery {
+ fn to_sql(&self) -> rusqlite::Result<rusqlite::types::ToSqlOutput<'_>> {
+ Ok(match self {
+ Delivery::Sending => ToSqlOutput::Owned(Value::Text("sending".to_string())),
+ Delivery::Written => ToSqlOutput::Owned(Value::Text("written".to_string())),
+ Delivery::Sent => ToSqlOutput::Owned(Value::Text("sent".to_string())),
+ Delivery::Delivered => ToSqlOutput::Owned(Value::Text("delivered".to_string())),
+ Delivery::Read => ToSqlOutput::Owned(Value::Text("read".to_string())),
+ Delivery::Failed => ToSqlOutput::Owned(Value::Text("failed".to_string())),
+ Delivery::Queued => ToSqlOutput::Owned(Value::Text("queued".to_string())),
+ })
}
}
-impl sqlx::Encode<'_, Sqlite> for Delivery {
- fn encode_by_ref(
- &self,
- buf: &mut <Sqlite as sqlx::Database>::ArgumentBuffer<'_>,
- ) -> Result<sqlx::encode::IsNull, sqlx::error::BoxDynError> {
- let value = match self {
- Delivery::Sending => "sending",
- Delivery::Written => "written",
- Delivery::Sent => "sent",
- Delivery::Delivered => "delivered",
- Delivery::Read => "read",
- Delivery::Failed => "failed",
- Delivery::Queued => "queued",
- };
- <&str as sqlx::Encode<Sqlite>>::encode(value, buf)
+impl FromSql for Delivery {
+ fn column_result(value: rusqlite::types::ValueRef<'_>) -> rusqlite::types::FromSqlResult<Self> {
+ Ok(match value.as_str()? {
+ "sending" => Self::Sending,
+ "written" => Self::Written,
+ "sent" => Self::Sent,
+ "delivered" => Self::Delivered,
+ "read" => Self::Read,
+ "failed" => Self::Failed,
+ "queued" => Self::Queued,
+ // TODO: don't have these lol
+ value => panic!("unexpected subscription `{value}`"),
+ })
}
}
@@ -78,15 +86,18 @@ impl sqlx::Encode<'_, Sqlite> for Delivery {
// Outside,
// }
-#[derive(Debug, sqlx::FromRow, Clone)]
+#[derive(Debug, Clone, PartialEq, Eq)]
+#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
pub struct Body {
// TODO: rich text, other contents, threads
pub body: String,
}
-#[derive(sqlx::FromRow, Debug, Clone)]
+#[derive(Debug, Clone, PartialEq, Eq)]
+#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
+#[cfg_attr(feature = "reactive_stores", derive(reactive_stores::Store))]
pub struct Chat {
- pub correspondent: JID,
+ pub correspondent: BareJID,
pub have_chatted: bool,
// pub unread_messages: i32,
// pub latest_message: Message,
@@ -98,13 +109,13 @@ pub struct Chat {
pub enum ChatUpdate {}
impl Chat {
- pub fn new(correspondent: JID, have_chatted: bool) -> Self {
+ pub fn new(correspondent: BareJID, have_chatted: bool) -> Self {
Self {
correspondent,
have_chatted,
}
}
- pub fn correspondent(&self) -> &JID {
+ pub fn correspondent(&self) -> &BareJID {
&self.correspondent
}
}
diff --git a/filamento/src/db.rs b/filamento/src/db.rs
index 1d3d36c..298d54a 100644
--- a/filamento/src/db.rs
+++ b/filamento/src/db.rs
@@ -1,94 +1,1001 @@
-use std::{collections::HashSet, path::Path};
+use core::fmt::Display;
+use std::{collections::HashSet, ops::Deref, path::Path, sync::Arc};
-use chrono::Utc;
-use jid::JID;
-use sqlx::{SqlitePool, migrate};
+use chrono::{DateTime, Utc};
+use jid::{BareJID, FullJID, JID};
+use rusqlite::{Connection, OptionalExtension};
+use tokio::sync::{Mutex, MutexGuard};
+use tokio::sync::{mpsc, oneshot};
+use tokio::task::{spawn, spawn_blocking};
+#[cfg(target_arch = "wasm32")]
+use tokio_with_wasm::alias as tokio;
+use tracing::debug;
use uuid::Uuid;
use crate::{
- chat::{Chat, Message},
+ chat::{Body, Chat, Delivery, Message},
error::{DatabaseError as Error, DatabaseOpenError},
presence::Online,
roster::Contact,
user::User,
};
-#[derive(Clone)]
+#[derive(Clone, Debug)]
pub struct Db {
- db: SqlitePool,
+ sender: mpsc::UnboundedSender<DbCommand>,
+}
+
+impl Deref for Db {
+ type Target = mpsc::UnboundedSender<DbCommand>;
+
+ fn deref(&self) -> &Self::Target {
+ &self.sender
+ }
+}
+
+#[derive(Debug)]
+pub struct DbActor {
+ receiver: mpsc::UnboundedReceiver<DbCommand>,
+ db: Connection,
+}
+
+macro_rules! impl_db_sends {
+ ($($command:ident => $name:ident($($arg:ident: $arg_t:ty),*) -> $ret:ty);*) => {
+ $(
+ pub(crate) async fn $name(&self, $($arg: $arg_t),*) -> Result<$ret, Error> {
+ let (result, recv) = oneshot::channel();
+ let command = DbCommand::$command { $($arg,)* result };
+ let _ = self.sender.send(command);
+ let result = recv.await?;
+ result
+ }
+ )*
+ }
}
-// TODO: turn into trait
impl Db {
+ #[cfg(not(target_arch = "wasm32"))]
+ pub async fn create_connect_and_migrate(
+ path: impl AsRef<Path> + Send,
+ ) -> Result<Self, DatabaseOpenError> {
+ let (sender, receiver) = mpsc::unbounded_channel();
+
+ let actor = DbActor::new(path, receiver)?;
+ spawn_blocking(move || actor.run());
+
+ Ok(Self { sender })
+ }
+
+ #[cfg(not(target_arch = "wasm32"))]
+ pub async fn create_connect_and_migrate_memory() -> Result<Self, DatabaseOpenError> {
+ let (sender, receiver) = mpsc::unbounded_channel();
+
+ let actor = DbActor::new_memory(receiver)?;
+ spawn_blocking(move || actor.run());
+
+ Ok(Self { sender })
+ }
+
+ /// `file_name` should be a file not in a directory
+ #[cfg(target_arch = "wasm32")]
pub async fn create_connect_and_migrate(
+ file_name: impl AsRef<str> + Send + 'static,
+ ) -> Result<Self, DatabaseOpenError> {
+ use tokio_with_wasm::spawn_local;
+
+ let (sender, receiver) = mpsc::unbounded_channel();
+ let (result_send, result_recv) = oneshot::channel();
+ spawn_blocking(move || {
+ spawn_local(async move {
+ debug!("installing opfs in spawn");
+ match rusqlite::ffi::install_opfs_sahpool(
+ Some(&rusqlite::ffi::OpfsSAHPoolCfg::default()),
+ false,
+ )
+ .await
+ {
+ Ok(_) => {}
+ Err(e) => {
+ use crate::error::OpfsSAHError;
+
+ let error: OpfsSAHError = e.into();
+ result_send.send(Err(error.into()));
+ return;
+ }
+ }
+ debug!("opfs installed");
+ let file_name = format!("file:{}?vfs=opfs-sahpool", file_name.as_ref());
+ let result = DbActor::new(file_name, receiver);
+ match result {
+ Ok(a) => {
+ result_send.send(Ok(()));
+ a.run().await
+ }
+ Err(e) => {
+ result_send.send(Err(e));
+ }
+ }
+ });
+ });
+ match result_recv.await {
+ Ok(r) => match r {
+ Ok(o) => Ok(Self { sender }),
+ Err(e) => return Err(e),
+ },
+ Err(e) => return Err(e.into()),
+ }
+ }
+
+ #[cfg(target_arch = "wasm32")]
+ pub async fn create_connect_and_migrate_memory() -> Result<Self, DatabaseOpenError> {
+ let (sender, receiver) = mpsc::unbounded_channel();
+ let (result_send, result_recv) = oneshot::channel();
+ spawn_blocking(move || {
+ let result = DbActor::new_memory(receiver);
+ match result {
+ Ok(a) => {
+ result_send.send(Ok(()));
+ tokio_with_wasm::spawn_local(async { a.run().await });
+ // a.run()
+ }
+ Err(e) => {
+ result_send.send(Err(e));
+ }
+ }
+ });
+ match result_recv.await {
+ Ok(r) => match r {
+ Ok(o) => Ok(Self { sender }),
+ Err(e) => return Err(e),
+ },
+ Err(e) => return Err(e.into()),
+ }
+ }
+
+ pub(crate) async fn create_user(&self, user: User) -> Result<(), Error> {
+ let (result, recv) = oneshot::channel();
+ let command = DbCommand::CreateUser { user, result };
+ self.sender.send(command);
+ let result = recv.await?;
+ result
+ }
+
+ // TODO: this is not a 'read' user
+ pub(crate) async fn read_user(&self, user: BareJID) -> Result<User, Error> {
+ let (result, recv) = oneshot::channel();
+ let command = DbCommand::ReadUser { user, result };
+ self.sender.send(command);
+ let result = recv.await?;
+ result
+ }
+
+ /// returns whether or not the nickname was updated
+ pub(crate) async fn delete_user_nick(&self, jid: BareJID) -> Result<bool, Error> {
+ let (result, recv) = oneshot::channel();
+ let command = DbCommand::DeleteUserNick { jid, result };
+ self.sender.send(command);
+ let result = recv.await?;
+ result
+ }
+
+ /// returns whether or not the nickname was updated
+ pub(crate) async fn upsert_user_nick(&self, jid: BareJID, nick: String) -> Result<bool, Error> {
+ let (result, recv) = oneshot::channel();
+ let command = DbCommand::UpsertUserNick { jid, nick, result };
+ self.sender.send(command);
+ let result = recv.await?;
+ result
+ }
+
+ /// returns whether or not the avatar was updated, and the file to delete if there existed an old avatar
+ pub(crate) async fn delete_user_avatar(
+ &self,
+ jid: BareJID,
+ ) -> Result<(bool, Option<String>), Error> {
+ let (result, recv) = oneshot::channel();
+ let command = DbCommand::DeleteUserAvatar { jid, result };
+ self.sender.send(command);
+ let result = recv.await?;
+ result
+ }
+
+ /// returns whether or not the avatar was updated, and the file to delete if there existed an old avatar
+ pub(crate) async fn upsert_user_avatar(
+ &self,
+ jid: BareJID,
+ avatar: String,
+ ) -> Result<(bool, Option<String>), Error> {
+ let (result, recv) = oneshot::channel();
+ let command = DbCommand::UpsertUserAvatar {
+ jid,
+ avatar,
+ result,
+ };
+ self.sender.send(command);
+ let result = recv.await?;
+ result
+ }
+
+ // TODO: use references everywhere
+ pub(crate) async fn update_user(&self, user: User) -> Result<(), Error> {
+ let (result, recv) = oneshot::channel();
+ let command = DbCommand::UpdateUser { user, result };
+ self.sender.send(command);
+ let result = recv.await?;
+ result
+ }
+
+ // TODO: should this be allowed? messages need to reference users. should probably only allow delete if every other thing referencing it has been deleted, or if you make clear to the user deleting a user will delete all messages associated with them.
+ // pub(crate) async fn delete_user(&self, user: JID) -> Result<(), Error> {}
+
+ /// does not create the underlying user, if underlying user does not exist, create_user() must be called separately
+ pub(crate) async fn create_contact(&self, contact: Contact) -> Result<(), Error> {
+ let (result, recv) = oneshot::channel();
+ let command = DbCommand::CreateContact { contact, result };
+ self.sender.send(command);
+ let result = recv.await?;
+ result
+ }
+
+ pub(crate) async fn read_contact(&self, contact: BareJID) -> Result<Contact, Error> {
+ let (result, recv) = oneshot::channel();
+ let command = DbCommand::ReadContact { contact, result };
+ self.sender.send(command);
+ let result = recv.await?;
+ result
+ }
+
+ pub(crate) async fn read_contact_opt(
+ &self,
+ contact: BareJID,
+ ) -> Result<Option<Contact>, Error> {
+ let (result, recv) = oneshot::channel();
+ let command = DbCommand::ReadContactOpt { contact, result };
+ self.sender.send(command);
+ let result = recv.await?;
+ result
+ }
+
+ /// does not update the underlying user, to update user, update_user() must be called separately
+ pub(crate) async fn update_contact(&self, contact: Contact) -> Result<(), Error> {
+ let (result, recv) = oneshot::channel();
+ let command = DbCommand::UpdateContact { contact, result };
+ self.sender.send(command);
+ let result = recv.await?;
+ result
+ }
+
+ pub(crate) async fn upsert_contact(&self, contact: Contact) -> Result<(), Error> {
+ let (result, recv) = oneshot::channel();
+ let command = DbCommand::UpsertContact { contact, result };
+ self.sender.send(command);
+ let result = recv.await?;
+ result
+ }
+
+ pub(crate) async fn delete_contact(&self, contact: BareJID) -> Result<(), Error> {
+ let (result, recv) = oneshot::channel();
+ let command = DbCommand::DeleteContact { contact, result };
+ self.sender.send(command);
+ let result = recv.await?;
+ result
+ }
+
+ pub(crate) async fn replace_cached_roster(&self, roster: Vec<Contact>) -> Result<(), Error> {
+ let (result, recv) = oneshot::channel();
+ let command = DbCommand::ReplaceCachedRoster { roster, result };
+ self.sender.send(command);
+ let result = recv.await?;
+ result
+ }
+
+ pub(crate) async fn read_cached_roster(&self) -> Result<Vec<Contact>, Error> {
+ let (result, recv) = oneshot::channel();
+ let command = DbCommand::ReadCachedRoster { result };
+ self.sender.send(command);
+ let result = recv.await?;
+ result
+ }
+
+ pub(crate) async fn read_cached_roster_with_users(
+ &self,
+ ) -> Result<Vec<(Contact, User)>, Error> {
+ let (result, recv) = oneshot::channel();
+ let command = DbCommand::ReadCachedRosterWithUsers { result };
+ self.sender.send(command);
+ let result = recv.await?;
+ result
+ }
+
+ pub(crate) async fn create_chat(&self, chat: Chat) -> Result<(), Error> {
+ let (result, recv) = oneshot::channel();
+ let command = DbCommand::CreateChat { chat, result };
+ self.sender.send(command);
+ let result = recv.await?;
+ result
+ }
+
+ // TODO: what happens if a correspondent changes from a user to a contact? maybe just have correspondent be a user, then have the client make the user show up as a contact in ui if they are in the loaded roster.
+
+ pub(crate) async fn read_chat(&self, chat: BareJID) -> Result<Chat, Error> {
+ let (result, recv) = oneshot::channel();
+ let command = DbCommand::ReadChat { chat, result };
+ self.sender.send(command);
+ let result = recv.await?;
+ result
+ }
+
+ pub(crate) async fn read_chat_and_user(&self, chat: BareJID) -> Result<(Chat, User), Error> {
+ let (result, recv) = oneshot::channel();
+ let command = DbCommand::ReadChatAndUser { chat, result };
+ self.sender.send(command);
+ let result = recv.await?;
+ result
+ }
+
+ pub(crate) async fn mark_chat_as_chatted(&self, chat: BareJID) -> Result<(), Error> {
+ let (result, recv) = oneshot::channel();
+ let command = DbCommand::MarkChatAsChatted { chat, result };
+ self.sender.send(command);
+ let result = recv.await?;
+ result
+ }
+
+ pub(crate) async fn update_chat_correspondent(
+ &self,
+ old_chat: Chat,
+ new_correspondent: BareJID,
+ ) -> Result<Chat, Error> {
+ let (result, recv) = oneshot::channel();
+ let command = DbCommand::UpdateChatCorrespondent {
+ old_chat,
+ new_correspondent,
+ result,
+ };
+ self.sender.send(command);
+ let result = recv.await?;
+ result
+ }
+
+ // pub(crate) async fn update_chat
+
+ pub(crate) async fn delete_chat(&self, chat: BareJID) -> Result<(), Error> {
+ let (result, recv) = oneshot::channel();
+ let command = DbCommand::DeleteChat { chat, result };
+ self.sender.send(command);
+ let result = recv.await?;
+ result
+ }
+
+ /// TODO: sorting and filtering (for now there is no sorting)
+ pub(crate) async fn read_chats(&self) -> Result<Vec<Chat>, Error> {
+ let (result, recv) = oneshot::channel();
+ let command = DbCommand::ReadChats { result };
+ self.sender.send(command);
+ let result = recv.await?;
+ result
+ }
+
+ /// chats ordered by date of last message
+ // greatest-n-per-group
+ pub(crate) async fn read_chats_ordered(&self) -> Result<Vec<Chat>, Error> {
+ let (result, recv) = oneshot::channel();
+ let command = DbCommand::ReadChatsOrdered { result };
+ self.sender.send(command);
+ let result = recv.await?;
+ result
+ }
+
+ /// chats ordered by date of last message
+ // greatest-n-per-group
+ pub(crate) async fn read_chats_ordered_with_latest_messages(
+ &self,
+ ) -> Result<Vec<(Chat, Message)>, Error> {
+ let (result, recv) = oneshot::channel();
+ let command = DbCommand::ReadChatsOrderedWithLatestMessages { result };
+ self.sender.send(command);
+ let result = recv.await?;
+ result
+ }
+
+ /// chats ordered by date of last message
+ // greatest-n-per-group
+ pub(crate) async fn read_chats_ordered_with_latest_messages_and_users(
+ &self,
+ ) -> Result<Vec<((Chat, User), (Message, User))>, Error> {
+ let (result, recv) = oneshot::channel();
+ let command = DbCommand::ReadChatsOrderedWithLatestMessagesAndUsers { result };
+ self.sender.send(command);
+ let result = recv.await?;
+ result
+ }
+
+ /// if the chat doesn't already exist, it must be created by calling create_chat() before running this function.
+ #[tracing::instrument]
+ pub(crate) async fn create_message(
+ &self,
+ message: Message,
+ chat: BareJID,
+ from: FullJID,
+ ) -> Result<(), Error> {
+ let (result, recv) = oneshot::channel();
+ let command = DbCommand::CreateMessage {
+ message,
+ chat,
+ from,
+ result,
+ };
+ self.sender.send(command);
+ let result = recv.await?;
+ result
+ }
+
+ pub(crate) async fn upsert_chat_and_user(&self, chat: BareJID) -> Result<bool, Error> {
+ let (result, recv) = oneshot::channel();
+ let command = DbCommand::UpsertChatAndUser { chat, result };
+ self.sender.send(command);
+ let result = recv.await?;
+ result
+ }
+
+ /// create direct message from incoming. MUST upsert chat and user
+ #[tracing::instrument]
+ pub(crate) async fn create_message_with_user_resource(
+ &self,
+ message: Message,
+ // TODO: enforce two kinds of jid. bare and full
+ chat: BareJID,
+ from: FullJID,
+ ) -> Result<(), Error> {
+ tracing::info!("MSGDEBUG create_message_with_user_resource exists");
+ let (result, recv) = oneshot::channel();
+ let command = DbCommand::CreateMessageWithUserResource {
+ message,
+ chat,
+ from,
+ result,
+ };
+ self.sender.send(command);
+ let result = recv.await?;
+ result
+ }
+
+ pub(crate) async fn update_message_delivery(
+ &self,
+ message: Uuid,
+ delivery: Delivery,
+ ) -> Result<(), Error> {
+ let (result, recv) = oneshot::channel();
+ let command = DbCommand::UpdateMessageDelivery {
+ message,
+ delivery,
+ result,
+ };
+ self.sender.send(command);
+ let result = recv.await?;
+ result
+ }
+
+ // pub(crate) async fn read_message(&self, message: Uuid) -> Result<Message, Error> {
+ // Ok(Message {
+ // id: Uuid,
+ // from: todo!(),
+ // delivery: todo!(),
+ // timestamp: todo!(),
+ // body: todo!(),
+ // })
+ // }
+
+ // TODO: message updates/edits pub(crate) async fn update_message(&self, message: Message) -> Result<(), Error> {}
+
+ impl_db_sends!(
+ ReadCapabilities => read_capabilities(node: String) -> String;
+ DeleteCachedStatus => delete_cached_status() -> ();
+ UpsertCachedStatus => upsert_cached_status(status: Online) -> ();
+ ReadCachedStatus => read_cached_status() -> Online;
+ ReadMessageHistoryWithUsers => read_message_history_with_users(chat: BareJID) -> Vec<(Message, User)>;
+ // TODO: paging
+ ReadMessageHistory => read_message_history(chat: BareJID) -> Vec<Message>;
+ ReadMessage => read_message(message: Uuid) -> Message;
+ DeleteMessage => delete_message(message: Uuid) -> ()
+ );
+
+ pub(crate) async fn upsert_capabilities(
+ &self,
+ node: String,
+ capabilities: String,
+ ) -> Result<(), Error> {
+ let (result, recv) = oneshot::channel();
+ let command = DbCommand::UpsertCapabilities {
+ node,
+ capabilities,
+ result,
+ };
+ self.sender.send(command);
+ let result = recv.await?;
+ result
+ }
+}
+
+// TODO: i should really just make an actor macro
+#[derive(Debug)]
+pub enum DbCommand {
+ CreateUser {
+ user: User,
+ result: oneshot::Sender<Result<(), Error>>,
+ },
+ ReadUser {
+ user: BareJID,
+ result: oneshot::Sender<Result<User, Error>>,
+ },
+ DeleteUserNick {
+ jid: BareJID,
+ result: oneshot::Sender<Result<bool, Error>>,
+ },
+ UpsertUserNick {
+ jid: BareJID,
+ nick: String,
+ result: oneshot::Sender<Result<bool, Error>>,
+ },
+ DeleteUserAvatar {
+ jid: BareJID,
+ result: oneshot::Sender<Result<(bool, Option<String>), Error>>,
+ },
+ UpsertUserAvatar {
+ jid: BareJID,
+ avatar: String,
+ result: oneshot::Sender<Result<(bool, Option<String>), Error>>,
+ },
+ UpdateUser {
+ user: User,
+ result: oneshot::Sender<Result<(), Error>>,
+ },
+ CreateContact {
+ contact: Contact,
+ result: oneshot::Sender<Result<(), Error>>,
+ },
+ ReadContact {
+ contact: BareJID,
+ result: oneshot::Sender<Result<Contact, Error>>,
+ },
+ ReadContactOpt {
+ contact: BareJID,
+ result: oneshot::Sender<Result<Option<Contact>, Error>>,
+ },
+ UpdateContact {
+ contact: Contact,
+ result: oneshot::Sender<Result<(), Error>>,
+ },
+ UpsertContact {
+ contact: Contact,
+ result: oneshot::Sender<Result<(), Error>>,
+ },
+ DeleteContact {
+ contact: BareJID,
+ result: oneshot::Sender<Result<(), Error>>,
+ },
+ ReplaceCachedRoster {
+ roster: Vec<Contact>,
+ result: oneshot::Sender<Result<(), Error>>,
+ },
+ ReadCachedRoster {
+ result: oneshot::Sender<Result<Vec<Contact>, Error>>,
+ },
+ ReadCachedRosterWithUsers {
+ result: oneshot::Sender<Result<Vec<(Contact, User)>, Error>>,
+ },
+ CreateChat {
+ chat: Chat,
+ result: oneshot::Sender<Result<(), Error>>,
+ },
+ ReadChat {
+ chat: BareJID,
+ result: oneshot::Sender<Result<Chat, Error>>,
+ },
+ ReadChatAndUser {
+ chat: BareJID,
+ result: oneshot::Sender<Result<(Chat, User), Error>>,
+ },
+ MarkChatAsChatted {
+ chat: BareJID,
+ result: oneshot::Sender<Result<(), Error>>,
+ },
+ UpdateChatCorrespondent {
+ old_chat: Chat,
+ new_correspondent: BareJID,
+ result: oneshot::Sender<Result<Chat, Error>>,
+ },
+ DeleteChat {
+ chat: BareJID,
+ result: oneshot::Sender<Result<(), Error>>,
+ },
+ ReadChats {
+ result: oneshot::Sender<Result<Vec<Chat>, Error>>,
+ },
+ ReadChatsOrdered {
+ result: oneshot::Sender<Result<Vec<Chat>, Error>>,
+ },
+ ReadChatsOrderedWithLatestMessages {
+ result: oneshot::Sender<Result<Vec<(Chat, Message)>, Error>>,
+ },
+ ReadChatsOrderedWithLatestMessagesAndUsers {
+ result: oneshot::Sender<Result<Vec<((Chat, User), (Message, User))>, Error>>,
+ },
+ // ReadChatID {
+
+ // result: oneshot::Sender<Result<, Error>>,
+ // },
+ // ReadChatIDOpt {
+ // chat: JID,
+ // result: oneshot::Sender<Result<Option<Uuid>, Error>>,
+ // },
+ CreateMessage {
+ message: Message,
+ chat: BareJID,
+ from: FullJID,
+ result: oneshot::Sender<Result<(), Error>>,
+ },
+ UpsertChatAndUser {
+ chat: BareJID,
+ result: oneshot::Sender<Result<bool, Error>>,
+ },
+ CreateMessageWithUserResource {
+ message: Message,
+ chat: BareJID,
+ from: FullJID,
+ result: oneshot::Sender<Result<(), Error>>,
+ },
+ UpdateMessageDelivery {
+ message: Uuid,
+ delivery: Delivery,
+ result: oneshot::Sender<Result<(), Error>>,
+ },
+ DeleteMessage {
+ message: Uuid,
+ result: oneshot::Sender<Result<(), Error>>,
+ },
+ ReadMessage {
+ message: Uuid,
+ result: oneshot::Sender<Result<Message, Error>>,
+ },
+ ReadMessageHistory {
+ chat: BareJID,
+ result: oneshot::Sender<Result<Vec<Message>, Error>>,
+ },
+ ReadMessageHistoryWithUsers {
+ chat: BareJID,
+ result: oneshot::Sender<Result<Vec<(Message, User)>, Error>>,
+ },
+ ReadCachedStatus {
+ result: oneshot::Sender<Result<Online, Error>>,
+ },
+ UpsertCachedStatus {
+ status: Online,
+ result: oneshot::Sender<Result<(), Error>>,
+ },
+ DeleteCachedStatus {
+ result: oneshot::Sender<Result<(), Error>>,
+ },
+ ReadCapabilities {
+ node: String,
+ result: oneshot::Sender<Result<String, Error>>,
+ },
+ UpsertCapabilities {
+ node: String,
+ capabilities: String,
+ result: oneshot::Sender<Result<(), Error>>,
+ },
+}
+
+impl Display for DbCommand {
+ #[rustfmt::skip]
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.write_str(match self {
+ DbCommand::CreateUser { user, result } => "CreateUser",
+ DbCommand::ReadUser { user, result } => "ReadUser",
+ DbCommand::DeleteUserNick { jid, result } => "DeleteUserNick",
+ DbCommand::UpsertUserNick { jid, nick, result } => "UpsertUserNick",
+ DbCommand::DeleteUserAvatar { jid, result } => "DeleteUserAvatar",
+ DbCommand::UpsertUserAvatar { jid, avatar, result } => "UpsertUserAvatar",
+ DbCommand::UpdateUser { user, result } => "UpdateUser",
+ DbCommand::CreateContact { contact, result } => "CreateContact",
+ DbCommand::ReadContact { contact, result } => "ReadContact",
+ DbCommand::ReadContactOpt { contact, result } => "ReadContactOpt",
+ DbCommand::UpdateContact { contact, result } => "UpdateContact",
+ DbCommand::UpsertContact { contact, result } => "UpsertContact",
+ DbCommand::DeleteContact { contact, result } => "DeleteContact",
+ DbCommand::ReplaceCachedRoster { roster, result } => "ReplaceCachedRoster",
+ DbCommand::ReadCachedRoster { result } => "ReadCachedRoster",
+ DbCommand::ReadCachedRosterWithUsers { result } => "ReadCachedRosterWithUsers",
+ DbCommand::CreateChat { chat, result } => "CreateChat",
+ DbCommand::ReadChat { chat, result } => "ReadChat",
+ DbCommand::MarkChatAsChatted { chat, result } => "MarkChatAsChatted",
+ DbCommand::UpdateChatCorrespondent { old_chat, new_correspondent, result } => "UpdateChatCorrespondent",
+ DbCommand::DeleteChat { chat, result } => "DeleteChat",
+ DbCommand::ReadChats { result } => "ReadChats",
+ DbCommand::ReadChatsOrdered { result } => "ReadChatsOrdered",
+ DbCommand::ReadChatsOrderedWithLatestMessages { result } => "ReadChatsOrderedWithLatestMessages",
+ DbCommand::ReadChatsOrderedWithLatestMessagesAndUsers { result } => "ReadChatsOrderedWithLatestMessagesAndUsers",
+ DbCommand::CreateMessage { message, chat, from, result } => "CreateMessage",
+ DbCommand::UpsertChatAndUser { chat, result } => "UpsertChatAndUser",
+ DbCommand::CreateMessageWithUserResource { message, chat, from, result } => "CreateMessageWithUserResource",
+ DbCommand::UpdateMessageDelivery { message, delivery, result } => "UpdateMessageDelivery",
+ DbCommand::DeleteMessage { message, result } => "DeleteMessage",
+ DbCommand::ReadMessage { message, result } => "ReadMessage",
+ DbCommand::ReadMessageHistory { chat, result } => "ReadMessageHistory",
+ DbCommand::ReadMessageHistoryWithUsers { chat, result } => "ReadMessageHistoryWithUsers",
+ DbCommand::ReadCachedStatus { result } => "ReadCachedStatus",
+ DbCommand::UpsertCachedStatus { status, result } => "UpsertCachedStatus",
+ DbCommand::DeleteCachedStatus { result } => "DeleteCachedStatus",
+ DbCommand::ReadCapabilities { node, result } => "ReadCapabilities",
+ DbCommand::UpsertCapabilities { node, capabilities, result } => "UpsertCapabilities",
+ DbCommand::ReadChatAndUser { chat, result } => "ReadChatAndUser",
+ })
+ }
+}
+
+impl DbActor {
+ /// must be run in blocking spawn
+ #[cfg(not(target_arch = "wasm32"))]
+ pub(crate) fn new(
path: impl AsRef<Path>,
+ receiver: mpsc::UnboundedReceiver<DbCommand>,
) -> Result<Self, DatabaseOpenError> {
if let Some(dir) = path.as_ref().parent() {
if dir.is_dir() {
} else {
- tokio::fs::create_dir_all(dir).await?;
+ std::fs::create_dir_all(dir)?;
}
- let _file = tokio::fs::OpenOptions::new()
+ let _file = std::fs::OpenOptions::new()
.append(true)
.create(true)
- .open(path.as_ref())
- .await?;
+ .open(path.as_ref())?;
}
let url = format!(
- "sqlite://{}",
+ "{}",
path.as_ref()
.to_str()
.ok_or(DatabaseOpenError::InvalidPath)?
);
- let db = SqlitePool::connect(&url).await?;
- migrate!().run(&db).await?;
- Ok(Self { db })
+ // let db = SqlitePool::connect(&url).await?;
+ // migrate!().run(&db).await?;
+ // Ok(Self { db })
+ let db = Connection::open(url)?;
+ db.execute_batch(include_str!("../migrations/1.sql"))?;
+ Ok(Self { db, receiver })
}
- pub(crate) fn new(db: SqlitePool) -> Self {
- Self { db }
+ /// must be run in blocking spawn
+ #[cfg(not(target_arch = "wasm32"))]
+ pub(crate) fn new_memory(
+ receiver: mpsc::UnboundedReceiver<DbCommand>,
+ ) -> Result<Self, DatabaseOpenError> {
+ let db = Connection::open_in_memory()?;
+ db.execute_batch(include_str!("../migrations/1.sql"))?;
+ Ok(Self { db, receiver })
}
- pub(crate) async fn create_user(&self, user: User) -> Result<(), Error> {
- sqlx::query!(
- "insert into users ( jid, nick, cached_status_message ) values ( ?, ?, ? )",
- user.jid,
- user.nick,
- user.cached_status_message
- )
- .execute(&self.db)
- .await?;
+ /// must be run in blocking spawn
+ #[cfg(target_arch = "wasm32")]
+ pub fn new_memory(
+ receiver: mpsc::UnboundedReceiver<DbCommand>,
+ ) -> Result<Self, DatabaseOpenError> {
+ let db = Connection::open("mem.db")?;
+ db.execute_batch(include_str!("../migrations/1.sql"))?;
+ Ok(Self { db, receiver })
+ }
+
+ /// must be run in blocking spawn
+ #[cfg(target_arch = "wasm32")]
+ pub fn new(
+ file_name: impl AsRef<Path>,
+ receiver: mpsc::UnboundedReceiver<DbCommand>,
+ ) -> Result<Self, DatabaseOpenError> {
+ let db = Connection::open(file_name)?;
+ db.execute_batch(include_str!("../migrations/1.sql"))?;
+ Ok(Self { db, receiver })
+ }
+
+ pub(crate) async fn run(mut self) {
+ while let Some(cmd) = self.receiver.recv().await {
+ let cmd_name = cmd.to_string();
+ tracing::warn!("command recv: {cmd_name}");
+ match cmd {
+ DbCommand::CreateUser { user, result } => {
+ result.send(self.create_user(user));
+ }
+ DbCommand::ReadUser { user, result } => {
+ result.send(self.read_user(user));
+ }
+ DbCommand::DeleteUserNick { jid, result } => {
+ result.send(self.delete_user_nick(jid));
+ }
+ DbCommand::UpsertUserNick { jid, nick, result } => {
+ result.send(self.upsert_user_nick(jid, nick));
+ }
+ DbCommand::DeleteUserAvatar { jid, result } => {
+ result.send(self.delete_user_avatar(jid));
+ }
+ DbCommand::UpsertUserAvatar {
+ jid,
+ avatar,
+ result,
+ } => {
+ result.send(self.upsert_user_avatar(jid, avatar));
+ }
+ DbCommand::UpdateUser { user, result } => {
+ result.send(self.update_user(user));
+ }
+ DbCommand::CreateContact { contact, result } => {
+ result.send(self.create_contact(contact));
+ }
+ DbCommand::ReadContact { contact, result } => {
+ result.send(self.read_contact(contact));
+ }
+ DbCommand::ReadContactOpt { contact, result } => {
+ result.send(self.read_contact_opt(contact));
+ }
+ DbCommand::UpdateContact { contact, result } => {
+ result.send(self.update_contact(contact));
+ }
+ DbCommand::UpsertContact { contact, result } => {
+ result.send(self.upsert_contact(contact));
+ }
+ DbCommand::DeleteContact { contact, result } => {
+ result.send(self.delete_contact(contact));
+ }
+ DbCommand::ReplaceCachedRoster { roster, result } => {
+ result.send(self.replace_cached_roster(roster));
+ }
+ DbCommand::ReadCachedRoster { result } => {
+ result.send(self.read_cached_roster());
+ }
+ DbCommand::ReadCachedRosterWithUsers { result } => {
+ result.send(self.read_cached_roster_with_users());
+ }
+ DbCommand::CreateChat { chat, result } => {
+ result.send(self.create_chat(chat));
+ }
+ DbCommand::ReadChat { chat, result } => {
+ result.send(self.read_chat(chat));
+ }
+ DbCommand::ReadChatAndUser { chat, result } => {
+ result.send(self.read_chat_and_user(chat));
+ }
+ DbCommand::MarkChatAsChatted { chat, result } => {
+ result.send(self.mark_chat_as_chatted(chat));
+ }
+ DbCommand::UpdateChatCorrespondent {
+ old_chat,
+ new_correspondent,
+ result,
+ } => {
+ result.send(self.update_chat_correspondent(old_chat, new_correspondent));
+ }
+ DbCommand::DeleteChat { chat, result } => {
+ result.send(self.delete_chat(chat));
+ }
+ DbCommand::ReadChats { result } => {
+ result.send(self.read_chats());
+ }
+ DbCommand::ReadChatsOrdered { result } => {
+ result.send(self.read_chats_ordered());
+ }
+ DbCommand::ReadChatsOrderedWithLatestMessages { result } => {
+ result.send(self.read_chats_ordered_with_latest_messages());
+ }
+ DbCommand::ReadChatsOrderedWithLatestMessagesAndUsers { result } => {
+ result.send(self.read_chats_ordered_with_latest_messages_and_users());
+ }
+ DbCommand::CreateMessage {
+ message,
+ chat,
+ from,
+ result,
+ } => {
+ result.send(self.create_message(message, chat, from));
+ }
+ DbCommand::UpsertChatAndUser { chat, result } => {
+ result.send(self.upsert_chat_and_user(&chat));
+ }
+ DbCommand::CreateMessageWithUserResource {
+ message,
+ chat,
+ from,
+ result,
+ } => {
+ result.send(self.create_message_with_user_resource(message, chat, from));
+ }
+ DbCommand::UpdateMessageDelivery {
+ message,
+ delivery,
+ result,
+ } => {
+ result.send(self.update_message_delivery(message, delivery));
+ }
+ DbCommand::DeleteMessage { message, result } => {
+ result.send(self.delete_message(message));
+ }
+ DbCommand::ReadMessage { message, result } => {
+ result.send(self.read_message(message));
+ }
+ DbCommand::ReadMessageHistory { chat, result } => {
+ result.send(self.read_message_history(chat));
+ }
+ DbCommand::ReadMessageHistoryWithUsers { chat, result } => {
+ result.send(self.read_message_history_with_users(chat));
+ }
+ DbCommand::ReadCachedStatus { result } => {
+ result.send(self.read_cached_status());
+ }
+ DbCommand::UpsertCachedStatus { status, result } => {
+ result.send(self.upsert_cached_status(status));
+ }
+ DbCommand::DeleteCachedStatus { result } => {
+ result.send(self.delete_cached_status());
+ }
+ DbCommand::ReadCapabilities { node, result } => {
+ result.send(self.read_capabilities(node));
+ }
+ DbCommand::UpsertCapabilities {
+ node,
+ capabilities,
+ result,
+ } => {
+ result.send(self.upsert_capabilities(node, capabilities));
+ }
+ }
+ tracing::warn!("command finished: {cmd_name}");
+ }
+ tracing::error!("command: db actor exited");
+ }
+
+ pub(crate) fn create_user(&self, user: User) -> Result<(), Error> {
+ {
+ self.db.execute(
+ "insert into users ( jid, nick, avatar ) values ( ?1, ?2, ?3 )",
+ (user.jid, user.nick, user.avatar),
+ )?;
+ }
Ok(())
}
- pub(crate) async fn read_user(&self, user: JID) -> Result<User, Error> {
- sqlx::query!(
- "insert into users ( jid ) values ( ? ) on conflict do nothing",
- user
- )
- .execute(&self.db)
- .await?;
- let user: User = sqlx::query_as("select * from users where jid = ?")
- .bind(user)
- .fetch_one(&self.db)
- .await?;
- Ok(user)
+ // TODO: this is not a 'read' user
+ pub(crate) fn read_user(&self, user: BareJID) -> Result<User, Error> {
+ let db = &self.db;
+ let user_opt = db
+ .query_row(
+ "select jid, nick, avatar from users where jid = ?1",
+ [&user],
+ |row| {
+ Ok(User {
+ jid: row.get(0)?,
+ nick: row.get(1)?,
+ avatar: row.get(2)?,
+ })
+ },
+ )
+ .optional()?;
+ match user_opt {
+ Some(user) => Ok(user),
+ None => {
+ db.execute("insert into users ( jid ) values ( ?1 )", [&user])?;
+ Ok(User {
+ jid: user,
+ nick: None,
+ avatar: None,
+ })
+ }
+ }
}
/// returns whether or not the nickname was updated
- pub(crate) async fn delete_user_nick(&self, jid: JID) -> Result<bool, Error> {
- if sqlx::query!(
- "insert into users (jid, nick) values (?, ?) on conflict do update set nick = ? where nick is not ?",
- jid,
- None::<String>,
- None::<String>,
- None::<String>,
- )
- .execute(&self.db)
- .await?
- .rows_affected()
- > 0
+ pub(crate) fn delete_user_nick(&self, jid: BareJID) -> Result<bool, Error> {
+ let rows_affected;
{
+ rows_affected = self.db.execute("insert into users (jid, nick) values (?1, ?2) on conflict do update set nick = ?3 where nick is not ?4", (jid, None::<String>, None::<String>, None::<String>))?;
+ }
+ if rows_affected > 0 {
Ok(true)
} else {
Ok(false)
@@ -96,18 +1003,11 @@ impl Db {
}
/// returns whether or not the nickname was updated
- pub(crate) async fn upsert_user_nick(&self, jid: JID, nick: String) -> Result<bool, Error> {
- let rows_affected = sqlx::query!(
- "insert into users (jid, nick) values (?, ?) on conflict do update set nick = ? where nick is not ?",
- jid,
- nick,
- nick,
- nick
- )
- .execute(&self.db)
- .await?
- .rows_affected();
- tracing::debug!("rows affected: {}", rows_affected);
+ pub(crate) fn upsert_user_nick(&self, jid: BareJID, nick: String) -> Result<bool, Error> {
+ let rows_affected;
+ {
+ rows_affected = self.db.execute("insert into users (jid, nick) values (?1, ?2) on conflict do update set nick = ?3 where nick is not ?4", (jid, &nick, &nick, &nick))?;
+ }
if rows_affected > 0 {
Ok(true)
} else {
@@ -116,32 +1016,18 @@ impl Db {
}
/// returns whether or not the avatar was updated, and the file to delete if there existed an old avatar
- pub(crate) async fn delete_user_avatar(
- &self,
- jid: JID,
- ) -> Result<(bool, Option<String>), Error> {
- #[derive(sqlx::FromRow)]
- struct AvatarRow {
- avatar: Option<String>,
- }
- let old_avatar: Option<String> = sqlx::query_as("select avatar from users where jid = ?")
- .bind(jid.clone())
- .fetch_optional(&self.db)
- .await?
- .map(|row: AvatarRow| row.avatar)
- .unwrap_or(None);
- if sqlx::query!(
- "insert into users (jid, avatar) values (?, ?) on conflict do update set avatar = ? where avatar is not ?",
- jid,
- None::<String>,
- None::<String>,
- None::<String>,
- )
- .execute(&self.db)
- .await?
- .rows_affected()
- > 0
+ pub(crate) fn delete_user_avatar(&self, jid: BareJID) -> Result<(bool, Option<String>), Error> {
+ let (old_avatar, rows_affected): (Option<String>, _);
{
+ let db = &self.db;
+ old_avatar = db
+ .query_row("select avatar from users where jid = ?1", [&jid], |row| {
+ Ok(row.get(0)?)
+ })
+ .optional()?;
+ rows_affected = db.execute("insert into users (jid, avatar) values (?1, ?2) on conflict do update set avatar = ?3 where avatar is not ?4", (jid, None::<String>, None::<String>, None::<String>))?;
+ }
+ if rows_affected > 0 {
Ok((true, old_avatar))
} else {
Ok((false, old_avatar))
@@ -149,541 +1035,660 @@ impl Db {
}
/// returns whether or not the avatar was updated, and the file to delete if there existed an old avatar
- pub(crate) async fn upsert_user_avatar(
+ pub(crate) fn upsert_user_avatar(
&self,
- jid: JID,
+ jid: BareJID,
avatar: String,
) -> Result<(bool, Option<String>), Error> {
- #[derive(sqlx::FromRow)]
- struct AvatarRow {
- avatar: Option<String>,
- }
- let old_avatar: Option<String> = sqlx::query_as("select avatar from users where jid = ?")
- .bind(jid.clone())
- .fetch_optional(&self.db)
- .await?
- .map(|row: AvatarRow| row.avatar)
- .unwrap_or(None);
- if sqlx::query!(
- "insert into users (jid, avatar) values (?, ?) on conflict do update set avatar = ? where avatar is not ?",
- jid,
- avatar,
- avatar,
- avatar,
- )
- .execute(&self.db)
- .await?
- .rows_affected()
- > 0
+ let (old_avatar, rows_affected): (Option<String>, _);
{
+ let db = &self.db;
+ old_avatar = db
+ .query_row("select avatar from users where jid = ?1", [&jid], |row| {
+ let avatar: Option<String> = row.get(0)?;
+ Ok(avatar)
+ })
+ .optional()?
+ .unwrap_or_default();
+ rows_affected = db.execute("insert into users (jid, avatar) values (?1, ?2) on conflict do update set avatar = ?3 where avatar is not ?4", (jid, &avatar, &avatar, &avatar))?;
+ }
+ if rows_affected > 0 {
Ok((true, old_avatar))
} else {
Ok((false, old_avatar))
}
}
- pub(crate) async fn update_user(&self, user: User) -> Result<(), Error> {
- sqlx::query!(
- "update users set cached_status_message = ?, nick = ? where jid = ?",
- user.cached_status_message,
- user.nick,
- user.jid
- )
- .execute(&self.db)
- .await?;
+ // TODO: use references everywhere
+ pub(crate) fn update_user(&self, user: User) -> Result<(), Error> {
+ self.db.execute(
+ "update users set nick = ?1, avatar = ?2 where user_jid = ?1",
+ (&user.nick, &user.avatar, &user.jid),
+ )?;
Ok(())
}
// TODO: should this be allowed? messages need to reference users. should probably only allow delete if every other thing referencing it has been deleted, or if you make clear to the user deleting a user will delete all messages associated with them.
- // pub(crate) async fn delete_user(&self, user: JID) -> Result<(), Error> {}
+ // pub(crate) fn delete_user(&self, user: JID) -> Result<(), Error> {}
/// does not create the underlying user, if underlying user does not exist, create_user() must be called separately
- pub(crate) async fn create_contact(&self, contact: Contact) -> Result<(), Error> {
- sqlx::query!(
- "insert into roster ( user_jid, name, subscription ) values ( ?, ?, ? )",
- contact.user_jid,
- contact.name,
- contact.subscription
- )
- .execute(&self.db)
- .await?;
- // TODO: abstract this out in to add_to_group() function ?
+ pub(crate) fn create_contact(&self, contact: Contact) -> Result<(), Error> {
+ let db = &self.db;
+ db.execute(
+ "insert into roster ( user_jid, name, subscription ) values ( ?1, ?2, ?3 )",
+ (&contact.user_jid, &contact.name, contact.subscription),
+ )?;
for group in contact.groups {
- sqlx::query!(
- "insert into groups (group_name) values (?) on conflict do nothing",
- group
- )
- .execute(&self.db)
- .await?;
- sqlx::query!(
- "insert into groups_roster (group_name, contact_jid) values (?, ?)",
- group,
- contact.user_jid
- )
- .execute(&self.db)
- .await?;
+ db.execute(
+ "insert into groups (group_name) values (?1) on conflict do nothing",
+ [&group],
+ )?;
+ db.execute(
+ "insert into groups_roster (group_name, contact_jid) values (?1, ?2)",
+ (group, &contact.user_jid),
+ )?;
}
Ok(())
}
- pub(crate) async fn read_contact(&self, contact: JID) -> Result<Contact, Error> {
- let mut contact: Contact = sqlx::query_as("select * from roster where user_jid = ?")
- .bind(contact)
- .fetch_one(&self.db)
- .await?;
- #[derive(sqlx::FromRow)]
- struct Row {
- group_name: String,
- }
- let groups: Vec<Row> =
- sqlx::query_as("select group_name from groups_roster where contact_jid = ?")
- .bind(&contact.user_jid)
- .fetch_all(&self.db)
- .await?;
- contact.groups = HashSet::from_iter(groups.into_iter().map(|row| row.group_name));
- Ok(contact)
- }
-
- pub(crate) async fn read_contact_opt(&self, contact: &JID) -> Result<Option<Contact>, Error> {
- let contact: Option<Contact> =
- sqlx::query_as("select * from roster join users on jid = user_jid where jid = ?")
- .bind(contact)
- .fetch_optional(&self.db)
- .await?;
- if let Some(mut contact) = contact {
- #[derive(sqlx::FromRow)]
- struct Row {
- group_name: String,
- }
- let groups: Vec<Row> =
- sqlx::query_as("select group_name from groups_roster where contact_jid = ?")
- .bind(&contact.user_jid)
- .fetch_all(&self.db)
- .await?;
- contact.groups = HashSet::from_iter(groups.into_iter().map(|row| row.group_name));
- Ok(Some(contact))
+ pub(crate) fn read_contact(&self, contact: BareJID) -> Result<Contact, Error> {
+ let db = &self.db;
+ let mut contact_item = db.query_row(
+ "select user_jid, name, subscription from roster where user_jid = ?1",
+ [&contact],
+ |row| {
+ Ok(Contact {
+ user_jid: row.get(0)?,
+ name: row.get(1)?,
+ subscription: row.get(2)?,
+ groups: HashSet::new(),
+ })
+ },
+ )?;
+ let groups: Result<HashSet<String>, _> = db
+ .prepare("select group_name from groups_roster where contact_jid = ?1")?
+ .query_map([&contact], |row| Ok(row.get(0)?))?
+ .collect();
+ contact_item.groups = groups?;
+ Ok(contact_item)
+ }
+
+ pub(crate) fn read_contact_opt(&self, contact: BareJID) -> Result<Option<Contact>, Error> {
+ let db = &self.db;
+ let contact_item = db
+ .query_row(
+ "select user_jid, name, subscription from roster where user_jid = ?1",
+ [&contact],
+ |row| {
+ Ok(Contact {
+ user_jid: row.get(0)?,
+ name: row.get(1)?,
+ subscription: row.get(2)?,
+ groups: HashSet::new(),
+ })
+ },
+ )
+ .optional()?;
+ if let Some(mut contact_item) = contact_item {
+ let groups: Result<HashSet<String>, _> = db
+ .prepare("select group_name from groups_roster where contact_jid = ?1")?
+ .query_map([&contact], |row| Ok(row.get(0)?))?
+ .collect();
+ contact_item.groups = groups?;
+ Ok(Some(contact_item))
} else {
Ok(None)
}
}
/// does not update the underlying user, to update user, update_user() must be called separately
- pub(crate) async fn update_contact(&self, contact: Contact) -> Result<(), Error> {
- sqlx::query!(
- "update roster set name = ?, subscription = ? where user_jid = ?",
- contact.name,
- contact.subscription,
- contact.user_jid
- )
- .execute(&self.db)
- .await?;
- sqlx::query!(
- "delete from groups_roster where contact_jid = ?",
- contact.user_jid
- )
- .execute(&self.db)
- .await?;
- // TODO: delete orphaned groups from groups table
+ pub(crate) fn update_contact(&self, contact: Contact) -> Result<(), Error> {
+ let db = &self.db;
+ db.execute(
+ "update roster set name = ?1, subscription = ?2 where user_jid = ?3",
+ (&contact.name, &contact.subscription, &contact.user_jid),
+ )?;
+ db.execute(
+ "delete from groups_roster where contact_jid = ?1",
+ [&contact.user_jid],
+ )?;
for group in contact.groups {
- sqlx::query!(
- "insert into groups (group_name) values (?) on conflict do nothing",
- group
- )
- .execute(&self.db)
- .await?;
- sqlx::query!(
- "insert into groups_roster (group_name, contact_jid) values (?, ?)",
- group,
- contact.user_jid
- )
- .execute(&self.db)
- .await?;
+ db.execute(
+ "insert into groups (group_name) values (?1) on conflict do nothing",
+ [&group],
+ )?;
+ db.execute(
+ "insert into groups_roster (group_name, contact_jid), values (?1, ?2)",
+ (&group, &contact.user_jid),
+ )?;
}
+ // TODO: delete orphaned groups from groups table, users etc.
Ok(())
}
- pub(crate) async fn upsert_contact(&self, contact: Contact) -> Result<(), Error> {
- sqlx::query!(
- "insert into users ( jid ) values ( ? ) on conflict do nothing",
- contact.user_jid,
- )
- .execute(&self.db)
- .await?;
- sqlx::query!(
- "insert into roster ( user_jid, name, subscription ) values ( ?, ?, ? ) on conflict do update set name = ?, subscription = ?",
- contact.user_jid,
- contact.name,
- contact.subscription,
- contact.name,
- contact.subscription
- )
- .execute(&self.db)
- .await?;
- sqlx::query!(
- "delete from groups_roster where contact_jid = ?",
- contact.user_jid
- )
- .execute(&self.db)
- .await?;
- // TODO: delete orphaned groups from groups table
+ pub(crate) fn upsert_contact(&self, contact: Contact) -> Result<(), Error> {
+ let db = &self.db;
+ db.execute(
+ "insert into users (jid) values (?1) on conflict do nothing",
+ [&contact.user_jid],
+ )?;
+ db.execute(
+ "insert into roster ( user_jid, name, subscription ) values ( ?1, ?2, ?3 ) on conflict do update set name = ?4, subscription = ?5",
+ (&contact.user_jid, &contact.name, &contact.subscription, &contact.name, &contact.subscription),
+ )?;
+ db.execute(
+ "delete from groups_roster where contact_jid = ?1",
+ [&contact.user_jid],
+ )?;
for group in contact.groups {
- sqlx::query!(
- "insert into groups (group_name) values (?) on conflict do nothing",
- group
- )
- .execute(&self.db)
- .await?;
- sqlx::query!(
- "insert into groups_roster (group_name, contact_jid) values (?, ?)",
- group,
- contact.user_jid
- )
- .execute(&self.db)
- .await?;
+ db.execute(
+ "insert into groups (group_name) values (?1) on conflict do nothing",
+ [&group],
+ )?;
+ db.execute(
+ "insert into groups_roster (group_name, contact_jid) values (?1, ?2)",
+ (group, &contact.user_jid),
+ )?;
}
Ok(())
}
- pub(crate) async fn delete_contact(&self, contact: JID) -> Result<(), Error> {
- sqlx::query!("delete from roster where user_jid = ?", contact)
- .execute(&self.db)
- .await?;
- // TODO: delete orphaned groups from groups table
+ pub(crate) fn delete_contact(&self, contact: BareJID) -> Result<(), Error> {
+ self.db
+ .execute("delete from roster where user_jid = ?1", [&contact])?;
Ok(())
}
- pub(crate) async fn replace_cached_roster(&self, roster: Vec<Contact>) -> Result<(), Error> {
- sqlx::query!("delete from roster").execute(&self.db).await?;
+ pub(crate) fn replace_cached_roster(&self, roster: Vec<Contact>) -> Result<(), Error> {
+ {
+ self.db.execute("delete from roster", [])?;
+ }
for contact in roster {
- self.upsert_contact(contact).await?;
+ self.upsert_contact(contact)?;
}
Ok(())
}
- pub(crate) async fn read_cached_roster(&self) -> Result<Vec<Contact>, Error> {
- let mut roster: Vec<Contact> =
- sqlx::query_as("select * from roster join users on jid = user_jid")
- .fetch_all(&self.db)
- .await?;
+ pub(crate) fn read_cached_roster(&self) -> Result<Vec<Contact>, Error> {
+ let db = &self.db;
+ let mut roster: Vec<_> = db
+ .prepare("select user_jid, name, subscription from roster")?
+ .query_map([], |row| {
+ Ok(Contact {
+ user_jid: row.get(0)?,
+ name: row.get(1)?,
+ subscription: row.get(2)?,
+ groups: HashSet::new(),
+ })
+ })?
+ .collect::<Result<Vec<_>, _>>()?;
for contact in &mut roster {
- #[derive(sqlx::FromRow)]
- struct Row {
- group_name: String,
- }
- let groups: Vec<Row> =
- sqlx::query_as("select group_name from groups_roster where contact_jid = ?")
- .bind(&contact.user_jid)
- .fetch_all(&self.db)
- .await?;
- contact.groups = HashSet::from_iter(groups.into_iter().map(|row| row.group_name));
+ let groups: Result<HashSet<String>, _> = db
+ .prepare("select group_name from groups_roster where contact_jid = ?1")?
+ .query_map([&contact.user_jid], |row| Ok(row.get(0)?))?
+ .collect();
+ contact.groups = groups?;
}
Ok(roster)
}
- pub(crate) async fn create_chat(&self, chat: Chat) -> Result<(), Error> {
+ pub(crate) fn read_cached_roster_with_users(&self) -> Result<Vec<(Contact, User)>, Error> {
+ let db = &self.db;
+ let mut roster: Vec<(Contact, User)> = db.prepare("select user_jid, name, subscription, jid, nick, avatar from roster join users on jid = user_jid")?.query_map([], |row| {
+ Ok((
+ Contact {
+ user_jid: row.get(0)?,
+ name: row.get(1)?,
+ subscription: row.get(2)?,
+ groups: HashSet::new(),
+ },
+ User {
+ jid: row.get(3)?,
+ nick: row.get(4)?,
+ avatar: row.get(5)?,
+ }
+ ))
+ })?.collect::<Result<Vec<_>, _>>()?;
+ for (contact, _) in &mut roster {
+ let groups: Result<HashSet<String>, _> = db
+ .prepare("select group_name from groups_roster where contact_jid = ?1")?
+ .query_map([&contact.user_jid], |row| Ok(row.get(0)?))?
+ .collect();
+ contact.groups = groups?;
+ }
+ Ok(roster)
+ }
+
+ pub(crate) fn create_chat(&self, chat: Chat) -> Result<(), Error> {
let id = Uuid::new_v4();
let jid = chat.correspondent();
- sqlx::query!(
- "insert into chats (id, correspondent, have_chatted) values (?, ?, ?)",
- id,
- jid,
- chat.have_chatted,
- )
- .execute(&self.db)
- .await?;
+ self.db.execute(
+ "insert into chats (id, correspondent, have_chatted) values (?1, ?2, ?3)",
+ (id, jid, chat.have_chatted),
+ )?;
Ok(())
}
// TODO: what happens if a correspondent changes from a user to a contact? maybe just have correspondent be a user, then have the client make the user show up as a contact in ui if they are in the loaded roster.
- pub(crate) async fn read_chat(&self, chat: JID) -> Result<Chat, Error> {
- // check if the chat correponding with the jid exists
- let chat: Chat = sqlx::query_as("select correspondent from chats where correspondent = ?")
- .bind(chat)
- .fetch_one(&self.db)
- .await?;
- Ok(chat)
+ /// TODO: this is NOT a read
+ pub(crate) fn read_chat(&self, chat: BareJID) -> Result<Chat, Error> {
+ let chat_opt = self
+ .db
+ .query_row(
+ "select correspondent, have_chatted from chats where correspondent = ?1",
+ [&chat],
+ |row| {
+ Ok(Chat {
+ correspondent: row.get(0)?,
+ have_chatted: row.get(1)?,
+ })
+ },
+ )
+ .optional()?;
+ match chat_opt {
+ Some(chat) => return Ok(chat),
+ None => {
+ let chat = Chat {
+ correspondent: chat,
+ have_chatted: false,
+ };
+ self.create_chat(chat.clone())?;
+ Ok(chat)
+ }
+ }
+ }
+
+ pub(crate) fn read_chat_and_user(&self, chat: BareJID) -> Result<(Chat, User), Error> {
+ let user = self.read_user(chat.clone())?;
+ let chat_opt = self.db.query_row(
+ "select correspondent, have_chatted, jid, nick, avatar from chats join users on correspondent = jid where correspondent = ?1",
+ [&chat],
+ |row| {
+ Ok((
+ Chat {
+ correspondent: row.get(0)?,
+ have_chatted: row.get(1)?,
+ },
+ User {
+ jid: row.get(2)?,
+ nick: row.get(3)?,
+ avatar: row.get(4)?,
+ }
+ ))
+ },
+ ).optional()?;
+ match chat_opt {
+ Some(chat) => return Ok(chat),
+ None => {
+ let chat = Chat {
+ correspondent: chat,
+ have_chatted: false,
+ };
+ self.create_chat(chat.clone())?;
+ Ok((chat, user))
+ }
+ }
}
- pub(crate) async fn mark_chat_as_chatted(&self, chat: JID) -> Result<(), Error> {
- let jid = chat.as_bare();
- sqlx::query!(
- "update chats set have_chatted = true where correspondent = ?",
- jid
- )
- .execute(&self.db)
- .await?;
+ pub(crate) fn mark_chat_as_chatted(&self, chat: BareJID) -> Result<(), Error> {
+ self.db.execute(
+ "update chats set have_chatted = true where correspondent = ?1",
+ [chat],
+ )?;
Ok(())
}
- pub(crate) async fn update_chat_correspondent(
+ pub(crate) fn update_chat_correspondent(
&self,
old_chat: Chat,
- new_correspondent: JID,
+ new_correspondent: BareJID,
) -> Result<Chat, Error> {
- // TODO: update other chat data if it differs (for now there is only correspondent so doesn't matter)
let new_jid = &new_correspondent;
let old_jid = old_chat.correspondent();
- sqlx::query!(
- "update chats set correspondent = ? where correspondent = ?",
- new_jid,
- old_jid,
- )
- .execute(&self.db)
- .await?;
- let chat = self.read_chat(new_correspondent).await?;
+ let chat = self.db.query_row(
+ "update chats set correspondent = ?1 where correspondent = ?2 returning correspondent, have_chatted",
+ [new_jid, old_jid],
+ |row| Ok(Chat {
+ correspondent: row.get(0)?,
+ have_chatted: row.get(1)?,
+ })
+ )?;
Ok(chat)
}
- // pub(crate) async fn update_chat
+ // pub(crate) fn update_chat
- pub(crate) async fn delete_chat(&self, chat: JID) -> Result<(), Error> {
- sqlx::query!("delete from chats where correspondent = ?", chat)
- .execute(&self.db)
- .await?;
+ pub(crate) fn delete_chat(&self, chat: BareJID) -> Result<(), Error> {
+ self.db
+ .execute("delete from chats where correspondent = ?1", [chat])?;
Ok(())
}
/// TODO: sorting and filtering (for now there is no sorting)
- pub(crate) async fn read_chats(&self) -> Result<Vec<Chat>, Error> {
- let chats: Vec<Chat> = sqlx::query_as("select * from chats")
- .fetch_all(&self.db)
- .await?;
+ pub(crate) fn read_chats(&self) -> Result<Vec<Chat>, Error> {
+ let chats = self
+ .db
+ .prepare("select correspondent, have_chatted from chats")?
+ .query_map([], |row| {
+ Ok(Chat {
+ correspondent: row.get(0)?,
+ have_chatted: row.get(1)?,
+ })
+ })?
+ .collect::<Result<Vec<_>, _>>()?;
Ok(chats)
}
/// chats ordered by date of last message
// greatest-n-per-group
- pub(crate) async fn read_chats_ordered(&self) -> Result<Vec<Chat>, Error> {
- let chats = sqlx::query_as("select c.*, m.* from chats c join (select chat_id, max(timestamp) max_timestamp from messages group by chat_id) max_timestamps on c.id = max_timestamps.chat_id join messages m on max_timestamps.chat_id = m.chat_id and max_timestamps.max_timestamp = m.timestamp order by m.timestamp desc")
- .fetch_all(&self.db)
- .await?;
+ pub(crate) fn read_chats_ordered(&self) -> Result<Vec<Chat>, Error> {
+ let chats = self
+ .db
+ .prepare("select c.correspondent, c.have_chatted, m.* from chats c join (select chat_id, max(timestamp) max_timestamp from messages group by chat_id) max_timestamps on c.id = max_timestamps.chat_id join messages m on max_timestamps.chat_id = m.chat_id and max_timestamps.max_timestamp = m.timestamp order by m.timestamp desc")?
+ .query_map([], |row| {
+ Ok(Chat {
+ correspondent: row.get(0)?,
+ have_chatted: row.get(1)?,
+ })
+ })?
+ .collect::<Result<Vec<_>, _>>()?;
Ok(chats)
}
/// chats ordered by date of last message
// greatest-n-per-group
- pub(crate) async fn read_chats_ordered_with_latest_messages(
+ pub(crate) fn read_chats_ordered_with_latest_messages(
&self,
) -> Result<Vec<(Chat, Message)>, Error> {
- #[derive(sqlx::FromRow)]
- pub struct ChatWithMessage {
- #[sqlx(flatten)]
- pub chat: Chat,
- #[sqlx(flatten)]
- pub message: Message,
- }
-
- // TODO: sometimes chats have no messages.
- // TODO: i don't know if this will assign the right uuid to the latest message or the chat's id. should probably check but i don't think it matters as nothing ever gets called with the id of the latest message in the chats list
- // TODO: it does matter in fact, as message updates and delivery receipts need to go to the right latest_message
- let chats: Vec<ChatWithMessage> = sqlx::query_as("select c.*, m.* from chats c join (select chat_id, max(timestamp) max_timestamp from messages group by chat_id) max_timestamps on c.id = max_timestamps.chat_id join messages m on max_timestamps.chat_id = m.chat_id and max_timestamps.max_timestamp = m.timestamp order by m.timestamp desc")
- .fetch_all(&self.db)
- .await?;
-
- let chats = chats
- .into_iter()
- .map(|chat_with_message| (chat_with_message.chat, chat_with_message.message))
- .collect();
+ let chats = self
+ .db
+ .prepare("select c.correspondent, c.have_chatted, m.id, m.from_jid, m.delivery, m.timestamp, m.body from chats c join (select chat_id, max(timestamp) max_timestamp from messages group by chat_id) max_timestamps on c.id = max_timestamps.chat_id join messages m on max_timestamps.chat_id = m.chat_id and max_timestamps.max_timestamp = m.timestamp order by m.timestamp desc")?
+ .query_map([], |row| {
+ Ok((
+ Chat {
+ correspondent: row.get(0)?,
+ have_chatted: row.get(1)?,
+ },
+ Message {
+ id: row.get(2)?,
+ from: row.get(3)?,
+ delivery: row.get(4)?,
+ timestamp: row.get(5)?,
+ body: Body {
+ body: row.get(6)?,
+ },
+ }
+ ))
+ })?
+ .collect::<Result<Vec<_>, _>>()?;
+ Ok(chats)
+ }
+ /// chats ordered by date of last message
+ // greatest-n-per-group
+ pub(crate) fn read_chats_ordered_with_latest_messages_and_users(
+ &self,
+ ) -> Result<Vec<((Chat, User), (Message, User))>, Error> {
+ let chats = self
+ .db
+ .prepare("select c.id as chat_id, c.correspondent as chat_correspondent, c.have_chatted as chat_have_chatted, m.id as message_id, m.body as message_body, m.delivery as message_delivery, m.timestamp as message_timestamp, m.from_jid as message_from_jid, cu.jid as chat_user_jid, cu.nick as chat_user_nick, cu.avatar as chat_user_avatar, mu.jid as message_user_jid, mu.nick as message_user_nick, mu.avatar as message_user_avatar from chats c join (select chat_id, max(timestamp) max_timestamp from messages group by chat_id) max_timestamps on c.id = max_timestamps.chat_id join messages m on max_timestamps.chat_id = m.chat_id and max_timestamps.max_timestamp = m.timestamp join users as cu on cu.jid = c.correspondent join users as mu on mu.jid = m.from_jid order by m.timestamp desc")?
+ .query_map([], |row| {
+ Ok((
+ (
+ Chat {
+ correspondent: row.get("chat_correspondent")?,
+ have_chatted: row.get("chat_have_chatted")?,
+ },
+ User {
+ jid: row.get("chat_user_jid")?,
+ nick: row.get("chat_user_nick")?,
+ avatar: row.get("chat_user_avatar")?,
+ }
+ ),
+ (
+ Message {
+ id: row.get("message_id")?,
+ from: row.get("message_from_jid")?,
+ delivery: row.get("message_delivery")?,
+ timestamp: row.get("message_timestamp")?,
+ body: Body {
+ body: row.get("message_body")?,
+ },
+ },
+ User {
+ jid: row.get("message_user_jid")?,
+ nick: row.get("message_user_nick")?,
+ avatar: row.get("message_user_avatar")?,
+ }
+ ),
+ ))
+ })?
+ .collect::<Result<Vec<_>, _>>()?;
Ok(chats)
}
- async fn read_chat_id(&self, chat: JID) -> Result<Uuid, Error> {
- #[derive(sqlx::FromRow)]
- struct Row {
- id: Uuid,
- }
- let chat = chat.as_bare();
- let chat_id: Row = sqlx::query_as("select id from chats where correspondent = ?")
- .bind(chat)
- .fetch_one(&self.db)
- .await?;
- let chat_id = chat_id.id;
+ #[tracing::instrument]
+ fn read_chat_id(&self, chat: BareJID) -> Result<Uuid, Error> {
+ let chat_id = self.db.query_row(
+ "select id from chats where correspondent = ?1",
+ [chat],
+ |row| Ok(row.get(0)?),
+ )?;
Ok(chat_id)
}
- async fn read_chat_id_opt(&self, chat: JID) -> Result<Option<Uuid>, Error> {
- #[derive(sqlx::FromRow)]
- struct Row {
- id: Uuid,
- }
- let chat_id: Option<Row> = sqlx::query_as("select id from chats where correspondent = ?")
- .bind(chat)
- .fetch_optional(&self.db)
- .await?;
- let chat_id = chat_id.map(|row| row.id);
+ fn read_chat_id_opt(&self, chat: JID) -> Result<Option<Uuid>, Error> {
+ let chat_id = self
+ .db
+ .query_row(
+ "select id from chats where correspondent = ?1",
+ [chat],
+ |row| Ok(row.get(0)?),
+ )
+ .optional()?;
Ok(chat_id)
}
/// if the chat doesn't already exist, it must be created by calling create_chat() before running this function.
- pub(crate) async fn create_message(
+ #[tracing::instrument]
+ pub(crate) fn create_message(
&self,
message: Message,
- chat: JID,
- from: JID,
+ chat: BareJID,
+ from: FullJID,
) -> Result<(), Error> {
- // TODO: one query
let from_jid = from.as_bare();
- let chat_id = self.read_chat_id(chat).await?;
- sqlx::query!("insert into messages (id, body, chat_id, from_jid, from_resource, timestamp) values (?, ?, ?, ?, ?, ?)", message.id, message.body.body, chat_id, from_jid, from.resourcepart, message.timestamp).execute(&self.db).await?;
+ let chat_id = self.read_chat_id(chat)?;
+ tracing::debug!("creating message");
+ self.db.execute("insert into messages (id, body, chat_id, from_jid, from_resource, timestamp, delivery) values (?1, ?2, ?3, ?4, ?5, ?6, ?7)", (&message.id, &message.body.body, &chat_id, &from_jid, &from.resourcepart, &message.timestamp, &message.delivery))?;
Ok(())
}
- pub(crate) async fn upsert_chat_and_user(&self, chat: &JID) -> Result<bool, Error> {
- let bare_chat = chat.as_bare();
- sqlx::query!(
- "insert into users (jid) values (?) on conflict do nothing",
- bare_chat,
- )
- .execute(&self.db)
- .await?;
+ pub(crate) fn upsert_chat_and_user(&self, chat: &BareJID) -> Result<bool, Error> {
+ let db = &self.db;
+ db.execute(
+ "insert into users (jid) values (?1) on conflict do nothing",
+ [&chat],
+ )?;
let id = Uuid::new_v4();
- let chat: Chat = sqlx::query_as("insert into chats (id, correspondent, have_chatted) values (?, ?, ?) on conflict do nothing; select * from chats where correspondent = ?")
- .bind(id)
- .bind(bare_chat.clone())
- .bind(false)
- .bind(bare_chat)
- .fetch_one(&self.db)
- .await?;
- tracing::debug!("CHECKING chat: {:?}", chat);
+ db.execute("insert into chats (id, correspondent, have_chatted) values (?1, ?2, ?3) on conflict do nothing", (id, &chat, false))?;
+ let chat = db.query_row(
+ "select correspondent, have_chatted from chats where correspondent = ?1",
+ [&chat],
+ |row| {
+ Ok(Chat {
+ correspondent: row.get(0)?,
+ have_chatted: row.get(1)?,
+ })
+ },
+ )?;
Ok(chat.have_chatted)
}
- /// MUST upsert chat beforehand
- pub(crate) async fn create_message_with_self_resource(
+ /// create direct message from incoming. MUST upsert chat and user
+ #[tracing::instrument]
+ pub(crate) fn create_message_with_user_resource(
&self,
message: Message,
- chat: JID,
- // full jid
- from: JID,
+ chat: BareJID,
+ from: FullJID,
) -> Result<(), Error> {
let from_jid = from.as_bare();
- if let Some(resource) = &from.resourcepart {
- sqlx::query!(
- "insert into resources (bare_jid, resource) values (?, ?) on conflict do nothing",
- from_jid,
- resource
- )
- .execute(&self.db)
- .await?;
- }
- self.create_message(message, chat, from).await?;
+ tracing::debug!("creating resource");
+ self.db.execute(
+ "insert into resources (bare_jid, resource) values (?1, ?2) on conflict do nothing",
+ (&from_jid, &from.resourcepart),
+ )?;
+ self.create_message(message, chat, from)?;
Ok(())
}
- /// create direct message from incoming. MUST upsert chat and user
- pub(crate) async fn create_message_with_user_resource(
+ pub(crate) fn update_message_delivery(
&self,
- message: Message,
- chat: JID,
- // full jid
- from: JID,
+ message: Uuid,
+ delivery: Delivery,
) -> Result<(), Error> {
- let bare_chat = chat.as_bare();
- let resource = &chat.resourcepart;
- // sqlx::query!(
- // "insert into users (jid) values (?) on conflict do nothing",
- // bare_chat
- // )
- // .execute(&self.db)
- // .await?;
- // let id = Uuid::new_v4();
- // sqlx::query!(
- // "insert into chats (id, correspondent) values (?, ?) on conflict do nothing",
- // id,
- // bare_chat
- // )
- // .execute(&self.db)
- // .await?;
- if let Some(resource) = resource {
- sqlx::query!(
- "insert into resources (bare_jid, resource) values (?, ?) on conflict do nothing",
- bare_chat,
- resource
- )
- .execute(&self.db)
- .await?;
- }
- self.create_message(message, chat, from).await?;
+ self.db.execute(
+ "update messages set delivery = ?1 where id = ?2",
+ (delivery, message),
+ )?;
Ok(())
}
- pub(crate) async fn read_message(&self, message: Uuid) -> Result<Message, Error> {
- let message: Message = sqlx::query_as("select * from messages where id = ?")
- .bind(message)
- .fetch_one(&self.db)
- .await?;
- Ok(message)
- }
+ // pub(crate) fn read_message(&self, message: Uuid) -> Result<Message, Error> {
+ // Ok(Message {
+ // id: Uuid,
+ // from: todo!(),
+ // delivery: todo!(),
+ // timestamp: todo!(),
+ // body: todo!(),
+ // })
+ // }
- // TODO: message updates/edits pub(crate) async fn update_message(&self, message: Message) -> Result<(), Error> {}
+ // TODO: message updates/edits pub(crate) fn update_message(&self, message: Message) -> Result<(), Error> {}
- pub(crate) async fn delete_message(&self, message: Uuid) -> Result<(), Error> {
- sqlx::query!("delete from messages where id = ?", message)
- .execute(&self.db)
- .await?;
+ pub(crate) fn delete_message(&self, message: Uuid) -> Result<(), Error> {
+ self.db
+ .execute("delete from messages where id = ?1", [message])?;
Ok(())
}
+ pub(crate) fn read_message(&self, message: Uuid) -> Result<Message, Error> {
+ let message = self.db.query_row(
+ "select id, from_jid, delivery, timestamp, body from messages where id = ?1",
+ [&message],
+ |row| {
+ Ok(Message {
+ id: row.get(0)?,
+ // TODO: full from
+ from: row.get(1)?,
+ delivery: row.get(2)?,
+ timestamp: row.get(3)?,
+ body: Body { body: row.get(4)? },
+ })
+ },
+ )?;
+ Ok(message)
+ }
+
// TODO: paging
- pub(crate) async fn read_message_history(&self, chat: JID) -> Result<Vec<Message>, Error> {
- let chat_id = self.read_chat_id(chat).await?;
- let messages: Vec<Message> =
- sqlx::query_as("select * from messages where chat_id = ? order by timestamp asc")
- .bind(chat_id)
- .fetch_all(&self.db)
- .await?;
+ pub(crate) fn read_message_history(&self, chat: BareJID) -> Result<Vec<Message>, Error> {
+ let chat_id = self.read_chat_id(chat)?;
+ let messages = self
+ .db
+ .prepare(
+ "select id, from_jid, delivery, timestamp, body from messages where chat_id = ?1",
+ )?
+ .query_map([chat_id], |row| {
+ Ok(Message {
+ id: row.get(0)?,
+ // TODO: full from
+ from: row.get(1)?,
+ delivery: row.get(2)?,
+ timestamp: row.get(3)?,
+ body: Body { body: row.get(4)? },
+ })
+ })?
+ .collect::<Result<Vec<_>, _>>()?;
+ Ok(messages)
+ }
+
+ pub(crate) fn read_message_history_with_users(
+ &self,
+ chat: BareJID,
+ ) -> Result<Vec<(Message, User)>, Error> {
+ let chat_id = self.read_chat_id(chat)?;
+ let messages = self
+ .db
+ .prepare(
+ "select id, from_jid, delivery, timestamp, body, jid, nick, avatar from messages join users on jid = from_jid where chat_id = ? order by timestamp asc",
+ )?
+ .query_map([chat_id], |row| {
+ Ok((
+ Message {
+ id: row.get(0)?,
+ // TODO: full from
+ from: row.get(1)?,
+ delivery: row.get(2)?,
+ timestamp: row.get(3)?,
+ body: Body { body: row.get(4)? },
+ },
+ User {
+ jid: row.get(5)?,
+ nick: row.get(6)?,
+ avatar: row.get(7)?,
+ }
+ ))
+ })?
+ .collect::<Result<Vec<_>, _>>()?;
Ok(messages)
}
- pub(crate) async fn read_cached_status(&self) -> Result<Online, Error> {
- let online: Online = sqlx::query_as("select * from cached_status where id = 0")
- .fetch_one(&self.db)
- .await?;
- Ok(online)
+ pub(crate) fn read_cached_status(&self) -> Result<Online, Error> {
+ let status = self.db.query_row(
+ "select show, message from cached_status where id = 0",
+ [],
+ |row| {
+ Ok(Online {
+ show: row.get(0)?,
+ status: row.get(1)?,
+ priority: None,
+ })
+ },
+ )?;
+ Ok(status)
}
- pub(crate) async fn upsert_cached_status(&self, status: Online) -> Result<(), Error> {
- sqlx::query!(
- "insert into cached_status (id, show, message) values (0, ?, ?) on conflict do update set show = ?, message = ?",
- status.show,
- status.status,
- status.show,
- status.status
- ).execute(&self.db).await?;
+ pub(crate) fn upsert_cached_status(&self, status: Online) -> Result<(), Error> {
+ self.db.execute("insert into cached_status (id, show, message) values (0, ?1, ?2) on conflict do update set show = ?3, message = ?4", (status.show, &status.status, status.show, &status.status))?;
Ok(())
}
- pub(crate) async fn delete_cached_status(&self) -> Result<(), Error> {
- sqlx::query!("update cached_status set show = null, message = null where id = 0")
- .execute(&self.db)
- .await?;
+ pub(crate) fn delete_cached_status(&self) -> Result<(), Error> {
+ self.db.execute(
+ "update cached_status set show = null, message = null where id = 0",
+ [],
+ )?;
Ok(())
}
- pub(crate) async fn read_capabilities(&self, node: &str) -> Result<String, Error> {
- #[derive(sqlx::FromRow)]
- struct Row {
- capabilities: String,
- }
- let row: Row =
- sqlx::query_as("select capabilities from capability_hash_nodes where node = ?")
- .bind(node)
- .fetch_one(&self.db)
- .await?;
- Ok(row.capabilities)
+ pub(crate) fn read_capabilities(&self, node: String) -> Result<String, Error> {
+ let capabilities = self.db.query_row(
+ "select capabilities from capability_hash_nodes where node = ?1",
+ [node],
+ |row| Ok(row.get(0)?),
+ )?;
+ Ok(capabilities)
}
- pub(crate) async fn upsert_capabilities(
+ pub(crate) fn upsert_capabilities(
&self,
- node: &str,
- capabilities: &str,
+ node: String,
+ capabilities: String,
) -> Result<(), Error> {
let now = Utc::now();
- sqlx::query!(
- "insert into capability_hash_nodes (node, timestamp, capabilities) values (?, ?, ?) on conflict do update set timestamp = ?, capabilities = ?", node, now, capabilities, now, capabilities
- ).execute(&self.db).await?;
+ self.db.execute("insert into capability_hash_nodes (node, timestamp, capabilities) values (?1, ?2, ?3) on conflict do update set timestamp = ?, capabilities = ?", (node, now, &capabilities, now, &capabilities))?;
Ok(())
}
}
diff --git a/filamento/src/error.rs b/filamento/src/error.rs
index 23272b1..fb7d778 100644
--- a/filamento/src/error.rs
+++ b/filamento/src/error.rs
@@ -3,12 +3,17 @@ use std::{num::TryFromIntError, string::FromUtf8Error, sync::Arc};
use base64::DecodeError;
use image::ImageError;
use jid::JID;
-use lampada::error::{ConnectionError, ReadError, WriteError};
+use jid::JIDError;
+use lampada::error::{ActorError, ReadError, WriteError};
use stanza::client::{Stanza, iq::Query};
use thiserror::Error;
pub use lampada::error::CommandError;
+pub use lampada::error::ConnectionError;
+use tokio::sync::mpsc::error::SendError;
+use tokio::sync::oneshot::error::RecvError;
+use crate::db::DbCommand;
use crate::files::FileStore;
// for the client logic impl
@@ -21,7 +26,7 @@ pub enum Error<Fs: FileStore> {
// TODO: include content
// UnrecognizedContent(peanuts::element::Content),
#[error("iq receive error: {0}")]
- Iq(#[from] IqError),
+ Iq(#[from] IqProcessError),
// TODO: change to Connecting(ConnectingError)
#[error("connecting: {0}")]
Connecting(#[from] ConnectionJobError),
@@ -117,6 +122,17 @@ pub enum RosterError {
StanzaError(#[from] stanza::client::error::Error),
#[error("could not reply to roster push: {0}")]
PushReply(WriteError),
+ #[error("actor error: {0}")]
+ Actor(ActorError),
+}
+
+impl From<CommandError<RosterError>> for RosterError {
+ fn from(value: CommandError<RosterError>) -> Self {
+ match value {
+ CommandError::Actor(actor_error) => Self::Actor(actor_error),
+ CommandError::Error(e) => e,
+ }
+ }
}
#[derive(Debug, Error, Clone)]
@@ -154,17 +170,104 @@ pub enum ResponseError {
}
#[derive(Debug, Error, Clone)]
-#[error("database error: {0}")]
-pub struct DatabaseError(pub Arc<sqlx::Error>);
+pub enum DatabaseError {
+ #[error("database error: {0}")]
+ Database(Serializeable<Arc<rusqlite::Error>>),
+ #[error("database command send: {0}")]
+ Send(Arc<SendError<DbCommand>>),
+ #[error("database result recv: {0}")]
+ Recv(#[from] RecvError),
+}
+
+impl From<SendError<DbCommand>> for DatabaseError {
+ fn from(e: SendError<DbCommand>) -> Self {
+ Self::Send(Arc::new(e))
+ }
+}
+
+pub enum Serializeable<T> {
+ String(String),
+ Unserialized(T),
+}
+
+impl<T: std::fmt::Display> std::fmt::Display for Serializeable<T> {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ match &self {
+ Serializeable::String(s) => s.fmt(f),
+ Serializeable::Unserialized(t) => t.fmt(f),
+ }
+ }
+}
+
+impl<T: std::fmt::Debug> std::fmt::Debug for Serializeable<T> {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ match &self {
+ Serializeable::String(s) => s.fmt(f),
+ Serializeable::Unserialized(t) => t.fmt(f),
+ }
+ }
+}
+
+impl<T: Clone> Clone for Serializeable<T> {
+ fn clone(&self) -> Self {
+ match self {
+ Serializeable::String(s) => Self::String(s.clone()),
+ Serializeable::Unserialized(t) => Self::Unserialized(t.clone()),
+ }
+ }
+}
+
+#[cfg(feature = "serde")]
+struct StringVisitor;
+
+#[cfg(feature = "serde")]
+impl<'de> serde::de::Visitor<'de> for StringVisitor {
+ type Value = String;
+
+ fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
+ formatter.write_str("a string")
+ }
+
+ fn visit_string<E>(self, v: String) -> Result<Self::Value, E>
+ where
+ E: serde::de::Error,
+ {
+ Ok(v)
+ }
+}
-impl From<sqlx::Error> for DatabaseError {
- fn from(e: sqlx::Error) -> Self {
- Self(Arc::new(e))
+#[cfg(feature = "serde")]
+impl<'de> serde::Deserialize<'de> for DatabaseError {
+ fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+ where
+ D: serde::Deserializer<'de>,
+ {
+ let string = deserializer.deserialize_string(StringVisitor)?;
+ Ok(Self::Database(Serializeable::String(string)))
}
}
-impl From<sqlx::Error> for DatabaseOpenError {
- fn from(e: sqlx::Error) -> Self {
+#[cfg(feature = "serde")]
+impl serde::Serialize for DatabaseError {
+ fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+ where
+ S: serde::Serializer,
+ {
+ match &self.0 {
+ Serializeable::String(s) => serializer.serialize_str(s),
+ Serializeable::Unserialized(u) => serializer.serialize_str(&u.to_string()),
+ }
+ }
+}
+
+impl From<rusqlite::Error> for DatabaseError {
+ fn from(e: rusqlite::Error) -> Self {
+ Self::Database(Serializeable::Unserialized(Arc::new(e)))
+ }
+}
+
+impl From<rusqlite::Error> for DatabaseOpenError {
+ fn from(e: rusqlite::Error) -> Self {
Self::Error(Arc::new(e))
}
}
@@ -181,25 +284,149 @@ pub enum IqError {
}
#[derive(Debug, Error, Clone)]
+pub enum IqProcessError {
+ #[error("iq error")]
+ Iq(#[from] IqError),
+ #[error("roster push")]
+ Roster(#[from] RosterError),
+}
+
+#[derive(Debug, Error, Clone)]
pub enum DatabaseOpenError {
+ #[cfg(target_arch = "wasm32")]
+ #[error("opfs: {0}")]
+ OpfsSAH(#[from] OpfsSAHError),
#[error("error: {0}")]
- Error(Arc<sqlx::Error>),
- #[error("migration: {0}")]
- Migration(Arc<sqlx::migrate::MigrateError>),
+ Error(Arc<rusqlite::Error>),
+ // #[error("migration: {0}")]
+ // Migration(Arc<rusqlite::migrate::MigrateError>),
#[error("io: {0}")]
- Io(Arc<tokio::io::Error>),
+ Io(Arc<std::io::Error>),
#[error("invalid path")]
InvalidPath,
+ #[error("tokio oneshot recv error: {0}")]
+ Recv(#[from] tokio::sync::oneshot::error::RecvError),
}
-impl From<sqlx::migrate::MigrateError> for DatabaseOpenError {
- fn from(e: sqlx::migrate::MigrateError) -> Self {
- Self::Migration(Arc::new(e))
+// impl From<sqlx::migrate::MigrateError> for DatabaseOpenError {
+// fn from(e: sqlx::migrate::MigrateError) -> Self {
+// Self::Migration(Arc::new(e))
+// }
+// }
+
+#[cfg(target_arch = "wasm32")]
+impl From<rusqlite::ffi::OpfsSAHError> for OpfsSAHError {
+ fn from(e: rusqlite::ffi::OpfsSAHError) -> Self {
+ use wasm_bindgen::UnwrapThrowExt;
+ match e {
+ rusqlite::ffi::OpfsSAHError::Vfs(_register_vfs_error) => Self::VfsRegistration,
+ rusqlite::ffi::OpfsSAHError::ImportDb(_import_db_error) => Self::ImportDb,
+ rusqlite::ffi::OpfsSAHError::NotSuported => Self::NotSupported,
+ rusqlite::ffi::OpfsSAHError::GetDirHandle(js_value) => {
+ let error: js_sys::Error = js_value.into();
+ let message = error.message().as_string().unwrap_throw();
+ Self::GetDirHandle(message)
+ }
+ rusqlite::ffi::OpfsSAHError::GetFileHandle(js_value) => {
+ let error: js_sys::Error = js_value.into();
+ let message = error.message().as_string().unwrap_throw();
+ Self::GetFileHandle(message)
+ }
+ rusqlite::ffi::OpfsSAHError::CreateSyncAccessHandle(js_value) => {
+ let error: js_sys::Error = js_value.into();
+ let message = error.message().as_string().unwrap_throw();
+ Self::CreateSyncAccessHandle(message)
+ }
+ rusqlite::ffi::OpfsSAHError::IterHandle(js_value) => {
+ let error: js_sys::Error = js_value.into();
+ let message = error.message().as_string().unwrap_throw();
+ Self::IterHandle(message)
+ }
+ rusqlite::ffi::OpfsSAHError::GetPath(js_value) => {
+ let error: js_sys::Error = js_value.into();
+ let message = error.message().as_string().unwrap_throw();
+ Self::GetPath(message)
+ }
+ rusqlite::ffi::OpfsSAHError::RemoveEntity(js_value) => {
+ let error: js_sys::Error = js_value.into();
+ let message = error.message().as_string().unwrap_throw();
+ Self::RemoveEntity(message)
+ }
+ rusqlite::ffi::OpfsSAHError::GetSize(js_value) => {
+ let error: js_sys::Error = js_value.into();
+ let message = error.message().as_string().unwrap_throw();
+ Self::GetSize(message)
+ }
+ rusqlite::ffi::OpfsSAHError::Read(js_value) => {
+ let error: js_sys::Error = js_value.into();
+ let message = error.message().as_string().unwrap_throw();
+ Self::Read(message)
+ }
+ rusqlite::ffi::OpfsSAHError::Write(js_value) => {
+ let error: js_sys::Error = js_value.into();
+ let message = error.message().as_string().unwrap_throw();
+ Self::Write(message)
+ }
+ rusqlite::ffi::OpfsSAHError::Flush(js_value) => {
+ let error: js_sys::Error = js_value.into();
+ let message = error.message().as_string().unwrap_throw();
+ Self::Flush(message)
+ }
+ rusqlite::ffi::OpfsSAHError::Truncate(js_value) => {
+ let error: js_sys::Error = js_value.into();
+ let message = error.message().as_string().unwrap_throw();
+ Self::Truncate(message)
+ }
+ rusqlite::ffi::OpfsSAHError::Reflect(js_value) => {
+ let error: js_sys::Error = js_value.into();
+ let message = error.message().as_string().unwrap_throw();
+ Self::Reflect(message)
+ }
+ rusqlite::ffi::OpfsSAHError::Generic(s) => Self::Generic(s),
+ rusqlite::ffi::OpfsSAHError::Custom(s) => Self::Generic(s),
+ }
}
}
-impl From<tokio::io::Error> for DatabaseOpenError {
- fn from(e: tokio::io::Error) -> Self {
+#[cfg(target_arch = "wasm32")]
+#[derive(Debug, Error, Clone)]
+pub enum OpfsSAHError {
+ #[error("VFS registration")]
+ VfsRegistration,
+ #[error("import db error")]
+ ImportDb,
+ #[error("not supported")]
+ NotSupported,
+ #[error("get dir handle: {0}")]
+ GetDirHandle(String),
+ #[error("get file handle: {0}")]
+ GetFileHandle(String),
+ #[error("create sync access handle: {0}")]
+ CreateSyncAccessHandle(String),
+ #[error("iter handle: {0}")]
+ IterHandle(String),
+ #[error("get path: {0}")]
+ GetPath(String),
+ #[error("remove entity: {0}")]
+ RemoveEntity(String),
+ #[error("get size: {0}")]
+ GetSize(String),
+ #[error("read: {0}")]
+ Read(String),
+ #[error("write: {0}")]
+ Write(String),
+ #[error("flush: {0}")]
+ Flush(String),
+ #[error("truncate: {0}")]
+ Truncate(String),
+ #[error("reflect: {0}")]
+ Reflect(String),
+ #[error("generic: {0}")]
+ Generic(String),
+}
+
+impl From<std::io::Error> for DatabaseOpenError {
+ fn from(e: std::io::Error) -> Self {
Self::Io(Arc::new(e))
}
}
@@ -218,8 +445,10 @@ pub enum PresenceError {
Unsupported,
#[error("missing from")]
MissingFrom,
- #[error("stanza error: {0}")]
- StanzaError(#[from] stanza::client::error::Error),
+ #[error("stanza error: {0:?}")]
+ StanzaError(Option<stanza::client::error::Error>),
+ #[error("received subscription request from a non-bare jid")]
+ InvalidSubscriptionRequest(#[from] JIDError),
}
#[derive(Debug, Error, Clone)]
diff --git a/filamento/src/files.rs b/filamento/src/files.rs
index cd232f3..dcc9cd2 100644
--- a/filamento/src/files.rs
+++ b/filamento/src/files.rs
@@ -1,11 +1,15 @@
use std::{
+ collections::HashMap,
+ convert::Infallible,
error::Error,
path::{Path, PathBuf},
sync::Arc,
};
use tokio::io;
+use tokio::sync::Mutex;
+#[cfg(not(target_arch = "wasm32"))]
pub trait FileStore {
type Err: Clone + Send + Error;
@@ -24,19 +28,85 @@ pub trait FileStore {
) -> impl std::future::Future<Output = Result<(), Self::Err>> + std::marker::Send;
}
+#[cfg(target_arch = "wasm32")]
+pub trait FileStore {
+ type Err: Clone + Send + Error;
+
+ fn is_stored(&self, name: &str) -> impl std::future::Future<Output = Result<bool, Self::Err>>;
+ fn store(
+ &self,
+ name: &str,
+ data: &[u8],
+ ) -> impl std::future::Future<Output = Result<(), Self::Err>>;
+ fn delete(&self, name: &str) -> impl std::future::Future<Output = Result<(), Self::Err>>;
+}
+
+#[derive(Clone, Debug)]
+pub struct FilesMem {
+ files: Arc<Mutex<HashMap<String, Vec<u8>>>>,
+}
+
+impl FilesMem {
+ pub fn new() -> Self {
+ Self {
+ files: Arc::new(Mutex::new(HashMap::new())),
+ }
+ }
+
+ pub async fn get_file(&self, name: impl AsRef<str>) -> Option<Vec<u8>> {
+ let name = name.as_ref();
+ self.files.lock().await.get(name).cloned()
+ }
+}
+
+#[cfg(all(feature = "opfs", target_arch = "wasm32"))]
+pub mod opfs;
+
+#[cfg(all(feature = "opfs", target_arch = "wasm32"))]
+pub use opfs::FilesOPFS;
+
+impl FileStore for FilesMem {
+ type Err = Infallible;
+
+ async fn is_stored(&self, name: &str) -> Result<bool, Self::Err> {
+ Ok(self.files.lock().await.contains_key(name))
+ }
+
+ async fn store(&self, name: &str, data: &[u8]) -> Result<(), Self::Err> {
+ self.files
+ .lock()
+ .await
+ .insert(name.to_string(), data.to_owned());
+
+ Ok(())
+ }
+
+ async fn delete(&self, name: &str) -> Result<(), Self::Err> {
+ self.files.lock().await.remove(name);
+ Ok(())
+ }
+}
+
+#[cfg(not(target_arch = "wasm32"))]
#[derive(Clone, Debug)]
pub struct Files {
root: PathBuf,
}
+#[cfg(not(target_arch = "wasm32"))]
impl Files {
pub fn new(root: impl AsRef<Path>) -> Self {
let root = root.as_ref();
let root = root.into();
Self { root }
}
+
+ pub fn root(&self) -> &Path {
+ &self.root
+ }
}
+#[cfg(not(target_arch = "wasm32"))]
impl FileStore for Files {
type Err = Arc<io::Error>;
diff --git a/filamento/src/files/opfs.rs b/filamento/src/files/opfs.rs
new file mode 100644
index 0000000..0bcce35
--- /dev/null
+++ b/filamento/src/files/opfs.rs
@@ -0,0 +1,128 @@
+use std::path::Path;
+
+use thiserror::Error;
+use wasm_bindgen::{JsCast, JsValue};
+use wasm_bindgen_futures::JsFuture;
+use web_sys::{
+ File, FileSystemDirectoryHandle, FileSystemFileHandle, FileSystemGetDirectoryOptions,
+ FileSystemGetFileOptions, FileSystemWritableFileStream, Url, js_sys, window,
+};
+
+use crate::FileStore;
+
+#[derive(Clone, Debug)]
+pub struct FilesOPFS {
+ directory: String,
+}
+
+impl FilesOPFS {
+ pub async fn new(directory: impl AsRef<str>) -> Result<Self, OPFSError> {
+ let directory = directory.as_ref();
+ let directory_string = directory.to_string();
+ let promise = window().unwrap().navigator().storage().get_directory();
+ let opfs_root: FileSystemDirectoryHandle = JsFuture::from(promise).await?.into();
+ let options = FileSystemGetDirectoryOptions::new();
+ options.set_create(true);
+ let directory: FileSystemDirectoryHandle =
+ JsFuture::from(opfs_root.get_directory_handle_with_options(directory, &options))
+ .await?
+ .into();
+ Ok(Self {
+ directory: directory_string,
+ })
+ }
+
+ pub async fn get_src(&self, file_name: impl AsRef<str>) -> Result<String, OPFSError> {
+ let promise = window().unwrap().navigator().storage().get_directory();
+ let opfs_root: FileSystemDirectoryHandle = JsFuture::from(promise).await?.into();
+ let directory: FileSystemDirectoryHandle =
+ JsFuture::from(opfs_root.get_directory_handle(&self.directory))
+ .await?
+ .into();
+ let handle: FileSystemFileHandle =
+ JsFuture::from(directory.get_file_handle(file_name.as_ref()))
+ .await?
+ .into();
+ let file: File = JsFuture::from(handle.get_file()).await?.into();
+ let src = Url::create_object_url_with_blob(&file)?;
+ Ok(src)
+ }
+}
+
+impl FileStore for FilesOPFS {
+ type Err = OPFSError;
+
+ async fn is_stored(&self, name: &str) -> Result<bool, Self::Err> {
+ let promise = window().unwrap().navigator().storage().get_directory();
+ let opfs_root: FileSystemDirectoryHandle = JsFuture::from(promise).await?.into();
+ let directory: FileSystemDirectoryHandle =
+ JsFuture::from(opfs_root.get_directory_handle(&self.directory))
+ .await?
+ .into();
+ let stored = JsFuture::from(directory.get_file_handle(name))
+ .await
+ .map(|_| true)
+ // TODO: distinguish between other errors and notfound
+ .unwrap_or(false);
+ Ok(stored)
+ }
+
+ async fn store(&self, name: &str, data: &[u8]) -> Result<(), Self::Err> {
+ let promise = window().unwrap().navigator().storage().get_directory();
+ let opfs_root: FileSystemDirectoryHandle = JsFuture::from(promise).await?.into();
+ let directory: FileSystemDirectoryHandle =
+ JsFuture::from(opfs_root.get_directory_handle(&self.directory))
+ .await?
+ .into();
+ let options = FileSystemGetFileOptions::new();
+ options.set_create(true);
+ let handle: FileSystemFileHandle =
+ JsFuture::from(directory.get_file_handle_with_options(name, &options))
+ .await?
+ .into();
+ let write_handle: FileSystemWritableFileStream =
+ JsFuture::from(handle.create_writable()).await?.into();
+
+ let buffer = js_sys::ArrayBuffer::new(data.len() as u32);
+ let u8arr = js_sys::Uint8Array::new(&buffer);
+ for (idx, v) in data.iter().enumerate() {
+ u8arr.set_index(idx as u32, *v);
+ }
+
+ let write_promise = write_handle.write_with_js_u8_array(&u8arr).unwrap();
+ // let write_promise = write_handle.write_with_u8_array(data)?;
+ let _ = JsFuture::from(write_promise).await?;
+ let _ = JsFuture::from(write_handle.close()).await?;
+ Ok(())
+ }
+
+ async fn delete(&self, name: &str) -> Result<(), Self::Err> {
+ let promise = window().unwrap().navigator().storage().get_directory();
+ let opfs_root: FileSystemDirectoryHandle = JsFuture::from(promise).await?.into();
+ let directory: FileSystemDirectoryHandle =
+ JsFuture::from(opfs_root.get_directory_handle(&self.directory))
+ .await?
+ .into();
+ let _ = JsFuture::from(directory.remove_entry(name)).await?;
+ Ok(())
+ }
+}
+
+#[derive(Error, Clone, Debug)]
+pub enum OPFSError {
+ #[error("js opfs error: {0}")]
+ Error(String),
+}
+
+// TODO: better errors
+impl From<JsValue> for OPFSError {
+ fn from(value: JsValue) -> Self {
+ Self::Error(
+ value
+ .dyn_into::<js_sys::Error>()
+ .ok()
+ .and_then(|err| err.message().as_string())
+ .unwrap_or(String::from("<no string>")),
+ )
+ }
+}
diff --git a/filamento/src/lib.rs b/filamento/src/lib.rs
index 7946241..d3033b7 100644
--- a/filamento/src/lib.rs
+++ b/filamento/src/lib.rs
@@ -16,7 +16,7 @@ use error::{
};
use files::FileStore;
use futures::FutureExt;
-use jid::JID;
+use jid::{BareJID, JID};
use lampada::{
Connected, CoreClient, CoreClientCommand, Logic, SupervisorSender, WriteMessage,
error::{ActorError, CommandError, ConnectionError, ReadError, WriteError},
@@ -32,6 +32,8 @@ use tokio::{
sync::{Mutex, mpsc, oneshot},
time::timeout,
};
+#[cfg(target_arch = "wasm32")]
+use tokio_with_wasm::alias as tokio;
use tracing::{debug, info};
use user::User;
use uuid::Uuid;
@@ -52,6 +54,8 @@ pub mod user;
pub enum Command<Fs: FileStore> {
/// get the roster. if offline, retreive cached version from database. should be stored in application memory
GetRoster(oneshot::Sender<Result<Vec<Contact>, RosterError>>),
+ /// get the roster. if offline, retreive cached version from database. should be stored in application memory. includes user associated with each contact
+ GetRosterWithUsers(oneshot::Sender<Result<Vec<(Contact, User)>, RosterError>>),
/// get all chats. chat will include 10 messages in their message Vec (enough for chat previews)
// TODO: paging and filtering
GetChats(oneshot::Sender<Result<Vec<Chat>, DatabaseError>>),
@@ -59,37 +63,60 @@ pub enum Command<Fs: FileStore> {
GetChatsOrdered(oneshot::Sender<Result<Vec<Chat>, DatabaseError>>),
// TODO: paging and filtering
GetChatsOrderedWithLatestMessages(oneshot::Sender<Result<Vec<(Chat, Message)>, DatabaseError>>),
+ // TODO: paging and filtering, nullabillity for latest message
+ GetChatsOrderedWithLatestMessagesAndUsers(
+ oneshot::Sender<Result<Vec<((Chat, User), (Message, User))>, DatabaseError>>,
+ ),
/// get a specific chat by jid
- GetChat(JID, oneshot::Sender<Result<Chat, DatabaseError>>),
+ GetChat(BareJID, oneshot::Sender<Result<Chat, DatabaseError>>),
+ /// get a specific chat and user by jid
+ GetChatAndUser(
+ BareJID,
+ oneshot::Sender<Result<(Chat, User), DatabaseError>>,
+ ),
+ /// get message history for chat (does appropriate mam things)
+ GetMessage(Uuid, oneshot::Sender<Result<Message, DatabaseError>>),
+ // TODO: paging and filtering
+ GetMessages(
+ BareJID,
+ oneshot::Sender<Result<Vec<Message>, DatabaseError>>,
+ ),
/// get message history for chat (does appropriate mam things)
// TODO: paging and filtering
- GetMessages(JID, oneshot::Sender<Result<Vec<Message>, DatabaseError>>),
+ GetMessagesWithUsers(
+ BareJID,
+ oneshot::Sender<Result<Vec<(Message, User)>, DatabaseError>>,
+ ),
/// delete a chat from your chat history, along with all the corresponding messages
- DeleteChat(JID, oneshot::Sender<Result<(), DatabaseError>>),
+ DeleteChat(BareJID, oneshot::Sender<Result<(), DatabaseError>>),
/// delete a message from your chat history
DeleteMessage(Uuid, oneshot::Sender<Result<(), DatabaseError>>),
/// get a user from your users database
- GetUser(JID, oneshot::Sender<Result<User, DatabaseError>>),
+ GetUser(BareJID, oneshot::Sender<Result<User, DatabaseError>>),
/// add a contact to your roster, with a status of none, no subscriptions.
- AddContact(JID, oneshot::Sender<Result<(), RosterError>>),
+ AddContact(BareJID, oneshot::Sender<Result<(), RosterError>>),
/// send a friend request i.e. a subscription request with a subscription pre-approval. if not already added to roster server adds to roster.
- BuddyRequest(JID, oneshot::Sender<Result<(), SubscribeError>>),
+ BuddyRequest(BareJID, oneshot::Sender<Result<(), SubscribeError>>),
/// send a subscription request, without pre-approval. if not already added to roster server adds to roster.
- SubscriptionRequest(JID, oneshot::Sender<Result<(), SubscribeError>>),
+ SubscriptionRequest(BareJID, oneshot::Sender<Result<(), SubscribeError>>),
/// accept a friend request by accepting a pending subscription and sending a subscription request back. if not already added to roster adds to roster.
- AcceptBuddyRequest(JID, oneshot::Sender<Result<(), SubscribeError>>),
+ AcceptBuddyRequest(BareJID, oneshot::Sender<Result<(), SubscribeError>>),
/// accept a pending subscription and doesn't send a subscription request back. if not already added to roster adds to roster.
- AcceptSubscriptionRequest(JID, oneshot::Sender<Result<(), SubscribeError>>),
+ AcceptSubscriptionRequest(BareJID, oneshot::Sender<Result<(), SubscribeError>>),
/// unsubscribe to a contact, but don't remove their subscription.
- UnsubscribeFromContact(JID, oneshot::Sender<Result<(), WriteError>>),
+ UnsubscribeFromContact(BareJID, oneshot::Sender<Result<(), WriteError>>),
/// stop a contact from being subscribed, but stay subscribed to the contact.
- UnsubscribeContact(JID, oneshot::Sender<Result<(), WriteError>>),
+ UnsubscribeContact(BareJID, oneshot::Sender<Result<(), WriteError>>),
/// remove subscriptions to and from contact, but keep in roster.
- UnfriendContact(JID, oneshot::Sender<Result<(), WriteError>>),
+ UnfriendContact(BareJID, oneshot::Sender<Result<(), WriteError>>),
/// remove a contact from the contact list. will remove subscriptions if not already done then delete contact from roster.
- DeleteContact(JID, oneshot::Sender<Result<(), RosterError>>),
+ DeleteContact(BareJID, oneshot::Sender<Result<(), RosterError>>),
/// update contact. contact details will be overwritten with the contents of the contactupdate struct.
- UpdateContact(JID, ContactUpdate, oneshot::Sender<Result<(), RosterError>>),
+ UpdateContact(
+ BareJID,
+ ContactUpdate,
+ oneshot::Sender<Result<(), RosterError>>,
+ ),
/// set online status. if disconnected, will be cached so when client connects, will be sent as the initial presence.
SetStatus(Online, oneshot::Sender<Result<(), StatusError>>),
/// send presence stanza
@@ -103,7 +130,7 @@ pub enum Command<Fs: FileStore> {
// TODO: should probably make it so people can add non-contact auto presence sharing in the client (most likely through setting an internal setting)
/// send a message to a jid (any kind of jid that can receive a message, e.g. a user or a
/// chatroom). if disconnected, will be cached so when client connects, message will be sent.
- SendMessage(JID, Body),
+ SendMessage(BareJID, Body),
// TODO: resend failed messages
// ResendMessage(Uuid),
/// disco info query
@@ -129,7 +156,7 @@ pub enum Command<Fs: FileStore> {
sender: oneshot::Sender<Result<(), PEPError>>,
},
GetPEPItem {
- jid: Option<JID>,
+ jid: Option<BareJID>,
node: String,
id: String,
sender: oneshot::Sender<Result<pep::Item, PEPError>>,
@@ -150,14 +177,11 @@ pub enum Command<Fs: FileStore> {
#[derive(Debug, Clone)]
pub enum UpdateMessage {
- Online(Online, Vec<Contact>),
+ Online(Online, Vec<(Contact, User)>),
Offline(Offline),
- /// received roster from jabber server (replace full app roster state with this)
- /// is this needed?
- FullRoster(Vec<Contact>),
/// (only update app roster state, don't replace)
- RosterUpdate(Contact),
- RosterDelete(JID),
+ RosterUpdate(Contact, User),
+ RosterDelete(BareJID),
/// presences should be stored with users in the ui, not contacts, as presences can be received from anyone
Presence {
from: JID,
@@ -166,20 +190,23 @@ pub enum UpdateMessage {
// TODO: receipts
// MessageDispatched(Uuid),
Message {
- to: JID,
+ // TODO: rename to chat?
+ to: BareJID,
+ from: User,
message: Message,
},
MessageDelivery {
id: Uuid,
+ chat: BareJID,
delivery: Delivery,
},
- SubscriptionRequest(jid::JID),
+ SubscriptionRequest(BareJID),
NickChanged {
- jid: JID,
+ jid: BareJID,
nick: Option<String>,
},
AvatarChanged {
- jid: JID,
+ jid: BareJID,
id: Option<String>,
},
}
@@ -240,13 +267,14 @@ impl<Fs: FileStore + Clone + Send + Sync + 'static> Client<Fs> {
let client = Self {
sender: command_sender,
// TODO: configure timeout
- timeout: Duration::from_secs(10),
+ timeout: Duration::from_secs(20),
};
- let logic = ClientLogic::new(client.clone(), jid.as_bare(), db, update_send, file_store);
+ let logic = ClientLogic::new(client.clone(), jid.to_bare(), db, update_send, file_store);
let actor: CoreClient<ClientLogic<Fs>> =
CoreClient::new(jid, password, command_receiver, None, sup_recv, logic);
+
tokio::spawn(async move { actor.run().await });
(client, update_recv)
@@ -254,9 +282,17 @@ impl<Fs: FileStore + Clone + Send + Sync + 'static> Client<Fs> {
}
impl<Fs: FileStore> Client<Fs> {
- pub async fn connect(&self) -> Result<(), ActorError> {
- self.send(CoreClientCommand::Connect).await?;
- Ok(())
+ /// returns the resource
+ pub async fn connect(&self) -> Result<String, CommandError<ConnectionError>> {
+ let (send, recv) = oneshot::channel::<Result<String, ConnectionError>>();
+ self.send(CoreClientCommand::Connect(send))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let result = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(result)
}
pub async fn disconnect(&self, offline: Offline) -> Result<(), ActorError> {
@@ -276,6 +312,22 @@ impl<Fs: FileStore> Client<Fs> {
Ok(roster)
}
+ pub async fn get_roster_with_users(
+ &self,
+ ) -> Result<Vec<(Contact, User)>, CommandError<RosterError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(CoreClientCommand::Command(Command::GetRosterWithUsers(
+ send,
+ )))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let roster = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(roster)
+ }
+
pub async fn get_chats(&self) -> Result<Vec<Chat>, CommandError<DatabaseError>> {
let (send, recv) = oneshot::channel();
self.send(CoreClientCommand::Command(Command::GetChats(send)))
@@ -316,7 +368,23 @@ impl<Fs: FileStore> Client<Fs> {
Ok(chats)
}
- pub async fn get_chat(&self, jid: JID) -> Result<Chat, CommandError<DatabaseError>> {
+ pub async fn get_chats_ordered_with_latest_messages_and_users(
+ &self,
+ ) -> Result<Vec<((Chat, User), (Message, User))>, CommandError<DatabaseError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(CoreClientCommand::Command(
+ Command::GetChatsOrderedWithLatestMessagesAndUsers(send),
+ ))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let chats = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(chats)
+ }
+
+ pub async fn get_chat(&self, jid: BareJID) -> Result<Chat, CommandError<DatabaseError>> {
let (send, recv) = oneshot::channel();
self.send(CoreClientCommand::Command(Command::GetChat(jid, send)))
.await
@@ -328,9 +396,38 @@ impl<Fs: FileStore> Client<Fs> {
Ok(chat)
}
+ pub async fn get_chat_and_user(
+ &self,
+ jid: BareJID,
+ ) -> Result<(Chat, User), CommandError<DatabaseError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(CoreClientCommand::Command(Command::GetChatAndUser(
+ jid, send,
+ )))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let result = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(result)
+ }
+
+ pub async fn get_message(&self, id: Uuid) -> Result<Message, CommandError<DatabaseError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(CoreClientCommand::Command(Command::GetMessage(id, send)))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let message = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(message)
+ }
+
pub async fn get_messages(
&self,
- jid: JID,
+ jid: BareJID,
) -> Result<Vec<Message>, CommandError<DatabaseError>> {
let (send, recv) = oneshot::channel();
self.send(CoreClientCommand::Command(Command::GetMessages(jid, send)))
@@ -343,7 +440,24 @@ impl<Fs: FileStore> Client<Fs> {
Ok(messages)
}
- pub async fn delete_chat(&self, jid: JID) -> Result<(), CommandError<DatabaseError>> {
+ pub async fn get_messages_with_users(
+ &self,
+ jid: BareJID,
+ ) -> Result<Vec<(Message, User)>, CommandError<DatabaseError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(CoreClientCommand::Command(Command::GetMessagesWithUsers(
+ jid, send,
+ )))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let messages = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(messages)
+ }
+
+ pub async fn delete_chat(&self, jid: BareJID) -> Result<(), CommandError<DatabaseError>> {
let (send, recv) = oneshot::channel();
self.send(CoreClientCommand::Command(Command::DeleteChat(jid, send)))
.await
@@ -367,7 +481,7 @@ impl<Fs: FileStore> Client<Fs> {
Ok(result)
}
- pub async fn get_user(&self, jid: JID) -> Result<User, CommandError<DatabaseError>> {
+ pub async fn get_user(&self, jid: BareJID) -> Result<User, CommandError<DatabaseError>> {
let (send, recv) = oneshot::channel();
self.send(CoreClientCommand::Command(Command::GetUser(jid, send)))
.await
@@ -379,7 +493,7 @@ impl<Fs: FileStore> Client<Fs> {
Ok(result)
}
- pub async fn add_contact(&self, jid: JID) -> Result<(), CommandError<RosterError>> {
+ pub async fn add_contact(&self, jid: BareJID) -> Result<(), CommandError<RosterError>> {
let (send, recv) = oneshot::channel();
self.send(CoreClientCommand::Command(Command::AddContact(jid, send)))
.await
@@ -391,7 +505,7 @@ impl<Fs: FileStore> Client<Fs> {
Ok(result)
}
- pub async fn buddy_request(&self, jid: JID) -> Result<(), CommandError<SubscribeError>> {
+ pub async fn buddy_request(&self, jid: BareJID) -> Result<(), CommandError<SubscribeError>> {
let (send, recv) = oneshot::channel();
self.send(CoreClientCommand::Command(Command::BuddyRequest(jid, send)))
.await
@@ -403,7 +517,10 @@ impl<Fs: FileStore> Client<Fs> {
Ok(result)
}
- pub async fn subscription_request(&self, jid: JID) -> Result<(), CommandError<SubscribeError>> {
+ pub async fn subscription_request(
+ &self,
+ jid: BareJID,
+ ) -> Result<(), CommandError<SubscribeError>> {
let (send, recv) = oneshot::channel();
self.send(CoreClientCommand::Command(Command::SubscriptionRequest(
jid, send,
@@ -417,7 +534,10 @@ impl<Fs: FileStore> Client<Fs> {
Ok(result)
}
- pub async fn accept_buddy_request(&self, jid: JID) -> Result<(), CommandError<SubscribeError>> {
+ pub async fn accept_buddy_request(
+ &self,
+ jid: BareJID,
+ ) -> Result<(), CommandError<SubscribeError>> {
let (send, recv) = oneshot::channel();
self.send(CoreClientCommand::Command(Command::AcceptBuddyRequest(
jid, send,
@@ -433,7 +553,7 @@ impl<Fs: FileStore> Client<Fs> {
pub async fn accept_subscription_request(
&self,
- jid: JID,
+ jid: BareJID,
) -> Result<(), CommandError<SubscribeError>> {
let (send, recv) = oneshot::channel();
self.send(CoreClientCommand::Command(
@@ -448,7 +568,10 @@ impl<Fs: FileStore> Client<Fs> {
Ok(result)
}
- pub async fn unsubscribe_from_contact(&self, jid: JID) -> Result<(), CommandError<WriteError>> {
+ pub async fn unsubscribe_from_contact(
+ &self,
+ jid: BareJID,
+ ) -> Result<(), CommandError<WriteError>> {
let (send, recv) = oneshot::channel();
self.send(CoreClientCommand::Command(Command::UnsubscribeFromContact(
jid, send,
@@ -462,7 +585,7 @@ impl<Fs: FileStore> Client<Fs> {
Ok(result)
}
- pub async fn unsubscribe_contact(&self, jid: JID) -> Result<(), CommandError<WriteError>> {
+ pub async fn unsubscribe_contact(&self, jid: BareJID) -> Result<(), CommandError<WriteError>> {
let (send, recv) = oneshot::channel();
self.send(CoreClientCommand::Command(Command::UnsubscribeContact(
jid, send,
@@ -476,7 +599,7 @@ impl<Fs: FileStore> Client<Fs> {
Ok(result)
}
- pub async fn unfriend_contact(&self, jid: JID) -> Result<(), CommandError<WriteError>> {
+ pub async fn unfriend_contact(&self, jid: BareJID) -> Result<(), CommandError<WriteError>> {
let (send, recv) = oneshot::channel();
self.send(CoreClientCommand::Command(Command::UnfriendContact(
jid, send,
@@ -490,7 +613,7 @@ impl<Fs: FileStore> Client<Fs> {
Ok(result)
}
- pub async fn delete_contact(&self, jid: JID) -> Result<(), CommandError<RosterError>> {
+ pub async fn delete_contact(&self, jid: BareJID) -> Result<(), CommandError<RosterError>> {
let (send, recv) = oneshot::channel();
self.send(CoreClientCommand::Command(Command::DeleteContact(
jid, send,
@@ -506,7 +629,7 @@ impl<Fs: FileStore> Client<Fs> {
pub async fn update_contact(
&self,
- jid: JID,
+ jid: BareJID,
update: ContactUpdate,
) -> Result<(), CommandError<RosterError>> {
let (send, recv) = oneshot::channel();
@@ -534,7 +657,7 @@ impl<Fs: FileStore> Client<Fs> {
Ok(result)
}
- pub async fn send_message(&self, jid: JID, body: Body) -> Result<(), ActorError> {
+ pub async fn send_message(&self, jid: BareJID, body: Body) -> Result<(), ActorError> {
self.send(CoreClientCommand::Command(Command::SendMessage(jid, body)))
.await?;
Ok(())
@@ -613,7 +736,8 @@ impl<Fs: FileStore> Client<Fs> {
pub async fn get_pep_item(
&self,
- jid: Option<JID>,
+ // i think this is correct?, should not be able to send pep requests to a full jid.
+ jid: Option<BareJID>,
node: String,
id: String,
) -> Result<pep::Item, CommandError<PEPError>> {
@@ -668,3 +792,93 @@ impl<Fs: FileStore> From<Command<Fs>> for CoreClientCommand<Command<Fs>> {
CoreClientCommand::Command(value)
}
}
+
+#[cfg(test)]
+mod tests {
+ use wasm_bindgen_test::*;
+
+ use super::*;
+
+ wasm_bindgen_test_configure!(run_in_browser);
+
+ use crate::chat;
+ use crate::files::FilesMem;
+ use std::path::Path;
+ use tokio_with_wasm::alias as tokio;
+
+ #[wasm_bindgen_test]
+ async fn login() -> () {
+ tracing_wasm::set_as_global_default();
+ let db = Db::create_connect_and_migrate(Path::new("./filamento.db"))
+ .await
+ .unwrap();
+ let (client, mut recv) = Client::new(
+ "test@blos.sm/testing2".try_into().unwrap(),
+ "slayed".to_string(),
+ db,
+ FilesMem::new(),
+ );
+
+ tokio::spawn(async move {
+ while let Some(msg) = recv.recv().await {
+ info!("{:#?}", msg)
+ }
+ });
+
+ client.connect().await.unwrap();
+ // tokio::time::sleep(Duration::from_secs(5)).await;
+ info!("changing nick");
+ client
+ .change_nick(Some("britney".to_string()))
+ .await
+ .unwrap();
+ let mut data = include_bytes!("../files/britney_starbies.jpg");
+ client.change_avatar(Some(data.to_vec())).await.unwrap();
+ info!("sending message");
+ client
+ .send_message(
+ BareJID::from_str("cel@blos.sm").unwrap(),
+ chat::Body {
+ body: "hallo!!!".to_string(),
+ },
+ )
+ .await
+ .unwrap();
+ info!("sent message");
+ client
+ .send_message(
+ BareJID::from_str("cel@blos.sm").unwrap(),
+ chat::Body {
+ body: "hallo 2".to_string(),
+ },
+ )
+ .await
+ .unwrap();
+ // tokio::time::sleep(Duration::from_secs(15)).await;
+ // info!("sending disco query");
+ // let info = client.disco_info(None, None).await.unwrap();
+ // info!("got disco result: {:#?}", info);
+ // let items = client.disco_items(None, None).await.unwrap();
+ // info!("got disco result: {:#?}", items);
+ // let info = client
+ // .disco_info(Some("blos.sm".parse().unwrap()), None)
+ // .await
+ // .unwrap();
+ // info!("got disco result: {:#?}", info);
+ // let items = client
+ // .disco_items(Some("blos.sm".parse().unwrap()), None)
+ // .await
+ // .unwrap();
+ // info!("got disco result: {:#?}", items);
+ // let info = client
+ // .disco_info(Some("pubsub.blos.sm".parse().unwrap()), None)
+ // .await
+ // .unwrap();
+ // info!("got disco result: {:#?}", info);
+ // let items = client
+ // .disco_items(Some("pubsub.blos.sm".parse().unwrap()), None)
+ // .await
+ // .unwrap();
+ // info!("got disco result: {:#?}", items); let mut jid: JID = "test@blos.sm".try_into().unwrap();
+ }
+}
diff --git a/filamento/src/logic/connect.rs b/filamento/src/logic/connect.rs
index 37cdad5..6e392f1 100644
--- a/filamento/src/logic/connect.rs
+++ b/filamento/src/logic/connect.rs
@@ -11,7 +11,7 @@ use crate::{
use super::ClientLogic;
-pub async fn handle_connect<Fs: FileStore + Clone + Send + Sync>(
+pub async fn handle_connect<Fs: FileStore + Clone + Send + Sync + 'static>(
logic: ClientLogic<Fs>,
connection: Connected,
) {
@@ -19,7 +19,7 @@ pub async fn handle_connect<Fs: FileStore + Clone + Send + Sync>(
debug!("getting roster");
logic
.clone()
- .handle_online(Command::GetRoster(send), connection.clone())
+ .handle_online(Command::GetRosterWithUsers(send), connection.clone())
.await;
debug!("sent roster req");
let roster = recv.await;
diff --git a/filamento/src/logic/local_only.rs b/filamento/src/logic/local_only.rs
index cabbef4..7f3a2e6 100644
--- a/filamento/src/logic/local_only.rs
+++ b/filamento/src/logic/local_only.rs
@@ -1,4 +1,4 @@
-use jid::JID;
+use jid::{BareJID, JID};
use uuid::Uuid;
use crate::{
@@ -28,23 +28,53 @@ pub async fn handle_get_chats_ordered_with_latest_messages<Fs: FileStore + Clone
Ok(logic.db().read_chats_ordered_with_latest_messages().await?)
}
+pub async fn handle_get_chats_ordered_with_latest_messages_and_users<Fs: FileStore + Clone>(
+ logic: &ClientLogic<Fs>,
+) -> Result<Vec<((Chat, User), (Message, User))>, DatabaseError> {
+ Ok(logic
+ .db()
+ .read_chats_ordered_with_latest_messages_and_users()
+ .await?)
+}
+
pub async fn handle_get_chat<Fs: FileStore + Clone>(
logic: &ClientLogic<Fs>,
- jid: JID,
+ jid: BareJID,
) -> Result<Chat, DatabaseError> {
Ok(logic.db().read_chat(jid).await?)
}
+pub async fn handle_get_chat_and_user<Fs: FileStore + Clone>(
+ logic: &ClientLogic<Fs>,
+ jid: BareJID,
+) -> Result<(Chat, User), DatabaseError> {
+ Ok(logic.db().read_chat_and_user(jid).await?)
+}
+
+pub async fn handle_get_message<Fs: FileStore + Clone>(
+ logic: &ClientLogic<Fs>,
+ id: Uuid,
+) -> Result<Message, DatabaseError> {
+ Ok(logic.db().read_message(id).await?)
+}
+
pub async fn handle_get_messages<Fs: FileStore + Clone>(
logic: &ClientLogic<Fs>,
- jid: JID,
+ jid: BareJID,
) -> Result<Vec<Message>, DatabaseError> {
Ok(logic.db().read_message_history(jid).await?)
}
+pub async fn handle_get_messages_with_users<Fs: FileStore + Clone>(
+ logic: &ClientLogic<Fs>,
+ jid: BareJID,
+) -> Result<Vec<(Message, User)>, DatabaseError> {
+ Ok(logic.db().read_message_history_with_users(jid).await?)
+}
+
pub async fn handle_delete_chat<Fs: FileStore + Clone>(
logic: &ClientLogic<Fs>,
- jid: JID,
+ jid: BareJID,
) -> Result<(), DatabaseError> {
Ok(logic.db().delete_chat(jid).await?)
}
@@ -58,7 +88,7 @@ pub async fn handle_delete_messaage<Fs: FileStore + Clone>(
pub async fn handle_get_user<Fs: FileStore + Clone>(
logic: &ClientLogic<Fs>,
- jid: JID,
+ jid: BareJID,
) -> Result<User, DatabaseError> {
Ok(logic.db().read_user(jid).await?)
}
diff --git a/filamento/src/logic/mod.rs b/filamento/src/logic/mod.rs
index 5e05dac..ddf0343 100644
--- a/filamento/src/logic/mod.rs
+++ b/filamento/src/logic/mod.rs
@@ -1,6 +1,6 @@
use std::{collections::HashMap, sync::Arc};
-use jid::JID;
+use jid::{BareJID, JID};
use lampada::{Connected, Logic, error::ReadError};
use stanza::client::Stanza;
use tokio::sync::{Mutex, mpsc, oneshot};
@@ -25,7 +25,7 @@ mod process_stanza;
#[derive(Clone)]
pub struct ClientLogic<Fs: FileStore> {
client: Client<Fs>,
- bare_jid: JID,
+ jid: BareJID,
db: Db,
pending: Pending,
update_sender: mpsc::Sender<UpdateMessage>,
@@ -80,7 +80,7 @@ impl Pending {
impl<Fs: FileStore> ClientLogic<Fs> {
pub fn new(
client: Client<Fs>,
- bare_jid: JID,
+ jid: BareJID,
db: Db,
update_sender: mpsc::Sender<UpdateMessage>,
file_store: Fs,
@@ -90,7 +90,7 @@ impl<Fs: FileStore> ClientLogic<Fs> {
pending: Pending::new(),
update_sender,
client,
- bare_jid,
+ jid,
file_store,
}
}
@@ -127,7 +127,7 @@ impl<Fs: FileStore> ClientLogic<Fs> {
}
}
-impl<Fs: FileStore + Clone + Send + Sync> Logic for ClientLogic<Fs> {
+impl<Fs: FileStore + Clone + Send + Sync + 'static> Logic for ClientLogic<Fs> {
type Cmd = Command<Fs>;
// pub async fn handle_stream_error(self, error) {}
diff --git a/filamento/src/logic/offline.rs b/filamento/src/logic/offline.rs
index 566972c..aa84f3d 100644
--- a/filamento/src/logic/offline.rs
+++ b/filamento/src/logic/offline.rs
@@ -1,7 +1,9 @@
use std::process::id;
use chrono::Utc;
+use jid::FullJID;
use lampada::error::WriteError;
+use tracing::error;
use uuid::Uuid;
use crate::{
@@ -14,14 +16,16 @@ use crate::{
files::FileStore,
presence::Online,
roster::Contact,
+ user::User,
};
use super::{
ClientLogic,
local_only::{
- handle_delete_chat, handle_delete_messaage, handle_get_chat, handle_get_chats,
- handle_get_chats_ordered, handle_get_chats_ordered_with_latest_messages,
- handle_get_messages, handle_get_user,
+ handle_delete_chat, handle_delete_messaage, handle_get_chat, handle_get_chat_and_user,
+ handle_get_chats, handle_get_chats_ordered, handle_get_chats_ordered_with_latest_messages,
+ handle_get_chats_ordered_with_latest_messages_and_users, handle_get_message,
+ handle_get_messages, handle_get_messages_with_users, handle_get_user,
},
};
@@ -47,6 +51,12 @@ pub async fn handle_get_roster<Fs: FileStore + Clone>(
Ok(logic.db().read_cached_roster().await?)
}
+pub async fn handle_get_roster_with_users<Fs: FileStore + Clone>(
+ logic: &ClientLogic<Fs>,
+) -> Result<Vec<(Contact, User)>, RosterError> {
+ Ok(logic.db().read_cached_roster_with_users().await?)
+}
+
pub async fn handle_offline_result<Fs: FileStore + Clone>(
logic: &ClientLogic<Fs>,
command: Command<Fs>,
@@ -56,6 +66,10 @@ pub async fn handle_offline_result<Fs: FileStore + Clone>(
let roster = handle_get_roster(logic).await;
sender.send(roster);
}
+ Command::GetRosterWithUsers(sender) => {
+ let roster = handle_get_roster_with_users(logic).await;
+ sender.send(roster);
+ }
Command::GetChats(sender) => {
let chats = handle_get_chats(logic).await;
sender.send(chats);
@@ -68,14 +82,30 @@ pub async fn handle_offline_result<Fs: FileStore + Clone>(
let chats = handle_get_chats_ordered_with_latest_messages(logic).await;
sender.send(chats);
}
+ Command::GetChatsOrderedWithLatestMessagesAndUsers(sender) => {
+ let chats = handle_get_chats_ordered_with_latest_messages_and_users(logic).await;
+ sender.send(chats);
+ }
Command::GetChat(jid, sender) => {
let chats = handle_get_chat(logic, jid).await;
sender.send(chats);
}
+ Command::GetChatAndUser(jid, sender) => {
+ let chat = handle_get_chat_and_user(logic, jid).await;
+ let _ = sender.send(chat);
+ }
+ Command::GetMessage(id, sender) => {
+ let message = handle_get_message(logic, id).await;
+ let _ = sender.send(message);
+ }
Command::GetMessages(jid, sender) => {
let messages = handle_get_messages(logic, jid).await;
sender.send(messages);
}
+ Command::GetMessagesWithUsers(jid, sender) => {
+ let messages = handle_get_messages_with_users(logic, jid).await;
+ sender.send(messages);
+ }
Command::DeleteChat(jid, sender) => {
let result = handle_delete_chat(logic, jid).await;
sender.send(result);
@@ -128,7 +158,7 @@ pub async fn handle_offline_result<Fs: FileStore + Clone>(
let message = Message {
id,
- from: logic.bare_jid.clone(),
+ from: logic.jid.clone(),
// TODO: failure reason
delivery: Some(Delivery::Failed),
timestamp,
@@ -138,11 +168,15 @@ pub async fn handle_offline_result<Fs: FileStore + Clone>(
// TODO: mark these as potentially failed upon client launch
if let Err(e) = logic
.db()
- .create_message_with_self_resource(
+ // TODO: can create message without a resource....
+ .create_message_with_user_resource(
message.clone(),
jid.clone(),
// TODO: when message is queued and sent, the from must also be updated with the correct resource
- logic.bare_jid.clone(),
+ FullJID {
+ bare_jid: logic.jid.clone(),
+ resourcepart: "unsent".to_string(),
+ },
)
.await
{
@@ -151,11 +185,25 @@ pub async fn handle_offline_result<Fs: FileStore + Clone>(
.handle_error(MessageSendError::MessageHistory(e.into()).into())
.await;
}
+
+ let from = match logic.db().read_user(logic.jid.clone()).await {
+ Ok(u) => u,
+ Err(e) => {
+ error!("{}", e);
+ User {
+ jid: logic.jid.clone(),
+ nick: None,
+ avatar: None,
+ }
+ }
+ };
+
logic
.update_sender()
.send(crate::UpdateMessage::Message {
- to: jid.as_bare(),
+ to: jid,
message,
+ from,
})
.await;
}
diff --git a/filamento/src/logic/online.rs b/filamento/src/logic/online.rs
index d9441d7..b36f9a9 100644
--- a/filamento/src/logic/online.rs
+++ b/filamento/src/logic/online.rs
@@ -3,7 +3,7 @@ use std::{io::Cursor, time::Duration};
use base64::{prelude::BASE64_STANDARD, Engine};
use chrono::Utc;
use image::ImageReader;
-use jid::JID;
+use jid::{BareJID, JID};
use lampada::{Connected, WriteMessage, error::WriteError};
use sha1::{Digest, Sha1};
use stanza::{
@@ -11,26 +11,25 @@ use stanza::{
iq::{self, Iq, IqType, Query}, Stanza
}, xep_0030::{info, items}, xep_0060::{self, owner, pubsub::{self, Pubsub}}, xep_0084, xep_0172::{self, Nick}, xep_0203::Delay
};
-use tokio::sync::oneshot;
+use tokio::{sync::oneshot, task::spawn_blocking};
+#[cfg(target_arch = "wasm32")]
+use tokio_with_wasm::alias as tokio;
use tracing::{debug, error, info};
use uuid::Uuid;
use crate::{
avatar, chat::{Body, Chat, Delivery, Message}, disco::{Info, Items}, error::{
AvatarPublishError, DatabaseError, DiscoError, Error, IqRequestError, MessageSendError, NickError, PEPError, RosterError, StatusError, SubscribeError
- }, files::FileStore, pep, presence::{Online, Presence, PresenceType}, roster::{Contact, ContactUpdate}, Command, UpdateMessage
+ }, files::FileStore, pep, presence::{Online, Presence, PresenceType}, roster::{Contact, ContactUpdate}, user::User, Command, UpdateMessage
};
use super::{
- ClientLogic,
local_only::{
- handle_delete_chat, handle_delete_messaage, handle_get_chat, handle_get_chats,
- handle_get_chats_ordered, handle_get_chats_ordered_with_latest_messages,
- handle_get_messages, handle_get_user,
- },
+ handle_delete_chat, handle_delete_messaage, handle_get_chat, handle_get_chat_and_user, handle_get_chats, handle_get_chats_ordered, handle_get_chats_ordered_with_latest_messages, handle_get_chats_ordered_with_latest_messages_and_users, handle_get_message, handle_get_messages, handle_get_messages_with_users, handle_get_user
+ }, ClientLogic
};
-pub async fn handle_online<Fs: FileStore + Clone>(logic: ClientLogic<Fs>, command: Command<Fs>, connection: Connected) {
+pub async fn handle_online<Fs: FileStore + Clone + 'static>(logic: ClientLogic<Fs>, command: Command<Fs>, connection: Connected) {
let result = handle_online_result(&logic, command, connection).await;
match result {
Ok(_) => {}
@@ -44,7 +43,7 @@ pub async fn handle_get_roster<Fs: FileStore + Clone>(
) -> Result<Vec<Contact>, RosterError> {
let iq_id = Uuid::new_v4().to_string();
let stanza = Stanza::Iq(Iq {
- from: Some(connection.jid().clone()),
+ from: Some(connection.jid().clone().into()),
id: iq_id.to_string(),
to: None,
r#type: IqType::Get,
@@ -97,14 +96,79 @@ pub async fn handle_get_roster<Fs: FileStore + Clone>(
}
}
+// this can't query the client... otherwise there is a hold-up and the connection can't complete
+pub async fn handle_get_roster_with_users<Fs: FileStore + Clone>(
+ logic: &ClientLogic<Fs>,
+ connection: Connected,
+) -> Result<Vec<(Contact, User)>, RosterError> {
+ let iq_id = Uuid::new_v4().to_string();
+ let stanza = Stanza::Iq(Iq {
+ from: Some(connection.jid().clone().into()),
+ id: iq_id.to_string(),
+ to: None,
+ r#type: IqType::Get,
+ lang: None,
+ query: Some(iq::Query::Roster(stanza::roster::Query {
+ ver: None,
+ items: Vec::new(),
+ })),
+ errors: Vec::new(),
+ });
+ let response = logic
+ .pending()
+ .request(&connection, stanza, iq_id.clone())
+ .await?;
+ // TODO: timeout
+ match response {
+ Stanza::Iq(Iq {
+ from: _,
+ id,
+ to: _,
+ r#type,
+ lang: _,
+ query: Some(iq::Query::Roster(stanza::roster::Query { ver: _, items })),
+ errors: _,
+ }) if id == iq_id && r#type == IqType::Result => {
+ let contacts: Vec<Contact> = items.into_iter().map(|item| item.into()).collect();
+ if let Err(e) = logic.db().replace_cached_roster(contacts.clone()).await {
+ logic
+ .handle_error(Error::Roster(RosterError::Cache(e.into())))
+ .await;
+ };
+ let mut users = Vec::new();
+ for contact in &contacts {
+ let user = logic.db().read_user(contact.user_jid.clone()).await?;
+ users.push(user);
+ }
+ Ok(contacts.into_iter().zip(users).collect())
+ }
+ ref s @ Stanza::Iq(Iq {
+ from: _,
+ ref id,
+ to: _,
+ r#type,
+ lang: _,
+ query: _,
+ ref errors,
+ }) if *id == iq_id && r#type == IqType::Error => {
+ if let Some(error) = errors.first() {
+ Err(RosterError::StanzaError(error.clone()))
+ } else {
+ Err(RosterError::UnexpectedStanza(s.clone()))
+ }
+ }
+ s => Err(RosterError::UnexpectedStanza(s)),
+ }
+}
+
pub async fn handle_add_contact<Fs: FileStore + Clone>(
logic: &ClientLogic<Fs>,
connection: Connected,
- jid: JID,
+ jid: BareJID,
) -> Result<(), RosterError> {
let iq_id = Uuid::new_v4().to_string();
let set_stanza = Stanza::Iq(Iq {
- from: Some(connection.jid().clone()),
+ from: Some(connection.jid().clone().into()),
id: iq_id.clone(),
to: None,
r#type: IqType::Set,
@@ -158,9 +222,10 @@ pub async fn handle_add_contact<Fs: FileStore + Clone>(
pub async fn handle_buddy_request<Fs: FileStore + Clone>(
logic: &ClientLogic<Fs>,
connection: Connected,
- jid: JID,
+ jid: BareJID,
) -> Result<(), SubscribeError> {
- let client_user = logic.db.read_user(logic.bare_jid.clone()).await?;
+ let jid: JID = jid.into();
+ let client_user = logic.db.read_user(logic.jid.clone()).await?;
let nick = client_user.nick.map(|nick| Nick(nick));
let presence = Stanza::Presence(stanza::client::presence::Presence {
to: Some(jid.clone()),
@@ -181,13 +246,13 @@ pub async fn handle_buddy_request<Fs: FileStore + Clone>(
pub async fn handle_subscription_request<Fs: FileStore + Clone>(
logic: &ClientLogic<Fs>,
connection: Connected,
- jid: JID,
+ jid: BareJID,
) -> Result<(), SubscribeError> {
// TODO: i should probably have builders
- let client_user = logic.db.read_user(logic.bare_jid.clone()).await?;
+ let client_user = logic.db.read_user(logic.jid.clone()).await?;
let nick = client_user.nick.map(|nick| Nick(nick));
let presence = Stanza::Presence(stanza::client::presence::Presence {
- to: Some(jid),
+ to: Some(jid.into()),
r#type: Some(stanza::client::presence::PresenceType::Subscribe),
nick,
..Default::default()
@@ -199,15 +264,16 @@ pub async fn handle_subscription_request<Fs: FileStore + Clone>(
pub async fn handle_accept_buddy_request<Fs: FileStore + Clone>(
logic: &ClientLogic<Fs>,
connection: Connected,
- jid: JID,
+ jid: BareJID,
) -> Result<(), SubscribeError> {
+ let jid: JID = jid.into();
let presence = Stanza::Presence(stanza::client::presence::Presence {
to: Some(jid.clone()),
r#type: Some(stanza::client::presence::PresenceType::Subscribed),
..Default::default()
});
connection.write_handle().write(presence).await?;
- let client_user = logic.db.read_user(logic.bare_jid.clone()).await?;
+ let client_user = logic.db.read_user(logic.jid.clone()).await?;
let nick = client_user.nick.map(|nick| Nick(nick));
let presence = Stanza::Presence(stanza::client::presence::Presence {
to: Some(jid),
@@ -222,14 +288,11 @@ pub async fn handle_accept_buddy_request<Fs: FileStore + Clone>(
pub async fn handle_accept_subscription_request<Fs: FileStore + Clone>(
logic: &ClientLogic<Fs>,
connection: Connected,
- jid: JID,
+ jid: BareJID,
) -> Result<(), SubscribeError> {
- let client_user = logic.db.read_user(logic.bare_jid.clone()).await?;
- let nick = client_user.nick.map(|nick| Nick(nick));
let presence = Stanza::Presence(stanza::client::presence::Presence {
- to: Some(jid),
- r#type: Some(stanza::client::presence::PresenceType::Subscribe),
- nick,
+ to: Some(jid.into()),
+ r#type: Some(stanza::client::presence::PresenceType::Subscribed),
..Default::default()
});
connection.write_handle().write(presence).await?;
@@ -238,10 +301,10 @@ pub async fn handle_accept_subscription_request<Fs: FileStore + Clone>(
pub async fn handle_unsubscribe_from_contact(
connection: Connected,
- jid: JID,
+ jid: BareJID,
) -> Result<(), WriteError> {
let presence = Stanza::Presence(stanza::client::presence::Presence {
- to: Some(jid),
+ to: Some(jid.into()),
r#type: Some(stanza::client::presence::PresenceType::Unsubscribe),
..Default::default()
});
@@ -249,9 +312,9 @@ pub async fn handle_unsubscribe_from_contact(
Ok(())
}
-pub async fn handle_unsubscribe_contact(connection: Connected, jid: JID) -> Result<(), WriteError> {
+pub async fn handle_unsubscribe_contact(connection: Connected, jid: BareJID) -> Result<(), WriteError> {
let presence = Stanza::Presence(stanza::client::presence::Presence {
- to: Some(jid),
+ to: Some(jid.into()),
r#type: Some(stanza::client::presence::PresenceType::Unsubscribed),
..Default::default()
});
@@ -259,7 +322,8 @@ pub async fn handle_unsubscribe_contact(connection: Connected, jid: JID) -> Resu
Ok(())
}
-pub async fn handle_unfriend_contact(connection: Connected, jid: JID) -> Result<(), WriteError> {
+pub async fn handle_unfriend_contact(connection: Connected, jid: BareJID) -> Result<(), WriteError> {
+ let jid: JID = jid.into();
let presence = Stanza::Presence(stanza::client::presence::Presence {
to: Some(jid.clone()),
r#type: Some(stanza::client::presence::PresenceType::Unsubscribe),
@@ -278,11 +342,11 @@ pub async fn handle_unfriend_contact(connection: Connected, jid: JID) -> Result<
pub async fn handle_delete_contact<Fs: FileStore + Clone>(
logic: &ClientLogic<Fs>,
connection: Connected,
- jid: JID,
+ jid: BareJID,
) -> Result<(), RosterError> {
let iq_id = Uuid::new_v4().to_string();
let set_stanza = Stanza::Iq(Iq {
- from: Some(connection.jid().clone()),
+ from: Some(connection.jid().clone().into()),
id: iq_id.clone(),
to: None,
r#type: IqType::Set,
@@ -337,7 +401,7 @@ pub async fn handle_delete_contact<Fs: FileStore + Clone>(
pub async fn handle_update_contact<Fs: FileStore + Clone>(
logic: &ClientLogic<Fs>,
connection: Connected,
- jid: JID,
+ jid: BareJID,
contact_update: ContactUpdate,
) -> Result<(), RosterError> {
let iq_id = Uuid::new_v4().to_string();
@@ -348,7 +412,8 @@ pub async fn handle_update_contact<Fs: FileStore + Clone>(
.map(|group| stanza::roster::Group(Some(group))),
);
let set_stanza = Stanza::Iq(Iq {
- from: Some(connection.jid().clone()),
+ // TODO: these clones could technically be avoided?
+ from: Some(connection.jid().clone().into()),
id: iq_id.clone(),
to: None,
r#type: IqType::Set,
@@ -412,10 +477,10 @@ pub async fn handle_set_status<Fs: FileStore + Clone>(
Ok(())
}
-pub async fn handle_send_message<Fs: FileStore + Clone>(logic: &ClientLogic<Fs>, connection: Connected, jid: JID, body: Body) {
+pub async fn handle_send_message<Fs: FileStore + Clone>(logic: &ClientLogic<Fs>, connection: Connected, jid: BareJID, body: Body) {
// upsert the chat and user the message will be delivered to. if there is a conflict, it will return whatever was there, otherwise it will return false by default.
// let have_chatted = logic.db().upsert_chat_and_user(&jid).await.unwrap_or(false);
- let have_chatted = match logic.db().upsert_chat_and_user(&jid).await {
+ let have_chatted = match logic.db().upsert_chat_and_user(jid.clone()).await {
Ok(have_chatted) => {
have_chatted
},
@@ -428,7 +493,7 @@ pub async fn handle_send_message<Fs: FileStore + Clone>(logic: &ClientLogic<Fs>,
let nick;
let mark_chat_as_chatted;
if have_chatted == false {
- match logic.db.read_user(logic.bare_jid.clone()).await {
+ match logic.db.read_user(logic.jid.clone()).await {
Ok(u) => {
nick = u.nick.map(|nick| Nick(nick));
mark_chat_as_chatted = true;
@@ -451,7 +516,7 @@ pub async fn handle_send_message<Fs: FileStore + Clone>(logic: &ClientLogic<Fs>,
let timestamp = Utc::now();
let message = Message {
id,
- from: connection.jid().as_bare(),
+ from: connection.jid().to_bare(),
body: body.clone(),
timestamp,
delivery: Some(Delivery::Sending),
@@ -461,7 +526,7 @@ pub async fn handle_send_message<Fs: FileStore + Clone>(logic: &ClientLogic<Fs>,
// TODO: mark these as potentially failed upon client launch
if let Err(e) = logic
.db()
- .create_message_with_self_resource(message.clone(), jid.clone(), connection.jid().clone())
+ .create_message_with_user_resource(message.clone(), jid.clone(), connection.jid().clone())
.await
{
// TODO: should these really be handle_error or just the error macro?
@@ -470,20 +535,33 @@ pub async fn handle_send_message<Fs: FileStore + Clone>(logic: &ClientLogic<Fs>,
.await;
}
+ let from = match logic.db().read_user(logic.jid.clone()).await {
+ Ok(u) => u,
+ Err(e) => {
+ error!("{}", e);
+ User {
+ jid: logic.jid.clone(),
+ nick: None,
+ avatar: None,
+ }
+ },
+ };
+
// tell the client a message is being sent
logic
.update_sender()
.send(UpdateMessage::Message {
- to: jid.as_bare(),
+ to: jid.clone(),
message,
+ from,
})
.await;
// prepare the message stanza
let message_stanza = Stanza::Message(stanza::client::message::Message {
- from: Some(connection.jid().clone()),
+ from: Some(connection.jid().clone().into()),
id: Some(id.to_string()),
- to: Some(jid.clone()),
+ to: Some(jid.clone().into()),
// TODO: specify message type
r#type: stanza::client::message::MessageType::Chat,
// TODO: lang ?
@@ -508,11 +586,15 @@ pub async fn handle_send_message<Fs: FileStore + Clone>(logic: &ClientLogic<Fs>,
match result {
Ok(_) => {
info!("sent message: {:?}", message_stanza);
+ if let Err(e) = logic.db().update_message_delivery(id, Delivery::Written).await {
+ error!("updating message delivery: {}", e);
+ }
logic
.update_sender()
.send(UpdateMessage::MessageDelivery {
id,
delivery: Delivery::Written,
+ chat: jid.clone(),
})
.await;
if mark_chat_as_chatted {
@@ -530,6 +612,7 @@ pub async fn handle_send_message<Fs: FileStore + Clone>(logic: &ClientLogic<Fs>,
.send(UpdateMessage::MessageDelivery {
id,
delivery: Delivery::Failed,
+ chat: jid,
})
.await;
logic.handle_error(MessageSendError::Write(e).into()).await;
@@ -559,7 +642,7 @@ pub async fn handle_disco_info<Fs: FileStore + Clone>(
) -> Result<Info, DiscoError> {
let id = Uuid::new_v4().to_string();
let request = Iq {
- from: Some(connection.jid().clone()),
+ from: Some(connection.jid().clone().into()),
id: id.clone(),
to: jid.clone(),
r#type: IqType::Get,
@@ -587,7 +670,7 @@ pub async fn handle_disco_info<Fs: FileStore + Clone>(
}) if r#type == IqType::Result || r#type == IqType::Error => {
if from == jid || {
if jid == None {
- from == Some(connection.jid().as_bare())
+ from == Some(connection.jid().to_bare().into())
} else {
false
}
@@ -614,7 +697,7 @@ pub async fn handle_disco_info<Fs: FileStore + Clone>(
}
} else {
Err(DiscoError::IncorrectEntity(
- from.unwrap_or_else(|| connection.jid().as_bare()),
+ from.unwrap_or_else(|| connection.jid().to_bare().into()),
))
}
}
@@ -630,7 +713,7 @@ pub async fn handle_disco_items<Fs: FileStore + Clone>(
) -> Result<Items, DiscoError> {
let id = Uuid::new_v4().to_string();
let request = Iq {
- from: Some(connection.jid().clone()),
+ from: Some(connection.jid().clone().into()),
id: id.clone(),
to: jid.clone(),
r#type: IqType::Get,
@@ -656,7 +739,7 @@ pub async fn handle_disco_items<Fs: FileStore + Clone>(
}) if r#type == IqType::Result || r#type == IqType::Error => {
if from == jid || {
if jid == None {
- from == Some(connection.jid().as_bare())
+ from == Some(connection.jid().to_bare().into())
} else {
false
}
@@ -683,7 +766,7 @@ pub async fn handle_disco_items<Fs: FileStore + Clone>(
}
} else {
Err(DiscoError::IncorrectEntity(
- from.unwrap_or_else(|| connection.jid().as_bare()),
+ from.unwrap_or_else(|| connection.jid().to_bare().into()),
))
}
}
@@ -748,7 +831,7 @@ pub async fn handle_publish_pep_item<Fs: FileStore + Clone>(
},
};
let request = Iq {
- from: Some(connection.jid().clone()),
+ from: Some(connection.jid().clone().into()),
id: id.clone(),
to: None,
r#type: IqType::Set,
@@ -770,7 +853,7 @@ pub async fn handle_publish_pep_item<Fs: FileStore + Clone>(
// TODO: maybe abstract a bunch of these different errors related to iqs into an iq error thing? as in like call iq.result(), get the query from inside, error otherwise.
}) if r#type == IqType::Result || r#type == IqType::Error => {
if from == None ||
- from == Some(connection.jid().as_bare())
+ from == Some(connection.jid().to_bare().into())
{
match r#type {
IqType::Result => {
@@ -790,7 +873,7 @@ pub async fn handle_publish_pep_item<Fs: FileStore + Clone>(
}
} else {
Err(PEPError::IncorrectEntity(
- from.unwrap_or_else(|| connection.jid().as_bare()),
+ from.unwrap_or_else(|| connection.jid().to_bare().into()),
))
}
}
@@ -798,10 +881,11 @@ pub async fn handle_publish_pep_item<Fs: FileStore + Clone>(
}
}
-pub async fn handle_get_pep_item<Fs: FileStore + Clone>(logic: &ClientLogic<Fs>, connection: Connected, jid: Option<JID>, node: String, id: String) -> Result<pep::Item, PEPError> {
+pub async fn handle_get_pep_item<Fs: FileStore + Clone>(logic: &ClientLogic<Fs>, connection: Connected, jid: Option<BareJID>, node: String, id: String) -> Result<pep::Item, PEPError> {
+ let jid = jid.map(|jid| Into::<JID>::into(jid));
let stanza_id = Uuid::new_v4().to_string();
let request = Iq {
- from: Some(connection.jid().clone()),
+ from: Some(connection.jid().clone().into()),
id: stanza_id.clone(),
to: jid.clone(),
r#type: IqType::Get,
@@ -829,7 +913,7 @@ pub async fn handle_get_pep_item<Fs: FileStore + Clone>(logic: &ClientLogic<Fs>,
}) if r#type == IqType::Result || r#type == IqType::Error => {
if from == jid || {
if jid == None {
- from == Some(connection.jid().as_bare())
+ from == Some(connection.jid().to_bare().into())
} else {
false
}
@@ -875,7 +959,7 @@ pub async fn handle_get_pep_item<Fs: FileStore + Clone>(logic: &ClientLogic<Fs>,
} else {
// TODO: include expected entity
Err(PEPError::IncorrectEntity(
- from.unwrap_or_else(|| connection.jid().as_bare()),
+ from.unwrap_or_else(|| connection.jid().to_bare().into()),
))
}
}
@@ -888,29 +972,33 @@ pub async fn handle_change_nick<Fs: FileStore + Clone>(logic: &ClientLogic<Fs>,
Ok(())
}
-pub async fn handle_change_avatar<Fs: FileStore + Clone>(logic: &ClientLogic<Fs>, img_data: Option<Vec<u8>>) -> Result<(), AvatarPublishError<Fs>> {
+pub async fn handle_change_avatar<Fs: FileStore + Clone + 'static>(logic: &ClientLogic<Fs>, img_data: Option<Vec<u8>>) -> Result<(), AvatarPublishError<Fs>> {
match img_data {
// set avatar
Some(data) => {
- // load the image data and guess the format
- let image = ImageReader::new(Cursor::new(data)).with_guessed_format()?.decode()?;
+ let (bytes, hash, data_png, data_b64) = spawn_blocking(move || -> Result<_, _> {
+ // load the image data and guess the format
+ let image = ImageReader::new(Cursor::new(data)).with_guessed_format()?.decode()?;
+
+ // convert the image to png;
+ let mut data_png = Vec::new();
+ let image = image.resize(192, 192, image::imageops::FilterType::Nearest);
+ image.write_to(&mut Cursor::new(&mut data_png), image::ImageFormat::Jpeg)?;
- // convert the image to png;
- let mut data_png = Vec::new();
- let image = image.resize(192, 192, image::imageops::FilterType::Nearest);
- image.write_to(&mut Cursor::new(&mut data_png), image::ImageFormat::Jpeg)?;
+ // calculate the length of the data in bytes.
+ let bytes = data_png.len().try_into()?;
- // calculate the length of the data in bytes.
- let bytes = data_png.len().try_into()?;
+ // calculate sha1 hash of the data
+ let mut sha1 = Sha1::new();
+ sha1.update(&data_png);
+ let sha1_result = sha1.finalize();
+ let hash = hex::encode(sha1_result);
- // calculate sha1 hash of the data
- let mut sha1 = Sha1::new();
- sha1.update(&data_png);
- let sha1_result = sha1.finalize();
- let hash = hex::encode(sha1_result);
+ // encode the image data as base64
+ let data_b64 = BASE64_STANDARD.encode(data_png.clone());
- // encode the image data as base64
- let data_b64 = BASE64_STANDARD.encode(data_png.clone());
+ Ok::<(u32, String, Vec<u8>, String), AvatarPublishError<Fs>>((bytes, hash, data_png, data_b64))
+ }).await.unwrap()?;
// publish the data to the data node
logic.client().publish(pep::Item::AvatarData(Some(avatar::Data { hash: hash.clone(), data_b64 })), "urn:xmpp:avatar:data".to_string()).await?;
@@ -944,7 +1032,7 @@ pub async fn handle_delete_pep_node<Fs: FileStore + Clone>(
) -> Result<(), PEPError> {
let id = Uuid::new_v4().to_string();
let request = Iq {
- from: Some(connection.jid().clone()),
+ from: Some(connection.jid().clone().into()),
id: id.clone(),
to: None,
r#type: IqType::Set,
@@ -966,7 +1054,7 @@ pub async fn handle_delete_pep_node<Fs: FileStore + Clone>(
// TODO: maybe abstract a bunch of these different errors related to iqs into an iq error thing? as in like call iq.result(), get the query from inside, error otherwise.
}) if r#type == IqType::Result || r#type == IqType::Error => {
if from == None ||
- from == Some(connection.jid().as_bare())
+ from == Some(connection.jid().to_bare().into())
{
match r#type {
IqType::Result => {
@@ -987,7 +1075,7 @@ pub async fn handle_delete_pep_node<Fs: FileStore + Clone>(
}
} else {
Err(PEPError::IncorrectEntity(
- from.unwrap_or_else(|| connection.jid().as_bare()),
+ from.unwrap_or_else(|| connection.jid().to_bare().into()),
))
}
}
@@ -996,7 +1084,7 @@ pub async fn handle_delete_pep_node<Fs: FileStore + Clone>(
}
// TODO: could probably macro-ise?
-pub async fn handle_online_result<Fs: FileStore + Clone>(
+pub async fn handle_online_result<Fs: FileStore + Clone + 'static>(
logic: &ClientLogic<Fs>,
command: Command<Fs>,
connection: Connected,
@@ -1006,6 +1094,10 @@ pub async fn handle_online_result<Fs: FileStore + Clone>(
let roster = handle_get_roster(logic, connection).await;
let _ = result_sender.send(roster);
}
+ Command::GetRosterWithUsers(result_sender) => {
+ let roster = handle_get_roster_with_users(logic, connection).await;
+ let _ = result_sender.send(roster);
+ }
Command::GetChats(sender) => {
let chats = handle_get_chats(logic).await;
let _ = sender.send(chats);
@@ -1018,14 +1110,30 @@ pub async fn handle_online_result<Fs: FileStore + Clone>(
let chats = handle_get_chats_ordered_with_latest_messages(logic).await;
let _ = sender.send(chats);
}
+ Command::GetChatsOrderedWithLatestMessagesAndUsers(sender) => {
+ let chats = handle_get_chats_ordered_with_latest_messages_and_users(logic).await;
+ sender.send(chats);
+ }
Command::GetChat(jid, sender) => {
let chat = handle_get_chat(logic, jid).await;
let _ = sender.send(chat);
}
+ Command::GetChatAndUser(jid, sender) => {
+ let chat = handle_get_chat_and_user(logic, jid).await;
+ let _ = sender.send(chat);
+ }
+ Command::GetMessage(id, sender) => {
+ let message = handle_get_message(logic, id).await;
+ let _ = sender.send(message);
+ }
Command::GetMessages(jid, sender) => {
let messages = handle_get_messages(logic, jid).await;
let _ = sender.send(messages);
}
+ Command::GetMessagesWithUsers(jid, sender) => {
+ let messages = handle_get_messages_with_users(logic, jid).await;
+ sender.send(messages);
+ }
Command::DeleteChat(jid, sender) => {
let result = handle_delete_chat(logic, jid).await;
let _ = sender.send(result);
diff --git a/filamento/src/logic/process_stanza.rs b/filamento/src/logic/process_stanza.rs
index 9c49b04..67b0d3f 100644
--- a/filamento/src/logic/process_stanza.rs
+++ b/filamento/src/logic/process_stanza.rs
@@ -20,12 +20,13 @@ use crate::{
UpdateMessage, caps,
chat::{Body, Message},
error::{
- AvatarUpdateError, DatabaseError, Error, IqError, MessageRecvError, PresenceError,
- RosterError,
+ AvatarUpdateError, DatabaseError, Error, IqError, IqProcessError, MessageRecvError,
+ PresenceError, RosterError,
},
files::FileStore,
presence::{Offline, Online, Presence, PresenceType, Show},
roster::Contact,
+ user::User,
};
use super::ClientLogic;
@@ -69,37 +70,47 @@ pub async fn recv_message<Fs: FileStore + Clone>(
// TODO: proper id xep
.map(|id| Uuid::from_str(&id).unwrap_or_else(|_| Uuid::new_v4()))
.unwrap_or_else(|| Uuid::new_v4()),
- from: from.as_bare(),
+ from: from.to_bare(),
timestamp,
body: Body {
body: body.body.unwrap_or_default(),
},
delivery: None,
};
+ // TODO: process message type="error"
// save the message to the database
- match logic.db().upsert_chat_and_user(&from).await {
- Ok(_) => {
- if let Err(e) = logic
- .db()
- .create_message_with_user_resource(
- message.clone(),
- from.clone(),
- from.clone(),
- )
- .await
- {
- logic
- .handle_error(Error::MessageRecv(MessageRecvError::MessageHistory(e)))
- .await;
- error!("failed to upsert chat and user")
+ match logic.db().upsert_chat_and_user(from.to_bare()).await {
+ Ok(_) => match from.as_full() {
+ Ok(from) => {
+ if let Err(e) = logic
+ .db()
+ .create_message_with_user_resource(
+ message.clone(),
+ from.to_bare(),
+ from.clone(),
+ )
+ .await
+ {
+ error!("failed to create message: {}", e);
+ }
}
+ Err(e) => error!("failed to create message: {}", e),
+ },
+ Err(e) => {
+ error!("failed to upsert chat and user: {}", e);
}
+ };
+
+ let from_user = match logic.db().read_user(from.to_bare()).await {
+ Ok(u) => u,
Err(e) => {
- logic
- .handle_error(Error::MessageRecv(MessageRecvError::MessageHistory(e)))
- .await;
- error!("failed to upsert chat and user")
+ error!("{}", e);
+ User {
+ jid: from.to_bare(),
+ nick: None,
+ avatar: None,
+ }
}
};
@@ -107,7 +118,8 @@ pub async fn recv_message<Fs: FileStore + Clone>(
logic
.update_sender()
.send(UpdateMessage::Message {
- to: from.as_bare(),
+ to: from.to_bare(),
+ from: from_user,
message,
})
.await;
@@ -116,13 +128,13 @@ pub async fn recv_message<Fs: FileStore + Clone>(
if let Some(nick) = stanza_message.nick {
let nick = nick.0;
if nick.is_empty() {
- match logic.db().delete_user_nick(from.as_bare()).await {
+ match logic.db().delete_user_nick(from.to_bare()).await {
Ok(changed) => {
if changed {
logic
.update_sender()
.send(UpdateMessage::NickChanged {
- jid: from.as_bare(),
+ jid: from.to_bare(),
nick: None,
})
.await;
@@ -136,7 +148,7 @@ pub async fn recv_message<Fs: FileStore + Clone>(
logic
.update_sender()
.send(UpdateMessage::NickChanged {
- jid: from.as_bare(),
+ jid: from.to_bare(),
nick: None,
})
.await;
@@ -145,7 +157,7 @@ pub async fn recv_message<Fs: FileStore + Clone>(
} else {
match logic
.db()
- .upsert_user_nick(from.as_bare(), nick.clone())
+ .upsert_user_nick(from.to_bare(), nick.clone())
.await
{
Ok(changed) => {
@@ -153,7 +165,7 @@ pub async fn recv_message<Fs: FileStore + Clone>(
logic
.update_sender()
.send(UpdateMessage::NickChanged {
- jid: from.as_bare(),
+ jid: from.to_bare(),
nick: Some(nick),
})
.await;
@@ -167,7 +179,7 @@ pub async fn recv_message<Fs: FileStore + Clone>(
logic
.update_sender()
.send(UpdateMessage::NickChanged {
- jid: from.as_bare(),
+ jid: from.to_bare(),
nick: Some(nick),
})
.await;
@@ -190,7 +202,7 @@ pub async fn recv_message<Fs: FileStore + Clone>(
if nick.is_empty() {
match logic
.db()
- .delete_user_nick(from.as_bare())
+ .delete_user_nick(from.to_bare())
.await
{
Ok(changed) => {
@@ -198,7 +210,7 @@ pub async fn recv_message<Fs: FileStore + Clone>(
logic
.update_sender()
.send(UpdateMessage::NickChanged {
- jid: from.as_bare(),
+ jid: from.to_bare(),
nick: None,
})
.await;
@@ -214,7 +226,7 @@ pub async fn recv_message<Fs: FileStore + Clone>(
logic
.update_sender()
.send(UpdateMessage::NickChanged {
- jid: from.as_bare(),
+ jid: from.to_bare(),
nick: None,
})
.await;
@@ -224,7 +236,7 @@ pub async fn recv_message<Fs: FileStore + Clone>(
match logic
.db()
.upsert_user_nick(
- from.as_bare(),
+ from.to_bare(),
nick.clone(),
)
.await
@@ -234,7 +246,7 @@ pub async fn recv_message<Fs: FileStore + Clone>(
logic
.update_sender()
.send(UpdateMessage::NickChanged {
- jid: from.as_bare(),
+ jid: from.to_bare(),
nick: Some(nick),
})
.await;
@@ -250,7 +262,7 @@ pub async fn recv_message<Fs: FileStore + Clone>(
logic
.update_sender()
.send(UpdateMessage::NickChanged {
- jid: from.as_bare(),
+ jid: from.to_bare(),
nick: Some(nick),
})
.await;
@@ -285,7 +297,7 @@ pub async fn recv_message<Fs: FileStore + Clone>(
match logic
.db()
.upsert_user_avatar(
- from.as_bare(),
+ from.to_bare(),
metadata.id.clone(),
)
.await
@@ -314,7 +326,7 @@ pub async fn recv_message<Fs: FileStore + Clone>(
}) {
Ok(false) => {
// get data
- let pep_item = logic.client().get_pep_item(Some(from.as_bare()), "urn:xmpp:avatar:data".to_string(), metadata.id.clone()).await.map_err(|err| Into::<AvatarUpdateError<Fs>>::into(err))?;
+ let pep_item = logic.client().get_pep_item(Some(from.to_bare()), "urn:xmpp:avatar:data".to_string(), metadata.id.clone()).await.map_err(|err| Into::<AvatarUpdateError<Fs>>::into(err))?;
match pep_item {
crate::pep::Item::AvatarData(data) => {
let data = data.map(|data| data.data_b64).unwrap_or_default().replace("\n", "");
@@ -335,7 +347,7 @@ pub async fn recv_message<Fs: FileStore + Clone>(
.update_sender()
.send(
UpdateMessage::AvatarChanged {
- jid: from.as_bare(),
+ jid: from.to_bare(),
id: Some(
metadata.id.clone(),
),
@@ -362,7 +374,7 @@ pub async fn recv_message<Fs: FileStore + Clone>(
.update_sender()
.send(
UpdateMessage::AvatarChanged {
- jid: from.as_bare(),
+ jid: from.to_bare(),
id: Some(
metadata.id.clone(),
),
@@ -392,7 +404,7 @@ pub async fn recv_message<Fs: FileStore + Clone>(
// delete avatar
match logic
.db()
- .delete_user_avatar(from.as_bare())
+ .delete_user_avatar(from.to_bare())
.await
{
Ok((changed, old_avatar)) => {
@@ -410,7 +422,7 @@ pub async fn recv_message<Fs: FileStore + Clone>(
.update_sender()
.send(
UpdateMessage::AvatarChanged {
- jid: from.as_bare(),
+ jid: from.to_bare(),
id: None,
},
)
@@ -470,13 +482,7 @@ pub async fn recv_presence(
stanza::client::presence::PresenceType::Error => {
// TODO: is there any other information that should go with the error? also MUST have an error, otherwise it's a different error. maybe it shoulnd't be an option.
// TODO: ughhhhhhhhhhhhh these stanza errors should probably just have an option, and custom display
- Err(PresenceError::StanzaError(
- presence
- .errors
- .first()
- .cloned()
- .expect("error MUST have error"),
- ))
+ Err(PresenceError::StanzaError(presence.errors.first().cloned()))
}
// should not happen (error to server)
stanza::client::presence::PresenceType::Probe => {
@@ -485,6 +491,7 @@ pub async fn recv_presence(
}
stanza::client::presence::PresenceType::Subscribe => {
// may get a subscription request from somebody who is not a contact!!! therefore should be its own kind of event
+ let from = from.try_into()?;
Ok(Some(UpdateMessage::SubscriptionRequest(from)))
}
stanza::client::presence::PresenceType::Unavailable => {
@@ -541,11 +548,12 @@ pub async fn recv_iq<Fs: FileStore + Clone>(
logic: ClientLogic<Fs>,
connection: Connected,
iq: Iq,
-) -> Result<Option<UpdateMessage>, IqError> {
+) -> Result<Option<UpdateMessage>, IqProcessError> {
if let Some(to) = &iq.to {
- if *to == *connection.jid() {
+ // TODO: this clone could mayb b avoided
+ if *to == connection.jid().clone().into() {
} else {
- return Err(IqError::IncorrectAddressee(to.clone()));
+ return Err(IqProcessError::Iq(IqError::IncorrectAddressee(to.clone())));
}
}
match iq.r#type {
@@ -553,17 +561,24 @@ pub async fn recv_iq<Fs: FileStore + Clone>(
let from = iq
.from
.clone()
- .unwrap_or_else(|| connection.server().clone());
+ // TODO: maybe actually store the server in the connection again LOL
+ // .unwrap_or_else(|| connection.server().clone());
+ .unwrap_or_else(|| connection.jid().domainpart.parse().unwrap());
let id = iq.id.clone();
debug!("received iq result with id `{}` from {}", id, from);
- logic.pending().respond(Stanza::Iq(iq), id).await?;
+ logic
+ .pending()
+ .respond(Stanza::Iq(iq), id)
+ .await
+ .map_err(|e| Into::<IqError>::into(e))?;
Ok(None)
}
stanza::client::iq::IqType::Get => {
let from = iq
.from
.clone()
- .unwrap_or_else(|| connection.server().clone());
+ // .unwrap_or_else(|| connection.server().clone());
+ .unwrap_or_else(|| connection.jid().domainpart.parse().unwrap());
if let Some(query) = iq.query {
match query {
stanza::client::iq::Query::DiscoInfo(query) => {
@@ -577,7 +592,7 @@ pub async fn recv_iq<Fs: FileStore + Clone>(
} else {
match logic
.db()
- .read_capabilities(&query.node.clone().unwrap())
+ .read_capabilities(query.node.clone().unwrap())
.await
{
Ok(c) => match caps::decode_info_base64(c) {
@@ -587,7 +602,7 @@ pub async fn recv_iq<Fs: FileStore + Clone>(
}
Err(_e) => {
let iq = Iq {
- from: Some(connection.jid().clone()),
+ from: Some(connection.jid().clone().into()),
id: iq.id,
to: iq.from,
r#type: IqType::Error,
@@ -596,14 +611,18 @@ pub async fn recv_iq<Fs: FileStore + Clone>(
errors: vec![StanzaError::ItemNotFound.into()],
};
// TODO: log error
- connection.write_handle().write(Stanza::Iq(iq)).await?;
+ connection
+ .write_handle()
+ .write(Stanza::Iq(iq))
+ .await
+ .map_err(|e| Into::<IqError>::into(e))?;
info!("replied to disco#info request from {}", from);
return Ok(None);
}
},
Err(_e) => {
let iq = Iq {
- from: Some(connection.jid().clone()),
+ from: Some(connection.jid().clone().into()),
id: iq.id,
to: iq.from,
r#type: IqType::Error,
@@ -612,14 +631,18 @@ pub async fn recv_iq<Fs: FileStore + Clone>(
errors: vec![StanzaError::ItemNotFound.into()],
};
// TODO: log error
- connection.write_handle().write(Stanza::Iq(iq)).await?;
+ connection
+ .write_handle()
+ .write(Stanza::Iq(iq))
+ .await
+ .map_err(|e| Into::<IqError>::into(e))?;
info!("replied to disco#info request from {}", from);
return Ok(None);
}
}
};
let iq = Iq {
- from: Some(connection.jid().clone()),
+ from: Some(connection.jid().clone().into()),
id: iq.id,
to: iq.from,
r#type: IqType::Result,
@@ -627,14 +650,18 @@ pub async fn recv_iq<Fs: FileStore + Clone>(
query: Some(iq::Query::DiscoInfo(disco)),
errors: vec![],
};
- connection.write_handle().write(Stanza::Iq(iq)).await?;
+ connection
+ .write_handle()
+ .write(Stanza::Iq(iq))
+ .await
+ .map_err(|e| Into::<IqError>::into(e))?;
info!("replied to disco#info request from {}", from);
Ok(None)
}
_ => {
warn!("received unsupported iq get from {}", from);
let iq = Iq {
- from: Some(connection.jid().clone()),
+ from: Some(connection.jid().clone().into()),
id: iq.id,
to: iq.from,
r#type: IqType::Error,
@@ -642,7 +669,11 @@ pub async fn recv_iq<Fs: FileStore + Clone>(
query: None,
errors: vec![StanzaError::ServiceUnavailable.into()],
};
- connection.write_handle().write(Stanza::Iq(iq)).await?;
+ connection
+ .write_handle()
+ .write(Stanza::Iq(iq))
+ .await
+ .map_err(|e| Into::<IqError>::into(e))?;
warn!("replied to unsupported iq get from {}", from);
Ok(None)
} // stanza::client::iq::Query::Bind(bind) => todo!(),
@@ -654,7 +685,7 @@ pub async fn recv_iq<Fs: FileStore + Clone>(
} else {
info!("received malformed iq query from {}", from);
let iq = Iq {
- from: Some(connection.jid().clone()),
+ from: Some(connection.jid().clone().into()),
id: iq.id,
to: iq.from,
r#type: IqType::Error,
@@ -662,7 +693,11 @@ pub async fn recv_iq<Fs: FileStore + Clone>(
query: None,
errors: vec![StanzaError::BadRequest.into()],
};
- connection.write_handle().write(Stanza::Iq(iq)).await?;
+ connection
+ .write_handle()
+ .write(Stanza::Iq(iq))
+ .await
+ .map_err(|e| Into::<IqError>::into(e))?;
info!("replied to malformed iq query from {}", from);
Ok(None)
}
@@ -671,7 +706,8 @@ pub async fn recv_iq<Fs: FileStore + Clone>(
let from = iq
.from
.clone()
- .unwrap_or_else(|| connection.server().clone());
+ // .unwrap_or_else(|| connection.server().clone());
+ .unwrap_or_else(|| connection.jid().domainpart.parse().unwrap());
if let Some(query) = iq.query {
match query {
stanza::client::iq::Query::Roster(mut query) => {
@@ -698,7 +734,7 @@ pub async fn recv_iq<Fs: FileStore + Clone>(
.await;
}
let iq = Iq {
- from: Some(connection.jid().clone()),
+ from: Some(connection.jid().clone().into()),
id: iq.id,
to: iq.from,
r#type: IqType::Result,
@@ -713,13 +749,18 @@ pub async fn recv_iq<Fs: FileStore + Clone>(
.handle_error(RosterError::PushReply(e.into()).into())
.await;
}
- Ok(Some(UpdateMessage::RosterUpdate(contact)))
+ let user = logic
+ .db()
+ .read_user(contact.user_jid.clone())
+ .await
+ .map_err(|e| Into::<RosterError>::into(e))?;
+ Ok(Some(UpdateMessage::RosterUpdate(contact, user)))
}
}
} else {
warn!("received malformed roster push");
let iq = Iq {
- from: Some(connection.jid().clone()),
+ from: Some(connection.jid().clone().into()),
id: iq.id,
to: iq.from,
r#type: IqType::Error,
@@ -727,7 +768,11 @@ pub async fn recv_iq<Fs: FileStore + Clone>(
query: None,
errors: vec![StanzaError::NotAcceptable.into()],
};
- connection.write_handle().write(Stanza::Iq(iq)).await?;
+ connection
+ .write_handle()
+ .write(Stanza::Iq(iq))
+ .await
+ .map_err(|e| Into::<IqError>::into(e))?;
Ok(None)
}
}
@@ -735,7 +780,7 @@ pub async fn recv_iq<Fs: FileStore + Clone>(
_ => {
warn!("received unsupported iq set from {}", from);
let iq = Iq {
- from: Some(connection.jid().clone()),
+ from: Some(connection.jid().clone().into()),
id: iq.id,
to: iq.from,
r#type: IqType::Error,
@@ -743,7 +788,11 @@ pub async fn recv_iq<Fs: FileStore + Clone>(
query: None,
errors: vec![StanzaError::ServiceUnavailable.into()],
};
- connection.write_handle().write(Stanza::Iq(iq)).await?;
+ connection
+ .write_handle()
+ .write(Stanza::Iq(iq))
+ .await
+ .map_err(|e| Into::<IqError>::into(e))?;
warn!("replied to unsupported iq set from {}", from);
Ok(None)
}
@@ -751,7 +800,7 @@ pub async fn recv_iq<Fs: FileStore + Clone>(
} else {
warn!("received malformed iq set from {}", from);
let iq = Iq {
- from: Some(connection.jid().clone()),
+ from: Some(connection.jid().clone().into()),
id: iq.id,
to: iq.from,
r#type: IqType::Error,
@@ -759,7 +808,11 @@ pub async fn recv_iq<Fs: FileStore + Clone>(
query: None,
errors: vec![StanzaError::NotAcceptable.into()],
};
- connection.write_handle().write(Stanza::Iq(iq)).await?;
+ connection
+ .write_handle()
+ .write(Stanza::Iq(iq))
+ .await
+ .map_err(|e| Into::<IqError>::into(e))?;
Ok(None)
}
}
@@ -776,7 +829,7 @@ pub async fn process_stanza<Fs: FileStore + Clone>(
Stanza::Presence(presence) => Ok(recv_presence(presence).await?),
Stanza::Iq(iq) => Ok(recv_iq(logic, connection.clone(), iq).await?),
// unreachable, always caught by lampada
- // TODO: make cleaner than this in some way
+ // TODO: make cleaner than this in some way, by just reexporting a stanza enum from lampada ig.
Stanza::Error(error) => {
unreachable!()
}
diff --git a/filamento/src/presence.rs b/filamento/src/presence.rs
index a7a8965..de4dd7c 100644
--- a/filamento/src/presence.rs
+++ b/filamento/src/presence.rs
@@ -1,19 +1,20 @@
use chrono::{DateTime, Utc};
-use sqlx::Sqlite;
+use rusqlite::{
+ ToSql,
+ types::{FromSql, ToSqlOutput, Value},
+};
use stanza::{client::presence::String1024, xep_0203::Delay};
use crate::caps;
-#[derive(Debug, Default, sqlx::FromRow, Clone)]
+#[derive(Debug, Default, Clone)]
pub struct Online {
pub show: Option<Show>,
- #[sqlx(rename = "message")]
pub status: Option<String>,
- #[sqlx(skip)]
pub priority: Option<i8>,
}
-#[derive(Debug, Clone, Copy)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Show {
Away,
Chat,
@@ -21,39 +22,27 @@ pub enum Show {
ExtendedAway,
}
-impl sqlx::Type<Sqlite> for Show {
- fn type_info() -> <Sqlite as sqlx::Database>::TypeInfo {
- <&str as sqlx::Type<Sqlite>>::type_info()
- }
-}
-
-impl sqlx::Decode<'_, Sqlite> for Show {
- fn decode(
- value: <Sqlite as sqlx::Database>::ValueRef<'_>,
- ) -> Result<Self, sqlx::error::BoxDynError> {
- let value = <&str as sqlx::Decode<Sqlite>>::decode(value)?;
- match value {
- "away" => Ok(Self::Away),
- "chat" => Ok(Self::Chat),
- "do-not-disturb" => Ok(Self::DoNotDisturb),
- "extended-away" => Ok(Self::ExtendedAway),
- _ => unreachable!(),
- }
+impl ToSql for Show {
+ fn to_sql(&self) -> rusqlite::Result<rusqlite::types::ToSqlOutput<'_>> {
+ Ok(match self {
+ Show::Away => ToSqlOutput::Owned(Value::Text("away".to_string())),
+ Show::Chat => ToSqlOutput::Owned(Value::Text("chat".to_string())),
+ Show::DoNotDisturb => ToSqlOutput::Owned(Value::Text("do-not-disturb".to_string())),
+ Show::ExtendedAway => ToSqlOutput::Owned(Value::Text("extended-away".to_string())),
+ })
}
}
-impl sqlx::Encode<'_, Sqlite> for Show {
- fn encode_by_ref(
- &self,
- buf: &mut <Sqlite as sqlx::Database>::ArgumentBuffer<'_>,
- ) -> Result<sqlx::encode::IsNull, sqlx::error::BoxDynError> {
- let value = match self {
- Show::Away => "away",
- Show::Chat => "chat",
- Show::DoNotDisturb => "do-not-disturb",
- Show::ExtendedAway => "extended-away",
- };
- <&str as sqlx::Encode<Sqlite>>::encode(value, buf)
+impl FromSql for Show {
+ fn column_result(value: rusqlite::types::ValueRef<'_>) -> rusqlite::types::FromSqlResult<Self> {
+ Ok(match value.as_str()? {
+ "away" => Self::Away,
+ "chat" => Self::Chat,
+ "do-not-disturb" => Self::DoNotDisturb,
+ "extended-away" => Self::ExtendedAway,
+ // TODO: horrible
+ value => panic!("unexpected {value}"),
+ })
}
}
diff --git a/filamento/src/roster.rs b/filamento/src/roster.rs
index 43c32f5..0498278 100644
--- a/filamento/src/roster.rs
+++ b/filamento/src/roster.rs
@@ -1,85 +1,117 @@
-use std::collections::HashSet;
+use std::{collections::HashSet, fmt::Display};
-use jid::JID;
-use sqlx::Sqlite;
+use jid::BareJID;
+use rusqlite::{
+ ToSql,
+ types::{FromSql, ToSqlOutput, Value},
+};
pub struct ContactUpdate {
pub name: Option<String>,
pub groups: HashSet<String>,
}
-#[derive(Debug, sqlx::FromRow, Clone)]
+#[derive(Debug, Clone, PartialEq, Eq)]
+#[cfg_attr(feature = "reactive_stores", derive(reactive_stores::Store))]
pub struct Contact {
// jid is the id used to reference everything, but not the primary key
- pub user_jid: JID,
+ pub user_jid: BareJID,
pub subscription: Subscription,
/// client user defined name
pub name: Option<String>,
// TODO: avatar, nickname
/// nickname picked by contact
// nickname: Option<String>,
- #[sqlx(skip)]
+ #[cfg_attr(feature = "reactive_stores", store(key: String = |group| group.clone()))]
pub groups: HashSet<String>,
}
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, PartialEq, Eq)]
+/// Contact subscription state.
pub enum Subscription {
+ /// No subscriptions.
None,
+ /// Pending outgoing subscription request.
PendingOut,
+ /// Pending incoming subscription request.
PendingIn,
+ /// Pending incoming & pending outgoing subscription requests.
PendingInPendingOut,
+ /// Subscribed to.
OnlyOut,
+ /// Subscription from.
OnlyIn,
+ /// Subscribed to & pending incoming subscription request.
OutPendingIn,
+ /// Subscription from & pending outgoing subscription request.
InPendingOut,
+ /// Buddy (subscriptions both ways).
Buddy,
// TODO: perhaps don't need, just emit event to remove contact
// Remove,
}
-impl sqlx::Type<Sqlite> for Subscription {
- fn type_info() -> <Sqlite as sqlx::Database>::TypeInfo {
- <&str as sqlx::Type<Sqlite>>::type_info()
+impl ToSql for Subscription {
+ fn to_sql(&self) -> rusqlite::Result<rusqlite::types::ToSqlOutput<'_>> {
+ Ok(match self {
+ Subscription::None => ToSqlOutput::Owned(Value::Text("none".to_string())),
+ Subscription::PendingOut => ToSqlOutput::Owned(Value::Text("pending-out".to_string())),
+ Subscription::PendingIn => ToSqlOutput::Owned(Value::Text("pending-in".to_string())),
+ Subscription::PendingInPendingOut => {
+ ToSqlOutput::Owned(Value::Text("pending-in-pending-out".to_string()))
+ }
+ Subscription::OnlyOut => ToSqlOutput::Owned(Value::Text("only-out".to_string())),
+ Subscription::OnlyIn => ToSqlOutput::Owned(Value::Text("only-in".to_string())),
+ Subscription::OutPendingIn => {
+ ToSqlOutput::Owned(Value::Text("out-pending-in".to_string()))
+ }
+ Subscription::InPendingOut => {
+ ToSqlOutput::Owned(Value::Text("in-pending-out".to_string()))
+ }
+ Subscription::Buddy => ToSqlOutput::Owned(Value::Text("buddy".to_string())),
+ })
}
}
-impl sqlx::Decode<'_, Sqlite> for Subscription {
- fn decode(
- value: <Sqlite as sqlx::Database>::ValueRef<'_>,
- ) -> Result<Self, sqlx::error::BoxDynError> {
- let value = <&str as sqlx::Decode<Sqlite>>::decode(value)?;
- match value {
- "none" => Ok(Self::None),
- "pending-out" => Ok(Self::PendingOut),
- "pending-in" => Ok(Self::PendingIn),
- "pending-in-pending-out" => Ok(Self::PendingInPendingOut),
- "only-out" => Ok(Self::OnlyOut),
- "only-in" => Ok(Self::OnlyIn),
- "out-pending-in" => Ok(Self::OutPendingIn),
- "in-pending-out" => Ok(Self::InPendingOut),
- "buddy" => Ok(Self::Buddy),
- _ => panic!("unexpected subscription `{value}`"),
- }
+impl FromSql for Subscription {
+ fn column_result(value: rusqlite::types::ValueRef<'_>) -> rusqlite::types::FromSqlResult<Self> {
+ Ok(match value.as_str()? {
+ "none" => Self::None,
+ "pending-out" => Self::PendingOut,
+ "pending-in" => Self::PendingIn,
+ "pending-in-pending-out" => Self::PendingInPendingOut,
+ "only-out" => Self::OnlyOut,
+ "only-in" => Self::OnlyIn,
+ "out-pending-in" => Self::OutPendingIn,
+ "in-pending-out" => Self::InPendingOut,
+ "buddy" => Self::Buddy,
+ // TODO: don't have these lol
+ value => panic!("unexpected subscription `{value}`"),
+ })
}
}
-impl sqlx::Encode<'_, Sqlite> for Subscription {
- fn encode_by_ref(
- &self,
- buf: &mut <Sqlite as sqlx::Database>::ArgumentBuffer<'_>,
- ) -> Result<sqlx::encode::IsNull, sqlx::error::BoxDynError> {
- let value = match self {
- Subscription::None => "none",
- Subscription::PendingOut => "pending-out",
- Subscription::PendingIn => "pending-in",
- Subscription::PendingInPendingOut => "pending-in-pending-out",
- Subscription::OnlyOut => "only-out",
- Subscription::OnlyIn => "only-in",
- Subscription::OutPendingIn => "out-pending-in",
- Subscription::InPendingOut => "in-pending-out",
- Subscription::Buddy => "buddy",
- };
- <&str as sqlx::Encode<Sqlite>>::encode(value, buf)
+impl Display for Subscription {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ match self {
+ Subscription::None => write!(f, "No Subscriptions"),
+ Subscription::PendingOut => write!(f, "Pending Outgoing Subscription Request"),
+ Subscription::PendingIn => write!(f, "Pending Incoming Subscription Request"),
+ Subscription::PendingInPendingOut => write!(
+ f,
+ "Pending Incoming & Pending Outgoing Subscription Requests"
+ ),
+ Subscription::OnlyOut => write!(f, "Subscribed To"),
+ Subscription::OnlyIn => write!(f, "Subscription From"),
+ Subscription::OutPendingIn => {
+ write!(f, "Subscribed To & Pending Incoming Subscription Request")
+ }
+ Subscription::InPendingOut => write!(
+ f,
+ "Subscription From & Pending Outgoing Subscription Request"
+ ),
+ Subscription::Buddy => write!(f, "Buddy (Subscriptions Both Ways)"),
+ }
}
}
diff --git a/filamento/src/user.rs b/filamento/src/user.rs
index 3d5dcb4..f962a4c 100644
--- a/filamento/src/user.rs
+++ b/filamento/src/user.rs
@@ -1,9 +1,11 @@
-use jid::JID;
+use jid::BareJID;
-#[derive(Debug, sqlx::FromRow, Clone)]
+#[derive(Debug, Clone, PartialEq, Eq)]
+#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
+#[cfg_attr(feature = "reactive_stores", derive(reactive_stores::Store))]
pub struct User {
- pub jid: JID,
+ pub jid: BareJID,
pub nick: Option<String>,
pub avatar: Option<String>,
- pub cached_status_message: Option<String>,
+ // pub cached_status_message: Option<String>,
}