diff options
| -rw-r--r-- | luz/migrations/20240113011930_luz.sql | 33 | ||||
| -rw-r--r-- | luz/src/chat.rs | 2 | ||||
| -rw-r--r-- | luz/src/connection/mod.rs | 4 | ||||
| -rw-r--r-- | luz/src/connection/read.rs | 3 | ||||
| -rw-r--r-- | luz/src/db/mod.rs | 27 | ||||
| -rw-r--r-- | luz/src/error.rs | 19 | ||||
| -rw-r--r-- | luz/src/lib.rs | 32 | ||||
| -rw-r--r-- | luz/src/presence.rs | 5 | ||||
| -rw-r--r-- | luz/src/roster.rs | 2 | 
9 files changed, 104 insertions, 23 deletions
| diff --git a/luz/migrations/20240113011930_luz.sql b/luz/migrations/20240113011930_luz.sql index 028ae24..7b33dd3 100644 --- a/luz/migrations/20240113011930_luz.sql +++ b/luz/migrations/20240113011930_luz.sql @@ -3,12 +3,37 @@ PRAGMA foreign_keys = on;  -- a user jid will never change, only a chat user will change  -- TODO: avatar, nick, etc.  create table users( +    -- TODO: enforce bare jid      jid text primary key not null,      -- can receive presence status from non-contacts      cached_status_message text      -- TODO: last_seen  ); +-- -- links to messages, jabber users, stores jid history, etc. +-- create table identities( +--     id text primary key not null +-- ); + +-- create table identities_users( +--     id text not null, +--     jid text not null, +--     -- whichever has the newest timestamp is the active one. +--     -- what to do when somebody moves, but then the old jid is used again without having explicitly moved back? create new identity to assign ownership to? +--     -- merging of identities? +--     activated_timestamp not null, +--     foreign key(id) references identities(id), +--     foreign key(jid) references users(jid), +--     primary key(activated timestamp, id, jid) +-- ); + +create table resources( +    bare_jid text not null, +    resource text not null, +    foreign key(bare_jid) references users(jid), +    primary key(bare_jid, resource) +); +  -- enum for subscription state  create table subscription(      state text primary key not null @@ -61,15 +86,19 @@ create table messages (      -- TODO: icky      -- the user to show it coming from (not necessarily the original sender) +    -- from_identity text not null, +    -- original sender details (only from jabber supported for now)      from_jid text not null, -    originally_from text not null, +    -- resource can be null +    from_resource text,      -- check (from_jid != original_sender),      -- TODO: from can be either a jid, a moved jid (for when a contact moves, save original sender jid/user but link to new user), or imported (from another service (save details), linked to new user)      -- TODO: read bool not null,      foreign key(chat_id) references chats(id) on delete cascade, +    -- foreign key(from_identity) references identities(id),      foreign key(from_jid) references users(jid), -    foreign key(originally_from) references users(jid) +    foreign key(from_jid, from_resource) references resources(bare_jid, resource)  );  -- enum for subscription state diff --git a/luz/src/chat.rs b/luz/src/chat.rs index 4fb8579..7bb99e1 100644 --- a/luz/src/chat.rs +++ b/luz/src/chat.rs @@ -28,7 +28,7 @@ pub struct Body {  #[derive(sqlx::FromRow)]  pub struct Chat { -    correspondent: JID, +    pub correspondent: JID,      // message history is not stored in chat, retreived separately.      // pub message_history: Vec<Message>,  } diff --git a/luz/src/connection/mod.rs b/luz/src/connection/mod.rs index fda2b90..95aae1a 100644 --- a/luz/src/connection/mod.rs +++ b/luz/src/connection/mod.rs @@ -15,6 +15,7 @@ use tokio::{      sync::{mpsc, oneshot, Mutex},      task::{JoinHandle, JoinSet},  }; +use tracing::info;  use write::{WriteControl, WriteControlHandle, WriteHandle, WriteMessage};  use crate::{ @@ -104,8 +105,10 @@ impl Supervisor {                  Some(msg) = self.connection_commands.recv() => {                      match msg {                          SupervisorCommand::Disconnect => { +                            info!("disconnecting");                              let _ = self.writer_handle.send(WriteControl::Disconnect).await;                              let _ = self.reader_handle.send(ReadControl::Disconnect).await; +                            info!("sent disconnect command");                              tokio::select! {                                  _ = async { tokio::join!(                                      async { let _ = (&mut self.writer_handle.handle).await; }, @@ -116,6 +119,7 @@ impl Supervisor {                                      (&mut self.writer_handle.handle).abort();                                  }                              } +                            info!("disconnected");                              break;                          },                          SupervisorCommand::Reconnect(state) => { diff --git a/luz/src/connection/read.rs b/luz/src/connection/read.rs index 46f1dc9..4390e00 100644 --- a/luz/src/connection/read.rs +++ b/luz/src/connection/read.rs @@ -202,9 +202,10 @@ async fn handle_stanza(                  };                  // TODO: can this be more efficient?                  let result = db -                    .create_message_with_user_and_chat(message.clone(), from.clone()) +                    .create_message_with_user_resource_and_chat(message.clone(), from.clone())                      .await;                  if let Err(e) = result { +                    tracing::error!("messagecreate");                      let _ = update_sender                          .send(UpdateMessage::Error(Error::CacheUpdate(e.into())))                          .await; diff --git a/luz/src/db/mod.rs b/luz/src/db/mod.rs index 4202163..3a1d73d 100644 --- a/luz/src/db/mod.rs +++ b/luz/src/db/mod.rs @@ -213,14 +213,14 @@ impl Db {      pub async fn replace_cached_roster(&self, roster: Vec<Contact>) -> Result<(), Error> {          sqlx::query!("delete from roster").execute(&self.db).await?;          for contact in roster { -            self.create_contact(contact).await?; +            self.upsert_contact(contact).await?;          }          Ok(())      }      pub async fn read_cached_roster(&self) -> Result<Vec<Contact>, Error> {          let mut roster: Vec<Contact> = -            sqlx::query_as("select * from roster full outer join users on jid = user_jid") +            sqlx::query_as("select * from roster join users on jid = user_jid")                  .fetch_all(&self.db)                  .await?;          for contact in &mut roster { @@ -303,6 +303,7 @@ impl Db {          struct Row {              id: Uuid,          } +        let chat = chat.as_bare();          let chat_id: Row = sqlx::query_as("select id from chats where correspondent = ?")              .bind(chat)              .fetch_one(&self.db) @@ -327,19 +328,24 @@ impl Db {      /// if the chat doesn't already exist, it must be created by calling create_chat() before running this function.      pub async fn create_message(&self, message: Message, chat: JID) -> Result<(), Error> {          // TODO: one query +        let bare_jid = message.from.as_bare(); +        let resource = message.from.resourcepart;          let chat_id = self.read_chat_id(chat).await?; -        sqlx::query!("insert into messages (id, body, chat_id, from_jid, originally_from) values (?, ?, ?, ?, ?)", message.id, message.body.body, chat_id, message.from, message.from).execute(&self.db).await?; +        sqlx::query!("insert into messages (id, body, chat_id, from_jid, from_resource) values (?, ?, ?, ?, ?)", message.id, message.body.body, chat_id, bare_jid, resource).execute(&self.db).await?;          Ok(())      } -    pub async fn create_message_with_user_and_chat( +    // create direct message +    pub async fn create_message_with_user_resource_and_chat(          &self,          message: Message,          chat: JID,      ) -> Result<(), Error> { +        let bare_chat = chat.as_bare(); +        let resource = &chat.resourcepart;          sqlx::query!(              "insert into users (jid) values (?) on conflict do nothing", -            chat +            bare_chat          )          .execute(&self.db)          .await?; @@ -347,10 +353,19 @@ impl Db {          sqlx::query!(              "insert into chats (id, correspondent) values (?, ?) on conflict do nothing",              id, -            chat +            bare_chat          )          .execute(&self.db)          .await?; +        if let Some(resource) = resource { +            sqlx::query!( +                "insert into resources (bare_jid, resource) values (?, ?) on conflict do nothing", +                bare_chat, +                resource +            ) +            .execute(&self.db) +            .await?; +        }          self.create_message(message, chat).await?;          Ok(())      } diff --git a/luz/src/error.rs b/luz/src/error.rs index 4fdce79..f0b956e 100644 --- a/luz/src/error.rs +++ b/luz/src/error.rs @@ -18,6 +18,14 @@ pub enum Error {      CacheUpdate(Reason),      UnrecognizedContent(peanuts::element::Content),      Iq(IqError), +    Cloned, +} + +// TODO: this is horrifying, maybe just use tracing to forward error events??? +impl Clone for Error { +    fn clone(&self) -> Self { +        Error::Cloned +    }  }  #[derive(Debug)] @@ -37,7 +45,7 @@ pub enum RecvMessageError {      MissingFrom,  } -#[derive(Debug)] +#[derive(Debug, Clone)]  pub enum ConnectionError {      ConnectionFailed(Reason),      RosterRetreival(Reason), @@ -45,6 +53,7 @@ pub enum ConnectionError {      NoCachedStatus(Reason),  } +#[derive(Debug)]  pub struct RosterError(pub Reason);  impl From<RosterError> for Error { @@ -88,6 +97,14 @@ pub enum Reason {      UnexpectedStanza(Stanza),      Disconnected,      ChannelSend, +    Cloned, +} + +// TODO: same here +impl Clone for Reason { +    fn clone(&self) -> Self { +        Reason::Cloned +    }  }  impl From<oneshot::error::RecvError> for Reason { diff --git a/luz/src/lib.rs b/luz/src/lib.rs index 901553b..4c95ab6 100644 --- a/luz/src/lib.rs +++ b/luz/src/lib.rs @@ -32,9 +32,9 @@ pub mod chat;  mod connection;  mod db;  mod error; -mod presence; -mod roster; -mod user; +pub mod presence; +pub mod roster; +pub mod user;  pub struct Luz {      command_sender: mpsc::Sender<CommandMessage>, @@ -82,7 +82,9 @@ impl Luz {          loop {              let msg = tokio::select! {                  // this is okay, as when created the supervisor (and connection) doesn't exist, but a bit messy +                // THIS IS NOT OKAY LOLLLL                  _ = &mut self.connection_supervisor_shutdown => { +                    info!("got this");                      *self.connected.lock().await = None;                      continue;                  } @@ -247,11 +249,13 @@ impl Luz {                              // TODO: send unavailable presence                              if let Some((_write_handle, supervisor_handle)) = c.take() {                                  let _ = supervisor_handle.send(SupervisorCommand::Disconnect).await; +                                c = None;                              } else {                                  unreachable!()                              };                          }                      } +                    info!("lock released")                  }                  _ => {                      match self.connected.lock().await.as_ref() { @@ -962,10 +966,19 @@ impl CommandMessage {  // TODO: separate sender and receiver, store handle to Luz process to ensure dropping  // #[derive(Clone)] +#[derive(Debug)]  pub struct LuzHandle {      sender: mpsc::Sender<CommandMessage>,  } +impl Clone for LuzHandle { +    fn clone(&self) -> Self { +        Self { +            sender: self.sender.clone(), +        } +    } +} +  impl Deref for LuzHandle {      type Target = mpsc::Sender<CommandMessage>; @@ -981,11 +994,12 @@ impl DerefMut for LuzHandle {  }  impl LuzHandle { -    pub fn new( +    pub async fn new(          jid: JID,          password: String, -        db: SqlitePool, -    ) -> (Self, mpsc::Receiver<UpdateMessage>) { +        db: &str, +    ) -> Result<(Self, mpsc::Receiver<UpdateMessage>), Reason> { +        let db = SqlitePool::connect(db).await?;          let (command_sender, command_receiver) = mpsc::channel(20);          let (update_sender, update_receiver) = mpsc::channel(20);          // might be bad, first supervisor shutdown notification oneshot is never used (disgusting) @@ -1003,12 +1017,12 @@ impl LuzHandle {          );          tokio::spawn(async move { actor.run().await }); -        ( +        Ok((              Self {                  sender: command_sender,              },              update_receiver, -        ) +        ))      }  } @@ -1064,7 +1078,7 @@ pub enum CommandMessage {      SendMessage(JID, Body, oneshot::Sender<Result<(), Reason>>),  } -#[derive(Debug)] +#[derive(Debug, Clone)]  pub enum UpdateMessage {      Error(Error),      Online(Online, Vec<Contact>), diff --git a/luz/src/presence.rs b/luz/src/presence.rs index 1df20a7..40d79c5 100644 --- a/luz/src/presence.rs +++ b/luz/src/presence.rs @@ -4,6 +4,7 @@ use stanza::client::presence::String1024;  #[derive(Debug, Default, sqlx::FromRow, Clone)]  pub struct Online {      pub show: Option<Show>, +    #[sqlx(rename = "message")]      pub status: Option<String>,      #[sqlx(skip)]      pub priority: Option<i8>, @@ -53,12 +54,12 @@ impl sqlx::Encode<'_, Sqlite> for Show {      }  } -#[derive(Debug, Default)] +#[derive(Debug, Default, Clone)]  pub struct Offline {      pub status: Option<String>,  } -#[derive(Debug)] +#[derive(Debug, Clone)]  pub enum Presence {      Online(Online),      Offline(Offline), diff --git a/luz/src/roster.rs b/luz/src/roster.rs index 0e43a8a..43c32f5 100644 --- a/luz/src/roster.rs +++ b/luz/src/roster.rs @@ -58,7 +58,7 @@ impl sqlx::Decode<'_, Sqlite> for Subscription {              "out-pending-in" => Ok(Self::OutPendingIn),              "in-pending-out" => Ok(Self::InPendingOut),              "buddy" => Ok(Self::Buddy), -            _ => unreachable!(), +            _ => panic!("unexpected subscription `{value}`"),          }      }  } | 
