aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorLibravatar cel 🌸 <cel@bunny.garden>2025-02-24 08:41:58 +0000
committerLibravatar cel 🌸 <cel@bunny.garden>2025-02-24 08:41:58 +0000
commit66e37108cd61750aeaaa6d521742c1fb494b1394 (patch)
treeea13b8343bd8254af0d6f5f6fdb70c08539dd7a6
parent2e6ad369c51fa7e60df8c7deaa59ec7705c3ff98 (diff)
downloadluz-66e37108cd61750aeaaa6d521742c1fb494b1394.tar.gz
luz-66e37108cd61750aeaaa6d521742c1fb494b1394.tar.bz2
luz-66e37108cd61750aeaaa6d521742c1fb494b1394.zip
fix bugs
-rw-r--r--luz/migrations/20240113011930_luz.sql33
-rw-r--r--luz/src/chat.rs2
-rw-r--r--luz/src/connection/mod.rs4
-rw-r--r--luz/src/connection/read.rs3
-rw-r--r--luz/src/db/mod.rs27
-rw-r--r--luz/src/error.rs19
-rw-r--r--luz/src/lib.rs32
-rw-r--r--luz/src/presence.rs5
-rw-r--r--luz/src/roster.rs2
9 files changed, 104 insertions, 23 deletions
diff --git a/luz/migrations/20240113011930_luz.sql b/luz/migrations/20240113011930_luz.sql
index 028ae24..7b33dd3 100644
--- a/luz/migrations/20240113011930_luz.sql
+++ b/luz/migrations/20240113011930_luz.sql
@@ -3,12 +3,37 @@ PRAGMA foreign_keys = on;
-- a user jid will never change, only a chat user will change
-- TODO: avatar, nick, etc.
create table users(
+ -- TODO: enforce bare jid
jid text primary key not null,
-- can receive presence status from non-contacts
cached_status_message text
-- TODO: last_seen
);
+-- -- links to messages, jabber users, stores jid history, etc.
+-- create table identities(
+-- id text primary key not null
+-- );
+
+-- create table identities_users(
+-- id text not null,
+-- jid text not null,
+-- -- whichever has the newest timestamp is the active one.
+-- -- what to do when somebody moves, but then the old jid is used again without having explicitly moved back? create new identity to assign ownership to?
+-- -- merging of identities?
+-- activated_timestamp not null,
+-- foreign key(id) references identities(id),
+-- foreign key(jid) references users(jid),
+-- primary key(activated timestamp, id, jid)
+-- );
+
+create table resources(
+ bare_jid text not null,
+ resource text not null,
+ foreign key(bare_jid) references users(jid),
+ primary key(bare_jid, resource)
+);
+
-- enum for subscription state
create table subscription(
state text primary key not null
@@ -61,15 +86,19 @@ create table messages (
-- TODO: icky
-- the user to show it coming from (not necessarily the original sender)
+ -- from_identity text not null,
+ -- original sender details (only from jabber supported for now)
from_jid text not null,
- originally_from text not null,
+ -- resource can be null
+ from_resource text,
-- check (from_jid != original_sender),
-- TODO: from can be either a jid, a moved jid (for when a contact moves, save original sender jid/user but link to new user), or imported (from another service (save details), linked to new user)
-- TODO: read bool not null,
foreign key(chat_id) references chats(id) on delete cascade,
+ -- foreign key(from_identity) references identities(id),
foreign key(from_jid) references users(jid),
- foreign key(originally_from) references users(jid)
+ foreign key(from_jid, from_resource) references resources(bare_jid, resource)
);
-- enum for subscription state
diff --git a/luz/src/chat.rs b/luz/src/chat.rs
index 4fb8579..7bb99e1 100644
--- a/luz/src/chat.rs
+++ b/luz/src/chat.rs
@@ -28,7 +28,7 @@ pub struct Body {
#[derive(sqlx::FromRow)]
pub struct Chat {
- correspondent: JID,
+ pub correspondent: JID,
// message history is not stored in chat, retreived separately.
// pub message_history: Vec<Message>,
}
diff --git a/luz/src/connection/mod.rs b/luz/src/connection/mod.rs
index fda2b90..95aae1a 100644
--- a/luz/src/connection/mod.rs
+++ b/luz/src/connection/mod.rs
@@ -15,6 +15,7 @@ use tokio::{
sync::{mpsc, oneshot, Mutex},
task::{JoinHandle, JoinSet},
};
+use tracing::info;
use write::{WriteControl, WriteControlHandle, WriteHandle, WriteMessage};
use crate::{
@@ -104,8 +105,10 @@ impl Supervisor {
Some(msg) = self.connection_commands.recv() => {
match msg {
SupervisorCommand::Disconnect => {
+ info!("disconnecting");
let _ = self.writer_handle.send(WriteControl::Disconnect).await;
let _ = self.reader_handle.send(ReadControl::Disconnect).await;
+ info!("sent disconnect command");
tokio::select! {
_ = async { tokio::join!(
async { let _ = (&mut self.writer_handle.handle).await; },
@@ -116,6 +119,7 @@ impl Supervisor {
(&mut self.writer_handle.handle).abort();
}
}
+ info!("disconnected");
break;
},
SupervisorCommand::Reconnect(state) => {
diff --git a/luz/src/connection/read.rs b/luz/src/connection/read.rs
index 46f1dc9..4390e00 100644
--- a/luz/src/connection/read.rs
+++ b/luz/src/connection/read.rs
@@ -202,9 +202,10 @@ async fn handle_stanza(
};
// TODO: can this be more efficient?
let result = db
- .create_message_with_user_and_chat(message.clone(), from.clone())
+ .create_message_with_user_resource_and_chat(message.clone(), from.clone())
.await;
if let Err(e) = result {
+ tracing::error!("messagecreate");
let _ = update_sender
.send(UpdateMessage::Error(Error::CacheUpdate(e.into())))
.await;
diff --git a/luz/src/db/mod.rs b/luz/src/db/mod.rs
index 4202163..3a1d73d 100644
--- a/luz/src/db/mod.rs
+++ b/luz/src/db/mod.rs
@@ -213,14 +213,14 @@ impl Db {
pub async fn replace_cached_roster(&self, roster: Vec<Contact>) -> Result<(), Error> {
sqlx::query!("delete from roster").execute(&self.db).await?;
for contact in roster {
- self.create_contact(contact).await?;
+ self.upsert_contact(contact).await?;
}
Ok(())
}
pub async fn read_cached_roster(&self) -> Result<Vec<Contact>, Error> {
let mut roster: Vec<Contact> =
- sqlx::query_as("select * from roster full outer join users on jid = user_jid")
+ sqlx::query_as("select * from roster join users on jid = user_jid")
.fetch_all(&self.db)
.await?;
for contact in &mut roster {
@@ -303,6 +303,7 @@ impl Db {
struct Row {
id: Uuid,
}
+ let chat = chat.as_bare();
let chat_id: Row = sqlx::query_as("select id from chats where correspondent = ?")
.bind(chat)
.fetch_one(&self.db)
@@ -327,19 +328,24 @@ impl Db {
/// if the chat doesn't already exist, it must be created by calling create_chat() before running this function.
pub async fn create_message(&self, message: Message, chat: JID) -> Result<(), Error> {
// TODO: one query
+ let bare_jid = message.from.as_bare();
+ let resource = message.from.resourcepart;
let chat_id = self.read_chat_id(chat).await?;
- sqlx::query!("insert into messages (id, body, chat_id, from_jid, originally_from) values (?, ?, ?, ?, ?)", message.id, message.body.body, chat_id, message.from, message.from).execute(&self.db).await?;
+ sqlx::query!("insert into messages (id, body, chat_id, from_jid, from_resource) values (?, ?, ?, ?, ?)", message.id, message.body.body, chat_id, bare_jid, resource).execute(&self.db).await?;
Ok(())
}
- pub async fn create_message_with_user_and_chat(
+ // create direct message
+ pub async fn create_message_with_user_resource_and_chat(
&self,
message: Message,
chat: JID,
) -> Result<(), Error> {
+ let bare_chat = chat.as_bare();
+ let resource = &chat.resourcepart;
sqlx::query!(
"insert into users (jid) values (?) on conflict do nothing",
- chat
+ bare_chat
)
.execute(&self.db)
.await?;
@@ -347,10 +353,19 @@ impl Db {
sqlx::query!(
"insert into chats (id, correspondent) values (?, ?) on conflict do nothing",
id,
- chat
+ bare_chat
)
.execute(&self.db)
.await?;
+ if let Some(resource) = resource {
+ sqlx::query!(
+ "insert into resources (bare_jid, resource) values (?, ?) on conflict do nothing",
+ bare_chat,
+ resource
+ )
+ .execute(&self.db)
+ .await?;
+ }
self.create_message(message, chat).await?;
Ok(())
}
diff --git a/luz/src/error.rs b/luz/src/error.rs
index 4fdce79..f0b956e 100644
--- a/luz/src/error.rs
+++ b/luz/src/error.rs
@@ -18,6 +18,14 @@ pub enum Error {
CacheUpdate(Reason),
UnrecognizedContent(peanuts::element::Content),
Iq(IqError),
+ Cloned,
+}
+
+// TODO: this is horrifying, maybe just use tracing to forward error events???
+impl Clone for Error {
+ fn clone(&self) -> Self {
+ Error::Cloned
+ }
}
#[derive(Debug)]
@@ -37,7 +45,7 @@ pub enum RecvMessageError {
MissingFrom,
}
-#[derive(Debug)]
+#[derive(Debug, Clone)]
pub enum ConnectionError {
ConnectionFailed(Reason),
RosterRetreival(Reason),
@@ -45,6 +53,7 @@ pub enum ConnectionError {
NoCachedStatus(Reason),
}
+#[derive(Debug)]
pub struct RosterError(pub Reason);
impl From<RosterError> for Error {
@@ -88,6 +97,14 @@ pub enum Reason {
UnexpectedStanza(Stanza),
Disconnected,
ChannelSend,
+ Cloned,
+}
+
+// TODO: same here
+impl Clone for Reason {
+ fn clone(&self) -> Self {
+ Reason::Cloned
+ }
}
impl From<oneshot::error::RecvError> for Reason {
diff --git a/luz/src/lib.rs b/luz/src/lib.rs
index 901553b..4c95ab6 100644
--- a/luz/src/lib.rs
+++ b/luz/src/lib.rs
@@ -32,9 +32,9 @@ pub mod chat;
mod connection;
mod db;
mod error;
-mod presence;
-mod roster;
-mod user;
+pub mod presence;
+pub mod roster;
+pub mod user;
pub struct Luz {
command_sender: mpsc::Sender<CommandMessage>,
@@ -82,7 +82,9 @@ impl Luz {
loop {
let msg = tokio::select! {
// this is okay, as when created the supervisor (and connection) doesn't exist, but a bit messy
+ // THIS IS NOT OKAY LOLLLL
_ = &mut self.connection_supervisor_shutdown => {
+ info!("got this");
*self.connected.lock().await = None;
continue;
}
@@ -247,11 +249,13 @@ impl Luz {
// TODO: send unavailable presence
if let Some((_write_handle, supervisor_handle)) = c.take() {
let _ = supervisor_handle.send(SupervisorCommand::Disconnect).await;
+ c = None;
} else {
unreachable!()
};
}
}
+ info!("lock released")
}
_ => {
match self.connected.lock().await.as_ref() {
@@ -962,10 +966,19 @@ impl CommandMessage {
// TODO: separate sender and receiver, store handle to Luz process to ensure dropping
// #[derive(Clone)]
+#[derive(Debug)]
pub struct LuzHandle {
sender: mpsc::Sender<CommandMessage>,
}
+impl Clone for LuzHandle {
+ fn clone(&self) -> Self {
+ Self {
+ sender: self.sender.clone(),
+ }
+ }
+}
+
impl Deref for LuzHandle {
type Target = mpsc::Sender<CommandMessage>;
@@ -981,11 +994,12 @@ impl DerefMut for LuzHandle {
}
impl LuzHandle {
- pub fn new(
+ pub async fn new(
jid: JID,
password: String,
- db: SqlitePool,
- ) -> (Self, mpsc::Receiver<UpdateMessage>) {
+ db: &str,
+ ) -> Result<(Self, mpsc::Receiver<UpdateMessage>), Reason> {
+ let db = SqlitePool::connect(db).await?;
let (command_sender, command_receiver) = mpsc::channel(20);
let (update_sender, update_receiver) = mpsc::channel(20);
// might be bad, first supervisor shutdown notification oneshot is never used (disgusting)
@@ -1003,12 +1017,12 @@ impl LuzHandle {
);
tokio::spawn(async move { actor.run().await });
- (
+ Ok((
Self {
sender: command_sender,
},
update_receiver,
- )
+ ))
}
}
@@ -1064,7 +1078,7 @@ pub enum CommandMessage {
SendMessage(JID, Body, oneshot::Sender<Result<(), Reason>>),
}
-#[derive(Debug)]
+#[derive(Debug, Clone)]
pub enum UpdateMessage {
Error(Error),
Online(Online, Vec<Contact>),
diff --git a/luz/src/presence.rs b/luz/src/presence.rs
index 1df20a7..40d79c5 100644
--- a/luz/src/presence.rs
+++ b/luz/src/presence.rs
@@ -4,6 +4,7 @@ use stanza::client::presence::String1024;
#[derive(Debug, Default, sqlx::FromRow, Clone)]
pub struct Online {
pub show: Option<Show>,
+ #[sqlx(rename = "message")]
pub status: Option<String>,
#[sqlx(skip)]
pub priority: Option<i8>,
@@ -53,12 +54,12 @@ impl sqlx::Encode<'_, Sqlite> for Show {
}
}
-#[derive(Debug, Default)]
+#[derive(Debug, Default, Clone)]
pub struct Offline {
pub status: Option<String>,
}
-#[derive(Debug)]
+#[derive(Debug, Clone)]
pub enum Presence {
Online(Online),
Offline(Offline),
diff --git a/luz/src/roster.rs b/luz/src/roster.rs
index 0e43a8a..43c32f5 100644
--- a/luz/src/roster.rs
+++ b/luz/src/roster.rs
@@ -58,7 +58,7 @@ impl sqlx::Decode<'_, Sqlite> for Subscription {
"out-pending-in" => Ok(Self::OutPendingIn),
"in-pending-out" => Ok(Self::InPendingOut),
"buddy" => Ok(Self::Buddy),
- _ => unreachable!(),
+ _ => panic!("unexpected subscription `{value}`"),
}
}
}