diff options
author | 2025-02-18 06:14:43 +0000 | |
---|---|---|
committer | 2025-02-18 06:14:43 +0000 | |
commit | 5dd488550f9959914d16bde9269284ebd043e0e6 (patch) | |
tree | 3201c93bb0a457526f62139501e697287b2db9a1 | |
parent | 68a7d136705133dc5d3a5d43b9ff4da28eeb6d5b (diff) | |
download | luz-5dd488550f9959914d16bde9269284ebd043e0e6.tar.gz luz-5dd488550f9959914d16bde9269284ebd043e0e6.tar.bz2 luz-5dd488550f9959914d16bde9269284ebd043e0e6.zip |
WIP: roster retrieval
-rw-r--r-- | luz/migrations/20240113011930_luz.sql | 19 | ||||
-rw-r--r-- | luz/src/connection/mod.rs | 33 | ||||
-rw-r--r-- | luz/src/connection/read.rs | 26 | ||||
-rw-r--r-- | luz/src/connection/write.rs | 8 | ||||
-rw-r--r-- | luz/src/db/mod.rs | 26 | ||||
-rw-r--r-- | luz/src/error.rs | 57 | ||||
-rw-r--r-- | luz/src/lib.rs | 259 | ||||
-rw-r--r-- | luz/src/main.rs | 1 | ||||
-rw-r--r-- | luz/src/presence.rs | 53 | ||||
-rw-r--r-- | luz/src/roster.rs | 45 | ||||
-rw-r--r-- | stanza/src/roster.rs | 14 |
11 files changed, 466 insertions, 75 deletions
diff --git a/luz/migrations/20240113011930_luz.sql b/luz/migrations/20240113011930_luz.sql index 3b18208..082cc4b 100644 --- a/luz/migrations/20240113011930_luz.sql +++ b/luz/migrations/20240113011930_luz.sql @@ -6,6 +6,7 @@ create table users( jid text primary key not null, -- can receive presence status from non-contacts cached_status_message text + -- TODO: last_seen ); -- enum for subscription state @@ -55,6 +56,8 @@ create table messages ( -- check ((chat_id == null) <> (channel_id == null)), -- check ((chat_id == null) or (channel_id == null)), -- user is the current "owner" of the message + -- TODO: queued messages offline + -- TODO: timestamp -- TODO: icky -- the user to show it coming from (not necessarily the original sender) @@ -68,3 +71,19 @@ create table messages ( foreign key(from_jid) references users(jid), foreign key(originally_from) references users(jid) ); + +-- enum for subscription state +create table show ( + state text primary key not null +); + +insert into show ( state ) values ('away'), ('chat'), ('do-not-disturb'), ('extended-away'); + +create table cached_status ( + id integer primary key not null, + show text, + message text, + foreign key(show) references show(state) +); + +insert into cached_status (id) values (0); diff --git a/luz/src/connection/mod.rs b/luz/src/connection/mod.rs index 8ff433b..fda2b90 100644 --- a/luz/src/connection/mod.rs +++ b/luz/src/connection/mod.rs @@ -17,7 +17,11 @@ use tokio::{ }; use write::{WriteControl, WriteControlHandle, WriteHandle, WriteMessage}; -use crate::{db::Db, error::Error, UpdateMessage}; +use crate::{ + db::Db, + error::{Error, Reason}, + UpdateMessage, +}; mod read; pub(crate) mod write; @@ -31,7 +35,7 @@ pub struct Supervisor { tokio::task::JoinSet<()>, mpsc::Sender<SupervisorCommand>, WriteHandle, - Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>, + Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>, )>, sender: mpsc::Sender<UpdateMessage>, writer_handle: WriteControlHandle, @@ -57,7 +61,7 @@ pub enum State { tokio::task::JoinSet<()>, mpsc::Sender<SupervisorCommand>, WriteHandle, - Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>, + Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>, ), ), } @@ -72,7 +76,7 @@ impl Supervisor { JoinSet<()>, mpsc::Sender<SupervisorCommand>, WriteHandle, - Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>, + Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>, )>, sender: mpsc::Sender<UpdateMessage>, writer_handle: WriteControlHandle, @@ -172,9 +176,10 @@ impl Supervisor { // if reconnection failure, respond to all current write messages with lost connection error. the received processes should complete themselves. write_state.close(); while let Some(msg) = write_state.recv().await { - let _ = msg.respond_to.send(Err(Error::LostConnection)); + let _ = msg.respond_to.send(Err(Reason::LostConnection)); } - let _ = self.sender.send(UpdateMessage::Error(e.into())).await; + // TODO: is this the correct error? + let _ = self.sender.send(UpdateMessage::Error(Error::LostConnection)).await; break; }, } @@ -218,11 +223,12 @@ impl Supervisor { Err(e) => { // if reconnection failure, respond to all current write messages with lost connection error. the received processes should complete themselves. write_recv.close(); - let _ = write_msg.respond_to.send(Err(Error::LostConnection)); + let _ = write_msg.respond_to.send(Err(Reason::LostConnection)); while let Some(msg) = write_recv.recv().await { - let _ = msg.respond_to.send(Err(Error::LostConnection)); + let _ = msg.respond_to.send(Err(Reason::LostConnection)); } - let _ = self.sender.send(UpdateMessage::Error(e.into())).await; + // TODO: is this the correct error to send? + let _ = self.sender.send(UpdateMessage::Error(Error::LostConnection)).await; break; }, } @@ -268,12 +274,13 @@ impl Supervisor { // if reconnection failure, respond to all current messages with lost connection error. write_receiver.close(); if let Some(msg) = retry_msg { - msg.respond_to.send(Err(Error::LostConnection)); + msg.respond_to.send(Err(Reason::LostConnection)); } while let Some(msg) = write_receiver.recv().await { - msg.respond_to.send(Err(Error::LostConnection)); + msg.respond_to.send(Err(Reason::LostConnection)); } - let _ = self.sender.send(UpdateMessage::Error(e.into())).await; + // TODO: is this the correct error? + let _ = self.sender.send(UpdateMessage::Error(Error::LostConnection)).await; break; }, } @@ -331,7 +338,7 @@ impl SupervisorHandle { on_shutdown: oneshot::Sender<()>, jid: Arc<Mutex<JID>>, password: Arc<String>, - pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>, + pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>, ) -> (WriteHandle, Self) { let (command_sender, command_receiver) = mpsc::channel(20); let (writer_error_sender, writer_error_receiver) = oneshot::channel(); diff --git a/luz/src/connection/read.rs b/luz/src/connection/read.rs index 692952b..8f8c4a0 100644 --- a/luz/src/connection/read.rs +++ b/luz/src/connection/read.rs @@ -13,7 +13,11 @@ use tokio::{ }; use tracing::info; -use crate::{db::Db, error::Error, UpdateMessage}; +use crate::{ + db::Db, + error::{Error, Reason}, + UpdateMessage, +}; use super::{ write::{WriteHandle, WriteMessage}, @@ -29,7 +33,7 @@ pub struct Read { JoinSet<()>, mpsc::Sender<SupervisorCommand>, WriteHandle, - Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>, + Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>, )>, db: Db, update_sender: mpsc::Sender<UpdateMessage>, @@ -39,7 +43,7 @@ pub struct Read { disconnecting: bool, disconnect_timedout: oneshot::Receiver<()>, // TODO: use proper stanza ids - pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>, + pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>, } impl Read { @@ -52,7 +56,7 @@ impl Read { JoinSet<()>, mpsc::Sender<SupervisorCommand>, WriteHandle, - Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>, + Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>, )>, db: Db, update_sender: mpsc::Sender<UpdateMessage>, @@ -60,7 +64,7 @@ impl Read { supervisor_control: mpsc::Sender<SupervisorCommand>, write_handle: WriteHandle, tasks: JoinSet<()>, - pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>, + pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>, ) -> Self { let (send, recv) = oneshot::channel(); Self { @@ -153,7 +157,7 @@ impl Read { // when it aborts, must clear iq map no matter what let mut iqs = self.pending_iqs.lock().await; for (_id, sender) in iqs.drain() { - let _ = sender.send(Err(Error::LostConnection)); + let _ = sender.send(Err(Reason::LostConnection)); } } } @@ -182,7 +186,7 @@ pub enum ReadControl { JoinSet<()>, mpsc::Sender<SupervisorCommand>, WriteHandle, - Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>, + Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>, )>, ), } @@ -215,13 +219,13 @@ impl ReadControlHandle { JoinSet<()>, mpsc::Sender<SupervisorCommand>, WriteHandle, - Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>, + Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>, )>, db: Db, sender: mpsc::Sender<UpdateMessage>, supervisor_control: mpsc::Sender<SupervisorCommand>, jabber_write: WriteHandle, - pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>, + pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>, ) -> Self { let (control_sender, control_receiver) = mpsc::channel(20); @@ -252,14 +256,14 @@ impl ReadControlHandle { JoinSet<()>, mpsc::Sender<SupervisorCommand>, WriteHandle, - Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>, + Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>, )>, db: Db, sender: mpsc::Sender<UpdateMessage>, supervisor_control: mpsc::Sender<SupervisorCommand>, jabber_write: WriteHandle, tasks: JoinSet<()>, - pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>, + pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>, ) -> Self { let (control_sender, control_receiver) = mpsc::channel(20); diff --git a/luz/src/connection/write.rs b/luz/src/connection/write.rs index 18dba5c..70584a2 100644 --- a/luz/src/connection/write.rs +++ b/luz/src/connection/write.rs @@ -7,7 +7,7 @@ use tokio::{ task::JoinHandle, }; -use crate::error::Error; +use crate::error::{Error, Reason}; // actor that receives jabber stanzas to write, and if there is an error, sends a message back to the supervisor then aborts, so the supervisor can spawn a new stream. pub struct Write { @@ -19,7 +19,7 @@ pub struct Write { pub struct WriteMessage { pub stanza: Stanza, - pub respond_to: oneshot::Sender<Result<(), Error>>, + pub respond_to: oneshot::Sender<Result<(), Reason>>, } pub enum WriteControl { @@ -84,9 +84,9 @@ impl Write { Err(e) => match &e { peanuts::Error::ReadError(_error) => { // if connection lost during disconnection, just send lost connection error to the write requests - let _ = msg.respond_to.send(Err(Error::LostConnection)); + let _ = msg.respond_to.send(Err(Reason::LostConnection)); while let Some(msg) = self.stanza_receiver.recv().await { - let _ = msg.respond_to.send(Err(Error::LostConnection)); + let _ = msg.respond_to.send(Err(Reason::LostConnection)); } break; } diff --git a/luz/src/db/mod.rs b/luz/src/db/mod.rs index f598fbb..7557f70 100644 --- a/luz/src/db/mod.rs +++ b/luz/src/db/mod.rs @@ -6,6 +6,7 @@ use uuid::Uuid; use crate::{ chat::{Chat, Message}, + presence::Online, roster::Contact, user::User, }; @@ -315,4 +316,29 @@ impl Db { .await?; Ok(messages) } + + pub async fn read_cached_status(&self) -> Result<Online, Error> { + let online: Online = sqlx::query_as("select * from cached_status where id = 0") + .fetch_one(&self.db) + .await?; + Ok(online) + } + + pub async fn upsert_cached_status(&self, status: Online) -> Result<(), Error> { + sqlx::query!( + "insert into cached_status (id, show, message) values (0, ?, ?) on conflict do update set show = ?, message = ?", + status.show, + status.status, + status.show, + status.status + ).execute(&self.db).await?; + Ok(()) + } + + pub async fn delete_cached_status(&self) -> Result<(), Error> { + sqlx::query!("update cached_status set show = null, message = null where id = 0") + .execute(&self.db) + .await?; + Ok(()) + } } diff --git a/luz/src/error.rs b/luz/src/error.rs index 16e1c6e..b9a6487 100644 --- a/luz/src/error.rs +++ b/luz/src/error.rs @@ -1,6 +1,11 @@ +use stanza::client::Stanza; +use tokio::sync::oneshot::{self}; + #[derive(Debug)] pub enum Error { AlreadyConnected, + // TODO: change to Connecting(ConnectingError) + Connection(ConnectionError), Presence(Reason), Roster(Reason), SendMessage(Reason), @@ -9,6 +14,42 @@ pub enum Error { } #[derive(Debug)] +pub enum ConnectionError { + ConnectionFailed(Reason), + RosterRetreival(Reason), + SendPresence(Reason), + NoCachedStatus(Reason), +} + +pub struct RosterError(pub Reason); + +impl From<RosterError> for Error { + fn from(e: RosterError) -> Self { + Self::Roster(e.0) + } +} + +impl From<RosterError> for ConnectionError { + fn from(e: RosterError) -> Self { + Self::RosterRetreival(e.0) + } +} + +pub struct StatusError(Reason); + +impl From<StatusError> for Error { + fn from(e: StatusError) -> Self { + Error::Presence(e.0) + } +} + +impl From<StatusError> for ConnectionError { + fn from(e: StatusError) -> Self { + Self::SendPresence(e.0) + } +} + +#[derive(Debug)] pub enum Reason { // TODO: organisastion of error into internal error thing Timeout, @@ -19,27 +60,35 @@ pub enum Reason { SQL(sqlx::Error), // JID(jid::ParseError), LostConnection, + OneshotRecv(oneshot::error::RecvError), + UnexpectedStanza(Stanza), +} + +impl From<oneshot::error::RecvError> for Reason { + fn from(e: oneshot::error::RecvError) -> Reason { + Self::OneshotRecv(e) + } } -impl From<peanuts::Error> for Error { +impl From<peanuts::Error> for Reason { fn from(e: peanuts::Error) -> Self { Self::XML(e) } } -// impl From<jid::ParseError> for Error { +// impl From<jid::ParseError> for Reason { // fn from(e: jid::ParseError) -> Self { // Self::JID(e) // } // } -impl From<sqlx::Error> for Error { +impl From<sqlx::Error> for Reason { fn from(e: sqlx::Error) -> Self { Self::SQL(e) } } -impl From<jabber::Error> for Error { +impl From<jabber::Error> for Reason { fn from(e: jabber::Error) -> Self { Self::Jabber(e) } diff --git a/luz/src/lib.rs b/luz/src/lib.rs index 4d59e61..c14bae6 100644 --- a/luz/src/lib.rs +++ b/luz/src/lib.rs @@ -7,6 +7,7 @@ use std::{ use chat::{Body, Chat, Message}; use connection::{write::WriteMessage, SupervisorSender}; use db::Db; +use error::{ConnectionError, Reason, RosterError, StatusError}; use jabber::JID; use presence::{Offline, Online, Presence}; use roster::{Contact, ContactUpdate}; @@ -19,6 +20,7 @@ use tokio::{ sync::{mpsc, oneshot, Mutex}, task::JoinSet, }; +use tracing::info; use user::User; use uuid::Uuid; @@ -35,12 +37,13 @@ mod roster; mod user; pub struct Luz { + command_sender: mpsc::Sender<CommandMessage>, receiver: mpsc::Receiver<CommandMessage>, jid: Arc<Mutex<JID>>, // TODO: use a dyn passwordprovider trait to avoid storing password in memory password: Arc<String>, connected: Arc<Mutex<Option<(WriteHandle, SupervisorHandle)>>>, - pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>, + pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>, db: Db, sender: mpsc::Sender<UpdateMessage>, /// if connection was shut down due to e.g. server shutdown, supervisor must be able to mark client as disconnected @@ -52,6 +55,7 @@ pub struct Luz { impl Luz { fn new( + command_sender: mpsc::Sender<CommandMessage>, receiver: mpsc::Receiver<CommandMessage>, jid: Arc<Mutex<JID>>, password: String, @@ -70,6 +74,7 @@ impl Luz { tasks: JoinSet::new(), connection_supervisor_shutdown, pending_iqs: Arc::new(Mutex::new(HashMap::new())), + command_sender, } } @@ -117,30 +122,127 @@ impl Luz { self.pending_iqs.clone(), ); self.connection_supervisor_shutdown = shutdown_recv; - *connection_lock = Some((writer, supervisor)); - self.sender.send(UpdateMessage::Connected(todo!())).await; + // TODO: get roster and send initial presence + let (send, recv) = oneshot::channel(); + CommandMessage::GetRoster(send) + .handle_online( + writer.clone(), + supervisor.sender(), + self.jid.clone(), + self.db.clone(), + self.sender.clone(), + self.pending_iqs.clone(), + ) + .await; + let roster = recv.await; + match roster { + Ok(r) => { + match r { + Ok(roster) => { + let online = self.db.read_cached_status().await; + let online = match online { + Ok(online) => online, + Err(e) => { + let _ = self + .sender + .send(UpdateMessage::Error( + Error::Connection( + ConnectionError::NoCachedStatus( + e.into(), + ), + ), + )) + .await; + Online::default() + } + }; + let (send, recv) = oneshot::channel(); + CommandMessage::SetStatus(online.clone(), send) + .handle_online( + writer.clone(), + supervisor.sender(), + self.jid.clone(), + self.db.clone(), + self.sender.clone(), + self.pending_iqs.clone(), + ) + .await; + let set_status = recv.await; + match set_status { + Ok(s) => match s { + Ok(()) => { + *connection_lock = + Some((writer, supervisor)); + let _ = self + .sender + .send(UpdateMessage::Online( + online, roster, + )) + .await; + continue; + } + Err(e) => { + let _ = self + .sender + .send(UpdateMessage::Error( + Error::Connection(e.into()), + )) + .await; + } + }, + Err(e) => { + let _ = self.sender.send(UpdateMessage::Error(Error::Connection(ConnectionError::SendPresence(e.into())))).await; + } + } + } + Err(e) => { + let _ = self + .sender + .send(UpdateMessage::Error( + Error::Connection(e.into()), + )) + .await; + } + } + } + Err(e) => { + let _ = self + .sender + .send(UpdateMessage::Error(Error::Connection( + ConnectionError::RosterRetreival(e.into()), + ))) + .await; + } + } } Err(e) => { - self.sender.send(UpdateMessage::Error(e.into())); + let _ = + self.sender.send(UpdateMessage::Error(Error::Connection( + ConnectionError::ConnectionFailed(e.into()), + ))); } } } }; } - CommandMessage::Disconnect => match self.connected.lock().await.as_mut() { - None => { - self.sender - .send(UpdateMessage::Error(Error::AlreadyDisconnected)) - .await; - } - mut c => { - if let Some((_write_handle, supervisor_handle)) = c.take() { - let _ = supervisor_handle.send(SupervisorCommand::Disconnect).await; - } else { - unreachable!() - }; + CommandMessage::Disconnect(_offline) => { + match self.connected.lock().await.as_mut() { + None => { + let _ = self + .sender + .send(UpdateMessage::Error(Error::AlreadyDisconnected)) + .await; + } + mut c => { + // TODO: send unavailable presence + if let Some((_write_handle, supervisor_handle)) = c.take() { + let _ = supervisor_handle.send(SupervisorCommand::Disconnect).await; + } else { + unreachable!() + }; + } } - }, + } _ => { match self.connected.lock().await.as_ref() { Some((w, s)) => self.tasks.spawn(msg.handle_online( @@ -168,9 +270,41 @@ impl CommandMessage { mut self, jid: Arc<Mutex<JID>>, db: Db, - sender: mpsc::Sender<UpdateMessage>, + update_sender: mpsc::Sender<UpdateMessage>, ) { - todo!() + match self { + CommandMessage::Connect => unreachable!(), + CommandMessage::Disconnect(offline) => unreachable!(), + CommandMessage::GetRoster(sender) => { + let roster = db.read_cached_roster().await; + match roster { + Ok(roster) => { + let _ = sender.send(Ok(roster)); + } + Err(e) => { + let _ = sender.send(Err(RosterError(e.into()))); + } + } + } + CommandMessage::GetChats(sender) => todo!(), + CommandMessage::GetChat(jid, sender) => todo!(), + CommandMessage::GetMessages(jid, sender) => todo!(), + CommandMessage::DeleteChat(jid, sender) => todo!(), + CommandMessage::DeleteMessage(uuid, sender) => todo!(), + CommandMessage::GetUser(jid, sender) => todo!(), + CommandMessage::AddContact(jid, sender) => todo!(), + CommandMessage::BuddyRequest(jid, sender) => todo!(), + CommandMessage::SubscriptionRequest(jid, sender) => todo!(), + CommandMessage::AcceptBuddyRequest(jid, sender) => todo!(), + CommandMessage::AcceptSubscriptionRequest(jid, sender) => todo!(), + CommandMessage::UnsubscribeFromContact(jid, sender) => todo!(), + CommandMessage::UnsubscribeContact(jid, sender) => todo!(), + CommandMessage::UnfriendContact(jid, sender) => todo!(), + CommandMessage::DeleteContact(jid, sender) => todo!(), + CommandMessage::UpdateContact(jid, contact_update, sender) => todo!(), + CommandMessage::SetStatus(online, sender) => todo!(), + CommandMessage::SendMessage(jid, body, sender) => todo!(), + } } pub async fn handle_online( @@ -181,20 +315,25 @@ impl CommandMessage { jid: Arc<Mutex<JID>>, db: Db, sender: mpsc::Sender<UpdateMessage>, - pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>, + pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>, ) { match self { CommandMessage::Connect => unreachable!(), - CommandMessage::Disconnect => unreachable!(), - CommandMessage::GetRoster => { + CommandMessage::Disconnect(_) => unreachable!(), + CommandMessage::GetRoster(result_sender) => { // TODO: jid resource should probably be stored within the connection let owned_jid: JID; { owned_jid = jid.lock().await.clone(); } + let iq_id = Uuid::new_v4().to_string(); + let (send, iq_recv) = oneshot::channel(); + { + pending_iqs.lock().await.insert(iq_id.clone(), send); + } let stanza = Stanza::Iq(Iq { from: Some(owned_jid), - id: "getting-roster".to_string(), + id: iq_id.to_string(), to: None, r#type: IqType::Get, lang: None, @@ -211,13 +350,69 @@ impl CommandMessage { respond_to: send, }) .await; + // TODO: timeout match recv.await { - Ok(Ok(())) => println!("roster request sent"), - e => println!("error: {:?}", e), + Ok(Ok(())) => info!("roster request sent"), + Ok(Err(e)) => { + // TODO: log errors if fail to send + let _ = result_sender.send(Err(RosterError(e.into()))); + return; + } + Err(e) => { + let _ = result_sender.send(Err(RosterError(e.into()))); + return; + } }; + // TODO: timeout + match iq_recv.await { + Ok(Ok(stanza)) => match stanza { + Stanza::Iq(Iq { + from, + id, + to, + r#type, + lang, + query: Some(iq::Query::Roster(stanza::roster::Query { ver, items })), + errors, + }) if id == iq_id && r#type == IqType::Result => { + let contacts: Vec<Contact> = + items.into_iter().map(|item| item.into()).collect(); + result_sender.send(Ok(contacts)); + return; + } + s => { + result_sender.send(Err(RosterError(Reason::UnexpectedStanza(s)))); + return; + } + }, + Ok(Err(e)) => { + result_sender.send(Err(RosterError(e.into()))); + return; + } + Err(e) => { + result_sender.send(Err(RosterError(e.into()))); + return; + } + } } - CommandMessage::SendMessage { id, to, body } => todo!(), - _ => todo!(), + CommandMessage::GetChats(sender) => todo!(), + CommandMessage::GetChat(jid, sender) => todo!(), + CommandMessage::GetMessages(jid, sender) => todo!(), + CommandMessage::DeleteChat(jid, sender) => todo!(), + CommandMessage::DeleteMessage(uuid, sender) => todo!(), + CommandMessage::GetUser(jid, sender) => todo!(), + CommandMessage::AddContact(jid, sender) => todo!(), + CommandMessage::BuddyRequest(jid, sender) => todo!(), + CommandMessage::SubscriptionRequest(jid, sender) => todo!(), + CommandMessage::AcceptBuddyRequest(jid, sender) => todo!(), + CommandMessage::AcceptSubscriptionRequest(jid, sender) => todo!(), + CommandMessage::UnsubscribeFromContact(jid, sender) => todo!(), + CommandMessage::UnsubscribeContact(jid, sender) => todo!(), + CommandMessage::UnfriendContact(jid, sender) => todo!(), + CommandMessage::DeleteContact(jid, sender) => todo!(), + CommandMessage::UpdateContact(jid, contact_update, sender) => todo!(), + CommandMessage::SetStatus(online, sender) => todo!(), + CommandMessage::SendMessage(jid, body, sender) => todo!(), } } } @@ -254,6 +449,7 @@ impl LuzHandle { let (sup_send, sup_recv) = oneshot::channel(); let actor = Luz::new( + command_sender.clone(), command_receiver, Arc::new(Mutex::new(jid)), password, @@ -280,10 +476,12 @@ pub enum CommandMessage { /// disconnect from XMPP chat server, sending unavailable presence then closing stream. Disconnect(Offline), /// get the roster. if offline, retreive cached version from database. should be stored in application memory - GetRoster(oneshot::Sender<Result<Vec<Contact>, Error>>), + GetRoster(oneshot::Sender<Result<Vec<Contact>, RosterError>>), /// get all chats. chat will include 10 messages in their message Vec (enough for chat previews) // TODO: paging and filtering GetChats(oneshot::Sender<Result<Vec<Chat>, Error>>), + /// get a specific chat by jid + GetChat(JID, oneshot::Sender<Result<Chat, Error>>), /// get message history for chat (does appropriate mam things) // TODO: paging and filtering GetMessages(JID, oneshot::Sender<Result<Vec<Message>, Error>>), @@ -315,7 +513,7 @@ pub enum CommandMessage { /// update contact UpdateContact(JID, ContactUpdate, oneshot::Sender<Result<(), Error>>), /// set online status. if disconnected, will be cached so when client connects, will be sent as the initial presence. - SetStatusMessage(Option<String>, oneshot::Sender<Result<(), Error>>), + SetStatus(Online, oneshot::Sender<Result<(), StatusError>>), /// send a directed presence (usually to a non-contact). // TODO: should probably make it so people can add non-contact auto presence sharing in the client (most likely through setting an internal setting) /// send a message to a jid (any kind of jid that can receive a message, e.g. a user or a @@ -326,9 +524,10 @@ pub enum CommandMessage { #[derive(Debug)] pub enum UpdateMessage { Error(Error), - Online(Online), + Online(Online, Vec<Contact>), Offline(Offline), /// received roster from jabber server (replace full app roster state with this) + /// is this needed? FullRoster(Vec<Contact>), /// (only update app roster state, don't replace) RosterUpdate(Contact), diff --git a/luz/src/main.rs b/luz/src/main.rs index 7b3815f..5e9cd13 100644 --- a/luz/src/main.rs +++ b/luz/src/main.rs @@ -18,6 +18,5 @@ async fn main() { }); luz.send(CommandMessage::Connect).await.unwrap(); - luz.send(CommandMessage::GetRoster).await.unwrap(); tokio::time::sleep(Duration::from_secs(15)).await; } diff --git a/luz/src/presence.rs b/luz/src/presence.rs index b7ebe1d..fac1bb4 100644 --- a/luz/src/presence.rs +++ b/luz/src/presence.rs @@ -1,12 +1,57 @@ -use stanza::client::presence::Show; +use sqlx::Sqlite; -#[derive(Debug, Default)] +#[derive(Debug, Default, sqlx::FromRow, Clone)] pub struct Online { - show: Option<Show>, - status: Option<String>, + pub show: Option<Show>, + pub status: Option<String>, + #[sqlx(skip)] priority: Option<i8>, } +#[derive(Debug, Clone, Copy)] +pub enum Show { + Away, + Chat, + DoNotDisturb, + ExtendedAway, +} + +impl sqlx::Type<Sqlite> for Show { + fn type_info() -> <Sqlite as sqlx::Database>::TypeInfo { + <&str as sqlx::Type<Sqlite>>::type_info() + } +} + +impl sqlx::Decode<'_, Sqlite> for Show { + fn decode( + value: <Sqlite as sqlx::Database>::ValueRef<'_>, + ) -> Result<Self, sqlx::error::BoxDynError> { + let value = <&str as sqlx::Decode<Sqlite>>::decode(value)?; + match value { + "away" => Ok(Self::Away), + "chat" => Ok(Self::Chat), + "do-not-disturb" => Ok(Self::DoNotDisturb), + "extended-away" => Ok(Self::ExtendedAway), + _ => unreachable!(), + } + } +} + +impl sqlx::Encode<'_, Sqlite> for Show { + fn encode_by_ref( + &self, + buf: &mut <Sqlite as sqlx::Database>::ArgumentBuffer<'_>, + ) -> Result<sqlx::encode::IsNull, sqlx::error::BoxDynError> { + let value = match self { + Show::Away => "away", + Show::Chat => "chat", + Show::DoNotDisturb => "do-not-disturb", + Show::ExtendedAway => "extended-away", + }; + <&str as sqlx::Encode<Sqlite>>::encode(value, buf) + } +} + #[derive(Debug, Default)] pub struct Offline { status: Option<String>, diff --git a/luz/src/roster.rs b/luz/src/roster.rs index 2f50eb6..2e3de7e 100644 --- a/luz/src/roster.rs +++ b/luz/src/roster.rs @@ -24,7 +24,7 @@ pub struct Contact { } #[derive(Debug)] -enum Subscription { +pub enum Subscription { None, PendingOut, PendingIn, @@ -80,3 +80,46 @@ impl sqlx::Encode<'_, Sqlite> for Subscription { <&str as sqlx::Encode<Sqlite>>::encode(value, buf) } } + +// none +// > +// >> +// < +// << +// >< +// >>< +// ><< +// >><< + +impl From<stanza::roster::Item> for Contact { + fn from(value: stanza::roster::Item) -> Self { + let subscription = match value.ask { + true => match value.subscription { + Some(s) => match s { + stanza::roster::Subscription::Both => Subscription::Buddy, + stanza::roster::Subscription::From => Subscription::InPendingOut, + stanza::roster::Subscription::None => Subscription::PendingOut, + stanza::roster::Subscription::Remove => Subscription::PendingOut, + stanza::roster::Subscription::To => Subscription::OnlyOut, + }, + None => Subscription::PendingOut, + }, + false => match value.subscription { + Some(s) => match s { + stanza::roster::Subscription::Both => Subscription::Buddy, + stanza::roster::Subscription::From => Subscription::OnlyIn, + stanza::roster::Subscription::None => Subscription::None, + stanza::roster::Subscription::Remove => Subscription::None, + stanza::roster::Subscription::To => Subscription::OnlyOut, + }, + None => Subscription::None, + }, + }; + Contact { + user_jid: value.jid, + subscription, + name: value.name, + groups: HashSet::from_iter(value.groups.into_iter().filter_map(|group| group.0)), + } + } +} diff --git a/stanza/src/roster.rs b/stanza/src/roster.rs index ec83403..0181193 100644 --- a/stanza/src/roster.rs +++ b/stanza/src/roster.rs @@ -37,16 +37,16 @@ impl IntoElement for Query { #[derive(Clone, Debug)] pub struct Item { /// signals subscription pre-approval (server only) - approved: Option<bool>, + pub approved: Option<bool>, /// signals subscription sub-states (server only) - ask: bool, + pub ask: bool, /// uniquely identifies item - jid: JID, + pub jid: JID, /// handle that is determined by user, not contact - name: Option<String>, + pub name: Option<String>, /// state of the presence subscription - subscription: Option<Subscription>, - groups: Vec<Group>, + pub subscription: Option<Subscription>, + pub groups: Vec<Group>, } impl FromElement for Item { @@ -140,7 +140,7 @@ impl ToString for Subscription { #[derive(Clone, Debug)] // TODO: check if should be option or not -pub struct Group(Option<String>); +pub struct Group(pub Option<String>); impl FromElement for Group { fn from_element(mut element: peanuts::Element) -> peanuts::element::DeserializeResult<Self> { |