diff options
| author | 2025-02-18 06:14:43 +0000 | |
|---|---|---|
| committer | 2025-02-18 06:14:43 +0000 | |
| commit | 5dd488550f9959914d16bde9269284ebd043e0e6 (patch) | |
| tree | 3201c93bb0a457526f62139501e697287b2db9a1 | |
| parent | 68a7d136705133dc5d3a5d43b9ff4da28eeb6d5b (diff) | |
| download | luz-5dd488550f9959914d16bde9269284ebd043e0e6.tar.gz luz-5dd488550f9959914d16bde9269284ebd043e0e6.tar.bz2 luz-5dd488550f9959914d16bde9269284ebd043e0e6.zip | |
WIP: roster retrieval
Diffstat (limited to '')
| -rw-r--r-- | luz/migrations/20240113011930_luz.sql | 19 | ||||
| -rw-r--r-- | luz/src/connection/mod.rs | 33 | ||||
| -rw-r--r-- | luz/src/connection/read.rs | 26 | ||||
| -rw-r--r-- | luz/src/connection/write.rs | 8 | ||||
| -rw-r--r-- | luz/src/db/mod.rs | 26 | ||||
| -rw-r--r-- | luz/src/error.rs | 57 | ||||
| -rw-r--r-- | luz/src/lib.rs | 259 | ||||
| -rw-r--r-- | luz/src/main.rs | 1 | ||||
| -rw-r--r-- | luz/src/presence.rs | 53 | ||||
| -rw-r--r-- | luz/src/roster.rs | 45 | ||||
| -rw-r--r-- | stanza/src/roster.rs | 14 | 
11 files changed, 466 insertions, 75 deletions
| diff --git a/luz/migrations/20240113011930_luz.sql b/luz/migrations/20240113011930_luz.sql index 3b18208..082cc4b 100644 --- a/luz/migrations/20240113011930_luz.sql +++ b/luz/migrations/20240113011930_luz.sql @@ -6,6 +6,7 @@ create table users(      jid text primary key not null,      -- can receive presence status from non-contacts      cached_status_message text +    -- TODO: last_seen  );  -- enum for subscription state @@ -55,6 +56,8 @@ create table messages (      -- check ((chat_id == null) <> (channel_id == null)),      -- check ((chat_id == null) or (channel_id == null)),      -- user is the current "owner" of the message +    -- TODO: queued messages offline +    -- TODO: timestamp      -- TODO: icky      -- the user to show it coming from (not necessarily the original sender) @@ -68,3 +71,19 @@ create table messages (      foreign key(from_jid) references users(jid),      foreign key(originally_from) references users(jid)  ); + +-- enum for subscription state +create table show ( +    state text primary key not null +); + +insert into show ( state ) values ('away'), ('chat'), ('do-not-disturb'), ('extended-away'); + +create table cached_status ( +    id integer primary key not null, +    show text, +    message text, +    foreign key(show) references show(state) +); + +insert into cached_status (id) values (0); diff --git a/luz/src/connection/mod.rs b/luz/src/connection/mod.rs index 8ff433b..fda2b90 100644 --- a/luz/src/connection/mod.rs +++ b/luz/src/connection/mod.rs @@ -17,7 +17,11 @@ use tokio::{  };  use write::{WriteControl, WriteControlHandle, WriteHandle, WriteMessage}; -use crate::{db::Db, error::Error, UpdateMessage}; +use crate::{ +    db::Db, +    error::{Error, Reason}, +    UpdateMessage, +};  mod read;  pub(crate) mod write; @@ -31,7 +35,7 @@ pub struct Supervisor {          tokio::task::JoinSet<()>,          mpsc::Sender<SupervisorCommand>,          WriteHandle, -        Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>, +        Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>,      )>,      sender: mpsc::Sender<UpdateMessage>,      writer_handle: WriteControlHandle, @@ -57,7 +61,7 @@ pub enum State {              tokio::task::JoinSet<()>,              mpsc::Sender<SupervisorCommand>,              WriteHandle, -            Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>, +            Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>,          ),      ),  } @@ -72,7 +76,7 @@ impl Supervisor {              JoinSet<()>,              mpsc::Sender<SupervisorCommand>,              WriteHandle, -            Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>, +            Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>,          )>,          sender: mpsc::Sender<UpdateMessage>,          writer_handle: WriteControlHandle, @@ -172,9 +176,10 @@ impl Supervisor {                                      // if reconnection failure, respond to all current write messages with lost connection error. the received processes should complete themselves.                                      write_state.close();                                      while let Some(msg) = write_state.recv().await { -                                        let _ = msg.respond_to.send(Err(Error::LostConnection)); +                                        let _ = msg.respond_to.send(Err(Reason::LostConnection));                                      } -                                    let _ = self.sender.send(UpdateMessage::Error(e.into())).await; +                                    // TODO: is this the correct error? +                                    let _ = self.sender.send(UpdateMessage::Error(Error::LostConnection)).await;                                      break;                                  },                              } @@ -218,11 +223,12 @@ impl Supervisor {                          Err(e) => {                              // if reconnection failure, respond to all current write messages with lost connection error. the received processes should complete themselves.                              write_recv.close(); -                            let _ = write_msg.respond_to.send(Err(Error::LostConnection)); +                            let _ = write_msg.respond_to.send(Err(Reason::LostConnection));                              while let Some(msg) = write_recv.recv().await { -                                let _ = msg.respond_to.send(Err(Error::LostConnection)); +                                let _ = msg.respond_to.send(Err(Reason::LostConnection));                              } -                            let _ = self.sender.send(UpdateMessage::Error(e.into())).await; +                            // TODO: is this the correct error to send? +                            let _ = self.sender.send(UpdateMessage::Error(Error::LostConnection)).await;                              break;                          },                      } @@ -268,12 +274,13 @@ impl Supervisor {                              // if reconnection failure, respond to all current messages with lost connection error.                              write_receiver.close();                              if let Some(msg) = retry_msg { -                                msg.respond_to.send(Err(Error::LostConnection)); +                                msg.respond_to.send(Err(Reason::LostConnection));                              }                              while let Some(msg) = write_receiver.recv().await { -                                msg.respond_to.send(Err(Error::LostConnection)); +                                msg.respond_to.send(Err(Reason::LostConnection));                              } -                            let _ = self.sender.send(UpdateMessage::Error(e.into())).await; +                            // TODO: is this the correct error? +                            let _ = self.sender.send(UpdateMessage::Error(Error::LostConnection)).await;                              break;                          },                      } @@ -331,7 +338,7 @@ impl SupervisorHandle {          on_shutdown: oneshot::Sender<()>,          jid: Arc<Mutex<JID>>,          password: Arc<String>, -        pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>, +        pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>,      ) -> (WriteHandle, Self) {          let (command_sender, command_receiver) = mpsc::channel(20);          let (writer_error_sender, writer_error_receiver) = oneshot::channel(); diff --git a/luz/src/connection/read.rs b/luz/src/connection/read.rs index 692952b..8f8c4a0 100644 --- a/luz/src/connection/read.rs +++ b/luz/src/connection/read.rs @@ -13,7 +13,11 @@ use tokio::{  };  use tracing::info; -use crate::{db::Db, error::Error, UpdateMessage}; +use crate::{ +    db::Db, +    error::{Error, Reason}, +    UpdateMessage, +};  use super::{      write::{WriteHandle, WriteMessage}, @@ -29,7 +33,7 @@ pub struct Read {          JoinSet<()>,          mpsc::Sender<SupervisorCommand>,          WriteHandle, -        Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>, +        Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>,      )>,      db: Db,      update_sender: mpsc::Sender<UpdateMessage>, @@ -39,7 +43,7 @@ pub struct Read {      disconnecting: bool,      disconnect_timedout: oneshot::Receiver<()>,      // TODO: use proper stanza ids -    pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>, +    pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>,  }  impl Read { @@ -52,7 +56,7 @@ impl Read {              JoinSet<()>,              mpsc::Sender<SupervisorCommand>,              WriteHandle, -            Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>, +            Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>,          )>,          db: Db,          update_sender: mpsc::Sender<UpdateMessage>, @@ -60,7 +64,7 @@ impl Read {          supervisor_control: mpsc::Sender<SupervisorCommand>,          write_handle: WriteHandle,          tasks: JoinSet<()>, -        pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>, +        pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>,      ) -> Self {          let (send, recv) = oneshot::channel();          Self { @@ -153,7 +157,7 @@ impl Read {          // when it aborts, must clear iq map no matter what          let mut iqs = self.pending_iqs.lock().await;          for (_id, sender) in iqs.drain() { -            let _ = sender.send(Err(Error::LostConnection)); +            let _ = sender.send(Err(Reason::LostConnection));          }      }  } @@ -182,7 +186,7 @@ pub enum ReadControl {              JoinSet<()>,              mpsc::Sender<SupervisorCommand>,              WriteHandle, -            Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>, +            Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>,          )>,      ),  } @@ -215,13 +219,13 @@ impl ReadControlHandle {              JoinSet<()>,              mpsc::Sender<SupervisorCommand>,              WriteHandle, -            Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>, +            Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>,          )>,          db: Db,          sender: mpsc::Sender<UpdateMessage>,          supervisor_control: mpsc::Sender<SupervisorCommand>,          jabber_write: WriteHandle, -        pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>, +        pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>,      ) -> Self {          let (control_sender, control_receiver) = mpsc::channel(20); @@ -252,14 +256,14 @@ impl ReadControlHandle {              JoinSet<()>,              mpsc::Sender<SupervisorCommand>,              WriteHandle, -            Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>, +            Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>,          )>,          db: Db,          sender: mpsc::Sender<UpdateMessage>,          supervisor_control: mpsc::Sender<SupervisorCommand>,          jabber_write: WriteHandle,          tasks: JoinSet<()>, -        pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>, +        pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>,      ) -> Self {          let (control_sender, control_receiver) = mpsc::channel(20); diff --git a/luz/src/connection/write.rs b/luz/src/connection/write.rs index 18dba5c..70584a2 100644 --- a/luz/src/connection/write.rs +++ b/luz/src/connection/write.rs @@ -7,7 +7,7 @@ use tokio::{      task::JoinHandle,  }; -use crate::error::Error; +use crate::error::{Error, Reason};  // actor that receives jabber stanzas to write, and if there is an error, sends a message back to the supervisor then aborts, so the supervisor can spawn a new stream.  pub struct Write { @@ -19,7 +19,7 @@ pub struct Write {  pub struct WriteMessage {      pub stanza: Stanza, -    pub respond_to: oneshot::Sender<Result<(), Error>>, +    pub respond_to: oneshot::Sender<Result<(), Reason>>,  }  pub enum WriteControl { @@ -84,9 +84,9 @@ impl Write {                                      Err(e) => match &e {                                          peanuts::Error::ReadError(_error) => {                                              // if connection lost during disconnection, just send lost connection error to the write requests -                                            let _ = msg.respond_to.send(Err(Error::LostConnection)); +                                            let _ = msg.respond_to.send(Err(Reason::LostConnection));                                              while let Some(msg) = self.stanza_receiver.recv().await { -                                                let _ = msg.respond_to.send(Err(Error::LostConnection)); +                                                let _ = msg.respond_to.send(Err(Reason::LostConnection));                                              }                                              break;                                          } diff --git a/luz/src/db/mod.rs b/luz/src/db/mod.rs index f598fbb..7557f70 100644 --- a/luz/src/db/mod.rs +++ b/luz/src/db/mod.rs @@ -6,6 +6,7 @@ use uuid::Uuid;  use crate::{      chat::{Chat, Message}, +    presence::Online,      roster::Contact,      user::User,  }; @@ -315,4 +316,29 @@ impl Db {              .await?;          Ok(messages)      } + +    pub async fn read_cached_status(&self) -> Result<Online, Error> { +        let online: Online = sqlx::query_as("select * from cached_status where id = 0") +            .fetch_one(&self.db) +            .await?; +        Ok(online) +    } + +    pub async fn upsert_cached_status(&self, status: Online) -> Result<(), Error> { +        sqlx::query!( +            "insert into cached_status (id, show, message) values (0, ?, ?) on conflict do update set show = ?, message = ?", +            status.show, +            status.status, +            status.show, +            status.status +        ).execute(&self.db).await?; +        Ok(()) +    } + +    pub async fn delete_cached_status(&self) -> Result<(), Error> { +        sqlx::query!("update cached_status set show = null, message = null where id = 0") +            .execute(&self.db) +            .await?; +        Ok(()) +    }  } diff --git a/luz/src/error.rs b/luz/src/error.rs index 16e1c6e..b9a6487 100644 --- a/luz/src/error.rs +++ b/luz/src/error.rs @@ -1,6 +1,11 @@ +use stanza::client::Stanza; +use tokio::sync::oneshot::{self}; +  #[derive(Debug)]  pub enum Error {      AlreadyConnected, +    // TODO: change to Connecting(ConnectingError) +    Connection(ConnectionError),      Presence(Reason),      Roster(Reason),      SendMessage(Reason), @@ -9,6 +14,42 @@ pub enum Error {  }  #[derive(Debug)] +pub enum ConnectionError { +    ConnectionFailed(Reason), +    RosterRetreival(Reason), +    SendPresence(Reason), +    NoCachedStatus(Reason), +} + +pub struct RosterError(pub Reason); + +impl From<RosterError> for Error { +    fn from(e: RosterError) -> Self { +        Self::Roster(e.0) +    } +} + +impl From<RosterError> for ConnectionError { +    fn from(e: RosterError) -> Self { +        Self::RosterRetreival(e.0) +    } +} + +pub struct StatusError(Reason); + +impl From<StatusError> for Error { +    fn from(e: StatusError) -> Self { +        Error::Presence(e.0) +    } +} + +impl From<StatusError> for ConnectionError { +    fn from(e: StatusError) -> Self { +        Self::SendPresence(e.0) +    } +} + +#[derive(Debug)]  pub enum Reason {      // TODO: organisastion of error into internal error thing      Timeout, @@ -19,27 +60,35 @@ pub enum Reason {      SQL(sqlx::Error),      // JID(jid::ParseError),      LostConnection, +    OneshotRecv(oneshot::error::RecvError), +    UnexpectedStanza(Stanza), +} + +impl From<oneshot::error::RecvError> for Reason { +    fn from(e: oneshot::error::RecvError) -> Reason { +        Self::OneshotRecv(e) +    }  } -impl From<peanuts::Error> for Error { +impl From<peanuts::Error> for Reason {      fn from(e: peanuts::Error) -> Self {          Self::XML(e)      }  } -// impl From<jid::ParseError> for Error { +// impl From<jid::ParseError> for Reason {  //     fn from(e: jid::ParseError) -> Self {  //         Self::JID(e)  //     }  // } -impl From<sqlx::Error> for Error { +impl From<sqlx::Error> for Reason {      fn from(e: sqlx::Error) -> Self {          Self::SQL(e)      }  } -impl From<jabber::Error> for Error { +impl From<jabber::Error> for Reason {      fn from(e: jabber::Error) -> Self {          Self::Jabber(e)      } diff --git a/luz/src/lib.rs b/luz/src/lib.rs index 4d59e61..c14bae6 100644 --- a/luz/src/lib.rs +++ b/luz/src/lib.rs @@ -7,6 +7,7 @@ use std::{  use chat::{Body, Chat, Message};  use connection::{write::WriteMessage, SupervisorSender};  use db::Db; +use error::{ConnectionError, Reason, RosterError, StatusError};  use jabber::JID;  use presence::{Offline, Online, Presence};  use roster::{Contact, ContactUpdate}; @@ -19,6 +20,7 @@ use tokio::{      sync::{mpsc, oneshot, Mutex},      task::JoinSet,  }; +use tracing::info;  use user::User;  use uuid::Uuid; @@ -35,12 +37,13 @@ mod roster;  mod user;  pub struct Luz { +    command_sender: mpsc::Sender<CommandMessage>,      receiver: mpsc::Receiver<CommandMessage>,      jid: Arc<Mutex<JID>>,      // TODO: use a dyn passwordprovider trait to avoid storing password in memory      password: Arc<String>,      connected: Arc<Mutex<Option<(WriteHandle, SupervisorHandle)>>>, -    pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>, +    pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>,      db: Db,      sender: mpsc::Sender<UpdateMessage>,      /// if connection was shut down due to e.g. server shutdown, supervisor must be able to mark client as disconnected @@ -52,6 +55,7 @@ pub struct Luz {  impl Luz {      fn new( +        command_sender: mpsc::Sender<CommandMessage>,          receiver: mpsc::Receiver<CommandMessage>,          jid: Arc<Mutex<JID>>,          password: String, @@ -70,6 +74,7 @@ impl Luz {              tasks: JoinSet::new(),              connection_supervisor_shutdown,              pending_iqs: Arc::new(Mutex::new(HashMap::new())), +            command_sender,          }      } @@ -117,30 +122,127 @@ impl Luz {                                          self.pending_iqs.clone(),                                      );                                      self.connection_supervisor_shutdown = shutdown_recv; -                                    *connection_lock = Some((writer, supervisor)); -                                    self.sender.send(UpdateMessage::Connected(todo!())).await; +                                    // TODO: get roster and send initial presence +                                    let (send, recv) = oneshot::channel(); +                                    CommandMessage::GetRoster(send) +                                        .handle_online( +                                            writer.clone(), +                                            supervisor.sender(), +                                            self.jid.clone(), +                                            self.db.clone(), +                                            self.sender.clone(), +                                            self.pending_iqs.clone(), +                                        ) +                                        .await; +                                    let roster = recv.await; +                                    match roster { +                                        Ok(r) => { +                                            match r { +                                                Ok(roster) => { +                                                    let online = self.db.read_cached_status().await; +                                                    let online = match online { +                                                        Ok(online) => online, +                                                        Err(e) => { +                                                            let _ = self +                                                            .sender +                                                            .send(UpdateMessage::Error( +                                                                Error::Connection( +                                                                    ConnectionError::NoCachedStatus( +                                                                        e.into(), +                                                                    ), +                                                                ), +                                                            )) +                                                            .await; +                                                            Online::default() +                                                        } +                                                    }; +                                                    let (send, recv) = oneshot::channel(); +                                                    CommandMessage::SetStatus(online.clone(), send) +                                                        .handle_online( +                                                            writer.clone(), +                                                            supervisor.sender(), +                                                            self.jid.clone(), +                                                            self.db.clone(), +                                                            self.sender.clone(), +                                                            self.pending_iqs.clone(), +                                                        ) +                                                        .await; +                                                    let set_status = recv.await; +                                                    match set_status { +                                                        Ok(s) => match s { +                                                            Ok(()) => { +                                                                *connection_lock = +                                                                    Some((writer, supervisor)); +                                                                let _ = self +                                                                    .sender +                                                                    .send(UpdateMessage::Online( +                                                                        online, roster, +                                                                    )) +                                                                    .await; +                                                                continue; +                                                            } +                                                            Err(e) => { +                                                                let _ = self +                                                                    .sender +                                                                    .send(UpdateMessage::Error( +                                                                        Error::Connection(e.into()), +                                                                    )) +                                                                    .await; +                                                            } +                                                        }, +                                                        Err(e) => { +                                                            let _ = self.sender.send(UpdateMessage::Error(Error::Connection(ConnectionError::SendPresence(e.into())))).await; +                                                        } +                                                    } +                                                } +                                                Err(e) => { +                                                    let _ = self +                                                        .sender +                                                        .send(UpdateMessage::Error( +                                                            Error::Connection(e.into()), +                                                        )) +                                                        .await; +                                                } +                                            } +                                        } +                                        Err(e) => { +                                            let _ = self +                                                .sender +                                                .send(UpdateMessage::Error(Error::Connection( +                                                    ConnectionError::RosterRetreival(e.into()), +                                                ))) +                                                .await; +                                        } +                                    }                                  }                                  Err(e) => { -                                    self.sender.send(UpdateMessage::Error(e.into())); +                                    let _ = +                                        self.sender.send(UpdateMessage::Error(Error::Connection( +                                            ConnectionError::ConnectionFailed(e.into()), +                                        )));                                  }                              }                          }                      };                  } -                CommandMessage::Disconnect => match self.connected.lock().await.as_mut() { -                    None => { -                        self.sender -                            .send(UpdateMessage::Error(Error::AlreadyDisconnected)) -                            .await; -                    } -                    mut c => { -                        if let Some((_write_handle, supervisor_handle)) = c.take() { -                            let _ = supervisor_handle.send(SupervisorCommand::Disconnect).await; -                        } else { -                            unreachable!() -                        }; +                CommandMessage::Disconnect(_offline) => { +                    match self.connected.lock().await.as_mut() { +                        None => { +                            let _ = self +                                .sender +                                .send(UpdateMessage::Error(Error::AlreadyDisconnected)) +                                .await; +                        } +                        mut c => { +                            // TODO: send unavailable presence +                            if let Some((_write_handle, supervisor_handle)) = c.take() { +                                let _ = supervisor_handle.send(SupervisorCommand::Disconnect).await; +                            } else { +                                unreachable!() +                            }; +                        }                      } -                }, +                }                  _ => {                      match self.connected.lock().await.as_ref() {                          Some((w, s)) => self.tasks.spawn(msg.handle_online( @@ -168,9 +270,41 @@ impl CommandMessage {          mut self,          jid: Arc<Mutex<JID>>,          db: Db, -        sender: mpsc::Sender<UpdateMessage>, +        update_sender: mpsc::Sender<UpdateMessage>,      ) { -        todo!() +        match self { +            CommandMessage::Connect => unreachable!(), +            CommandMessage::Disconnect(offline) => unreachable!(), +            CommandMessage::GetRoster(sender) => { +                let roster = db.read_cached_roster().await; +                match roster { +                    Ok(roster) => { +                        let _ = sender.send(Ok(roster)); +                    } +                    Err(e) => { +                        let _ = sender.send(Err(RosterError(e.into()))); +                    } +                } +            } +            CommandMessage::GetChats(sender) => todo!(), +            CommandMessage::GetChat(jid, sender) => todo!(), +            CommandMessage::GetMessages(jid, sender) => todo!(), +            CommandMessage::DeleteChat(jid, sender) => todo!(), +            CommandMessage::DeleteMessage(uuid, sender) => todo!(), +            CommandMessage::GetUser(jid, sender) => todo!(), +            CommandMessage::AddContact(jid, sender) => todo!(), +            CommandMessage::BuddyRequest(jid, sender) => todo!(), +            CommandMessage::SubscriptionRequest(jid, sender) => todo!(), +            CommandMessage::AcceptBuddyRequest(jid, sender) => todo!(), +            CommandMessage::AcceptSubscriptionRequest(jid, sender) => todo!(), +            CommandMessage::UnsubscribeFromContact(jid, sender) => todo!(), +            CommandMessage::UnsubscribeContact(jid, sender) => todo!(), +            CommandMessage::UnfriendContact(jid, sender) => todo!(), +            CommandMessage::DeleteContact(jid, sender) => todo!(), +            CommandMessage::UpdateContact(jid, contact_update, sender) => todo!(), +            CommandMessage::SetStatus(online, sender) => todo!(), +            CommandMessage::SendMessage(jid, body, sender) => todo!(), +        }      }      pub async fn handle_online( @@ -181,20 +315,25 @@ impl CommandMessage {          jid: Arc<Mutex<JID>>,          db: Db,          sender: mpsc::Sender<UpdateMessage>, -        pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>, +        pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>,      ) {          match self {              CommandMessage::Connect => unreachable!(), -            CommandMessage::Disconnect => unreachable!(), -            CommandMessage::GetRoster => { +            CommandMessage::Disconnect(_) => unreachable!(), +            CommandMessage::GetRoster(result_sender) => {                  // TODO: jid resource should probably be stored within the connection                  let owned_jid: JID;                  {                      owned_jid = jid.lock().await.clone();                  } +                let iq_id = Uuid::new_v4().to_string(); +                let (send, iq_recv) = oneshot::channel(); +                { +                    pending_iqs.lock().await.insert(iq_id.clone(), send); +                }                  let stanza = Stanza::Iq(Iq {                      from: Some(owned_jid), -                    id: "getting-roster".to_string(), +                    id: iq_id.to_string(),                      to: None,                      r#type: IqType::Get,                      lang: None, @@ -211,13 +350,69 @@ impl CommandMessage {                          respond_to: send,                      })                      .await; +                // TODO: timeout                  match recv.await { -                    Ok(Ok(())) => println!("roster request sent"), -                    e => println!("error: {:?}", e), +                    Ok(Ok(())) => info!("roster request sent"), +                    Ok(Err(e)) => { +                        // TODO: log errors if fail to send +                        let _ = result_sender.send(Err(RosterError(e.into()))); +                        return; +                    } +                    Err(e) => { +                        let _ = result_sender.send(Err(RosterError(e.into()))); +                        return; +                    }                  }; +                // TODO: timeout +                match iq_recv.await { +                    Ok(Ok(stanza)) => match stanza { +                        Stanza::Iq(Iq { +                            from, +                            id, +                            to, +                            r#type, +                            lang, +                            query: Some(iq::Query::Roster(stanza::roster::Query { ver, items })), +                            errors, +                        }) if id == iq_id && r#type == IqType::Result => { +                            let contacts: Vec<Contact> = +                                items.into_iter().map(|item| item.into()).collect(); +                            result_sender.send(Ok(contacts)); +                            return; +                        } +                        s => { +                            result_sender.send(Err(RosterError(Reason::UnexpectedStanza(s)))); +                            return; +                        } +                    }, +                    Ok(Err(e)) => { +                        result_sender.send(Err(RosterError(e.into()))); +                        return; +                    } +                    Err(e) => { +                        result_sender.send(Err(RosterError(e.into()))); +                        return; +                    } +                }              } -            CommandMessage::SendMessage { id, to, body } => todo!(), -            _ => todo!(), +            CommandMessage::GetChats(sender) => todo!(), +            CommandMessage::GetChat(jid, sender) => todo!(), +            CommandMessage::GetMessages(jid, sender) => todo!(), +            CommandMessage::DeleteChat(jid, sender) => todo!(), +            CommandMessage::DeleteMessage(uuid, sender) => todo!(), +            CommandMessage::GetUser(jid, sender) => todo!(), +            CommandMessage::AddContact(jid, sender) => todo!(), +            CommandMessage::BuddyRequest(jid, sender) => todo!(), +            CommandMessage::SubscriptionRequest(jid, sender) => todo!(), +            CommandMessage::AcceptBuddyRequest(jid, sender) => todo!(), +            CommandMessage::AcceptSubscriptionRequest(jid, sender) => todo!(), +            CommandMessage::UnsubscribeFromContact(jid, sender) => todo!(), +            CommandMessage::UnsubscribeContact(jid, sender) => todo!(), +            CommandMessage::UnfriendContact(jid, sender) => todo!(), +            CommandMessage::DeleteContact(jid, sender) => todo!(), +            CommandMessage::UpdateContact(jid, contact_update, sender) => todo!(), +            CommandMessage::SetStatus(online, sender) => todo!(), +            CommandMessage::SendMessage(jid, body, sender) => todo!(),          }      }  } @@ -254,6 +449,7 @@ impl LuzHandle {          let (sup_send, sup_recv) = oneshot::channel();          let actor = Luz::new( +            command_sender.clone(),              command_receiver,              Arc::new(Mutex::new(jid)),              password, @@ -280,10 +476,12 @@ pub enum CommandMessage {      /// disconnect from XMPP chat server, sending unavailable presence then closing stream.      Disconnect(Offline),      /// get the roster. if offline, retreive cached version from database. should be stored in application memory -    GetRoster(oneshot::Sender<Result<Vec<Contact>, Error>>), +    GetRoster(oneshot::Sender<Result<Vec<Contact>, RosterError>>),      /// get all chats. chat will include 10 messages in their message Vec (enough for chat previews)      // TODO: paging and filtering      GetChats(oneshot::Sender<Result<Vec<Chat>, Error>>), +    /// get a specific chat by jid +    GetChat(JID, oneshot::Sender<Result<Chat, Error>>),      /// get message history for chat (does appropriate mam things)      // TODO: paging and filtering      GetMessages(JID, oneshot::Sender<Result<Vec<Message>, Error>>), @@ -315,7 +513,7 @@ pub enum CommandMessage {      /// update contact      UpdateContact(JID, ContactUpdate, oneshot::Sender<Result<(), Error>>),      /// set online status. if disconnected, will be cached so when client connects, will be sent as the initial presence. -    SetStatusMessage(Option<String>, oneshot::Sender<Result<(), Error>>), +    SetStatus(Online, oneshot::Sender<Result<(), StatusError>>),      /// send a directed presence (usually to a non-contact).      // TODO: should probably make it so people can add non-contact auto presence sharing in the client (most likely through setting an internal setting)      /// send a message to a jid (any kind of jid that can receive a message, e.g. a user or a @@ -326,9 +524,10 @@ pub enum CommandMessage {  #[derive(Debug)]  pub enum UpdateMessage {      Error(Error), -    Online(Online), +    Online(Online, Vec<Contact>),      Offline(Offline),      /// received roster from jabber server (replace full app roster state with this) +    /// is this needed?      FullRoster(Vec<Contact>),      /// (only update app roster state, don't replace)      RosterUpdate(Contact), diff --git a/luz/src/main.rs b/luz/src/main.rs index 7b3815f..5e9cd13 100644 --- a/luz/src/main.rs +++ b/luz/src/main.rs @@ -18,6 +18,5 @@ async fn main() {      });      luz.send(CommandMessage::Connect).await.unwrap(); -    luz.send(CommandMessage::GetRoster).await.unwrap();      tokio::time::sleep(Duration::from_secs(15)).await;  } diff --git a/luz/src/presence.rs b/luz/src/presence.rs index b7ebe1d..fac1bb4 100644 --- a/luz/src/presence.rs +++ b/luz/src/presence.rs @@ -1,12 +1,57 @@ -use stanza::client::presence::Show; +use sqlx::Sqlite; -#[derive(Debug, Default)] +#[derive(Debug, Default, sqlx::FromRow, Clone)]  pub struct Online { -    show: Option<Show>, -    status: Option<String>, +    pub show: Option<Show>, +    pub status: Option<String>, +    #[sqlx(skip)]      priority: Option<i8>,  } +#[derive(Debug, Clone, Copy)] +pub enum Show { +    Away, +    Chat, +    DoNotDisturb, +    ExtendedAway, +} + +impl sqlx::Type<Sqlite> for Show { +    fn type_info() -> <Sqlite as sqlx::Database>::TypeInfo { +        <&str as sqlx::Type<Sqlite>>::type_info() +    } +} + +impl sqlx::Decode<'_, Sqlite> for Show { +    fn decode( +        value: <Sqlite as sqlx::Database>::ValueRef<'_>, +    ) -> Result<Self, sqlx::error::BoxDynError> { +        let value = <&str as sqlx::Decode<Sqlite>>::decode(value)?; +        match value { +            "away" => Ok(Self::Away), +            "chat" => Ok(Self::Chat), +            "do-not-disturb" => Ok(Self::DoNotDisturb), +            "extended-away" => Ok(Self::ExtendedAway), +            _ => unreachable!(), +        } +    } +} + +impl sqlx::Encode<'_, Sqlite> for Show { +    fn encode_by_ref( +        &self, +        buf: &mut <Sqlite as sqlx::Database>::ArgumentBuffer<'_>, +    ) -> Result<sqlx::encode::IsNull, sqlx::error::BoxDynError> { +        let value = match self { +            Show::Away => "away", +            Show::Chat => "chat", +            Show::DoNotDisturb => "do-not-disturb", +            Show::ExtendedAway => "extended-away", +        }; +        <&str as sqlx::Encode<Sqlite>>::encode(value, buf) +    } +} +  #[derive(Debug, Default)]  pub struct Offline {      status: Option<String>, diff --git a/luz/src/roster.rs b/luz/src/roster.rs index 2f50eb6..2e3de7e 100644 --- a/luz/src/roster.rs +++ b/luz/src/roster.rs @@ -24,7 +24,7 @@ pub struct Contact {  }  #[derive(Debug)] -enum Subscription { +pub enum Subscription {      None,      PendingOut,      PendingIn, @@ -80,3 +80,46 @@ impl sqlx::Encode<'_, Sqlite> for Subscription {          <&str as sqlx::Encode<Sqlite>>::encode(value, buf)      }  } + +// none +// > +// >> +// < +// << +// >< +// >>< +// ><< +// >><< + +impl From<stanza::roster::Item> for Contact { +    fn from(value: stanza::roster::Item) -> Self { +        let subscription = match value.ask { +            true => match value.subscription { +                Some(s) => match s { +                    stanza::roster::Subscription::Both => Subscription::Buddy, +                    stanza::roster::Subscription::From => Subscription::InPendingOut, +                    stanza::roster::Subscription::None => Subscription::PendingOut, +                    stanza::roster::Subscription::Remove => Subscription::PendingOut, +                    stanza::roster::Subscription::To => Subscription::OnlyOut, +                }, +                None => Subscription::PendingOut, +            }, +            false => match value.subscription { +                Some(s) => match s { +                    stanza::roster::Subscription::Both => Subscription::Buddy, +                    stanza::roster::Subscription::From => Subscription::OnlyIn, +                    stanza::roster::Subscription::None => Subscription::None, +                    stanza::roster::Subscription::Remove => Subscription::None, +                    stanza::roster::Subscription::To => Subscription::OnlyOut, +                }, +                None => Subscription::None, +            }, +        }; +        Contact { +            user_jid: value.jid, +            subscription, +            name: value.name, +            groups: HashSet::from_iter(value.groups.into_iter().filter_map(|group| group.0)), +        } +    } +} diff --git a/stanza/src/roster.rs b/stanza/src/roster.rs index ec83403..0181193 100644 --- a/stanza/src/roster.rs +++ b/stanza/src/roster.rs @@ -37,16 +37,16 @@ impl IntoElement for Query {  #[derive(Clone, Debug)]  pub struct Item {      /// signals subscription pre-approval (server only) -    approved: Option<bool>, +    pub approved: Option<bool>,      /// signals subscription sub-states (server only) -    ask: bool, +    pub ask: bool,      /// uniquely identifies item -    jid: JID, +    pub jid: JID,      /// handle that is determined by user, not contact -    name: Option<String>, +    pub name: Option<String>,      /// state of the presence subscription -    subscription: Option<Subscription>, -    groups: Vec<Group>, +    pub subscription: Option<Subscription>, +    pub groups: Vec<Group>,  }  impl FromElement for Item { @@ -140,7 +140,7 @@ impl ToString for Subscription {  #[derive(Clone, Debug)]  // TODO: check if should be option or not -pub struct Group(Option<String>); +pub struct Group(pub Option<String>);  impl FromElement for Group {      fn from_element(mut element: peanuts::Element) -> peanuts::element::DeserializeResult<Self> { | 
