author    cel 🌸 <cel@blos.sm>  2025-02-18 06:14:43 +0000
committer cel 🌸 <cel@blos.sm>  2025-02-18 06:14:43 +0000
commit  5dd488550f9959914d16bde9269284ebd043e0e6 (patch)
tree    3201c93bb0a457526f62139501e697287b2db9a1
parent  68a7d136705133dc5d3a5d43b9ff4da28eeb6d5b (diff)
WIP: roster retrieval
-rw-r--r--  luz/migrations/20240113011930_luz.sql |  19
-rw-r--r--  luz/src/connection/mod.rs             |  33
-rw-r--r--  luz/src/connection/read.rs            |  26
-rw-r--r--  luz/src/connection/write.rs           |   8
-rw-r--r--  luz/src/db/mod.rs                     |  26
-rw-r--r--  luz/src/error.rs                      |  57
-rw-r--r--  luz/src/lib.rs                        | 259
-rw-r--r--  luz/src/main.rs                       |   1
-rw-r--r--  luz/src/presence.rs                   |  53
-rw-r--r--  luz/src/roster.rs                     |  45
-rw-r--r--  stanza/src/roster.rs                  |  14
11 files changed, 466 insertions, 75 deletions
diff --git a/luz/migrations/20240113011930_luz.sql b/luz/migrations/20240113011930_luz.sql
index 3b18208..082cc4b 100644
--- a/luz/migrations/20240113011930_luz.sql
+++ b/luz/migrations/20240113011930_luz.sql
@@ -6,6 +6,7 @@ create table users(
jid text primary key not null,
-- can receive presence status from non-contacts
cached_status_message text
+ -- TODO: last_seen
);
-- enum for subscription state
@@ -55,6 +56,8 @@ create table messages (
-- check ((chat_id == null) <> (channel_id == null)),
-- check ((chat_id == null) or (channel_id == null)),
-- user is the current "owner" of the message
+ -- TODO: queued messages offline
+ -- TODO: timestamp
-- TODO: icky
-- the user to show it coming from (not necessarily the original sender)
@@ -68,3 +71,19 @@ create table messages (
foreign key(from_jid) references users(jid),
foreign key(originally_from) references users(jid)
);
+
+-- enum for presence show state
+create table show (
+ state text primary key not null
+);
+
+insert into show ( state ) values ('away'), ('chat'), ('do-not-disturb'), ('extended-away');
+
+create table cached_status (
+ id integer primary key not null,
+ show text,
+ message text,
+ foreign key(show) references show(state)
+);
+
+insert into cached_status (id) values (0);
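
For orientation, a throwaway sketch (not part of the commit) of how the two new tables behave: show is a text-backed enum enforced through a foreign key, and cached_status is a single-row table keyed on id = 0 that is upserted in place rather than appended to. Table and column names come from the migration above; the in-memory database, pragma, and sqlx/tokio setup are assumptions of the demo.

```rust
// demo against an in-memory sqlite database; assumes sqlx with the "sqlite"
// feature and a tokio runtime
use sqlx::{Connection, Row, SqliteConnection};

#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
    let mut conn = SqliteConnection::connect("sqlite::memory:").await?;
    sqlx::query("pragma foreign_keys = on").execute(&mut conn).await?;

    // same shape as the migration above
    sqlx::query("create table show (state text primary key not null)")
        .execute(&mut conn).await?;
    sqlx::query("insert into show (state) values ('away'), ('chat'), ('do-not-disturb'), ('extended-away')")
        .execute(&mut conn).await?;
    sqlx::query(
        "create table cached_status (
            id integer primary key not null,
            show text,
            message text,
            foreign key(show) references show(state))",
    )
    .execute(&mut conn).await?;
    sqlx::query("insert into cached_status (id) values (0)").execute(&mut conn).await?;

    // the status lives in a single row keyed on id = 0, so it is upserted, never appended
    sqlx::query(
        "insert into cached_status (id, show, message) values (0, ?, ?)
         on conflict(id) do update set show = excluded.show, message = excluded.message",
    )
    .bind("away")
    .bind("back soon")
    .execute(&mut conn)
    .await?;

    // the foreign key restricts show to the four seeded states
    assert!(sqlx::query("update cached_status set show = 'invisible' where id = 0")
        .execute(&mut conn)
        .await
        .is_err());

    let row = sqlx::query("select show, message from cached_status where id = 0")
        .fetch_one(&mut conn)
        .await?;
    let show: Option<String> = row.get("show");
    let message: Option<String> = row.get("message");
    println!("cached status: show={show:?} message={message:?}");
    Ok(())
}
```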
diff --git a/luz/src/connection/mod.rs b/luz/src/connection/mod.rs
index 8ff433b..fda2b90 100644
--- a/luz/src/connection/mod.rs
+++ b/luz/src/connection/mod.rs
@@ -17,7 +17,11 @@ use tokio::{
};
use write::{WriteControl, WriteControlHandle, WriteHandle, WriteMessage};
-use crate::{db::Db, error::Error, UpdateMessage};
+use crate::{
+ db::Db,
+ error::{Error, Reason},
+ UpdateMessage,
+};
mod read;
pub(crate) mod write;
@@ -31,7 +35,7 @@ pub struct Supervisor {
tokio::task::JoinSet<()>,
mpsc::Sender<SupervisorCommand>,
WriteHandle,
- Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>,
+ Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>,
)>,
sender: mpsc::Sender<UpdateMessage>,
writer_handle: WriteControlHandle,
@@ -57,7 +61,7 @@ pub enum State {
tokio::task::JoinSet<()>,
mpsc::Sender<SupervisorCommand>,
WriteHandle,
- Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>,
+ Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>,
),
),
}
@@ -72,7 +76,7 @@ impl Supervisor {
JoinSet<()>,
mpsc::Sender<SupervisorCommand>,
WriteHandle,
- Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>,
+ Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>,
)>,
sender: mpsc::Sender<UpdateMessage>,
writer_handle: WriteControlHandle,
@@ -172,9 +176,10 @@ impl Supervisor {
// if reconnection failure, respond to all current write messages with lost connection error. the received processes should complete themselves.
write_state.close();
while let Some(msg) = write_state.recv().await {
- let _ = msg.respond_to.send(Err(Error::LostConnection));
+ let _ = msg.respond_to.send(Err(Reason::LostConnection));
}
- let _ = self.sender.send(UpdateMessage::Error(e.into())).await;
+ // TODO: is this the correct error?
+ let _ = self.sender.send(UpdateMessage::Error(Error::LostConnection)).await;
break;
},
}
@@ -218,11 +223,12 @@ impl Supervisor {
Err(e) => {
// if reconnection failure, respond to all current write messages with lost connection error. the received processes should complete themselves.
write_recv.close();
- let _ = write_msg.respond_to.send(Err(Error::LostConnection));
+ let _ = write_msg.respond_to.send(Err(Reason::LostConnection));
while let Some(msg) = write_recv.recv().await {
- let _ = msg.respond_to.send(Err(Error::LostConnection));
+ let _ = msg.respond_to.send(Err(Reason::LostConnection));
}
- let _ = self.sender.send(UpdateMessage::Error(e.into())).await;
+ // TODO: is this the correct error to send?
+ let _ = self.sender.send(UpdateMessage::Error(Error::LostConnection)).await;
break;
},
}
@@ -268,12 +274,13 @@ impl Supervisor {
// if reconnection failure, respond to all current messages with lost connection error.
write_receiver.close();
if let Some(msg) = retry_msg {
- msg.respond_to.send(Err(Error::LostConnection));
+ msg.respond_to.send(Err(Reason::LostConnection));
}
while let Some(msg) = write_receiver.recv().await {
- msg.respond_to.send(Err(Error::LostConnection));
+ msg.respond_to.send(Err(Reason::LostConnection));
}
- let _ = self.sender.send(UpdateMessage::Error(e.into())).await;
+ // TODO: is this the correct error?
+ let _ = self.sender.send(UpdateMessage::Error(Error::LostConnection)).await;
break;
},
}
@@ -331,7 +338,7 @@ impl SupervisorHandle {
on_shutdown: oneshot::Sender<()>,
jid: Arc<Mutex<JID>>,
password: Arc<String>,
- pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>,
+ pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>,
) -> (WriteHandle, Self) {
let (command_sender, command_receiver) = mpsc::channel(20);
let (writer_error_sender, writer_error_receiver) = oneshot::channel();
diff --git a/luz/src/connection/read.rs b/luz/src/connection/read.rs
index 692952b..8f8c4a0 100644
--- a/luz/src/connection/read.rs
+++ b/luz/src/connection/read.rs
@@ -13,7 +13,11 @@ use tokio::{
};
use tracing::info;
-use crate::{db::Db, error::Error, UpdateMessage};
+use crate::{
+ db::Db,
+ error::{Error, Reason},
+ UpdateMessage,
+};
use super::{
write::{WriteHandle, WriteMessage},
@@ -29,7 +33,7 @@ pub struct Read {
JoinSet<()>,
mpsc::Sender<SupervisorCommand>,
WriteHandle,
- Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>,
+ Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>,
)>,
db: Db,
update_sender: mpsc::Sender<UpdateMessage>,
@@ -39,7 +43,7 @@ pub struct Read {
disconnecting: bool,
disconnect_timedout: oneshot::Receiver<()>,
// TODO: use proper stanza ids
- pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>,
+ pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>,
}
impl Read {
@@ -52,7 +56,7 @@ impl Read {
JoinSet<()>,
mpsc::Sender<SupervisorCommand>,
WriteHandle,
- Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>,
+ Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>,
)>,
db: Db,
update_sender: mpsc::Sender<UpdateMessage>,
@@ -60,7 +64,7 @@ impl Read {
supervisor_control: mpsc::Sender<SupervisorCommand>,
write_handle: WriteHandle,
tasks: JoinSet<()>,
- pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>,
+ pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>,
) -> Self {
let (send, recv) = oneshot::channel();
Self {
@@ -153,7 +157,7 @@ impl Read {
// when it aborts, must clear iq map no matter what
let mut iqs = self.pending_iqs.lock().await;
for (_id, sender) in iqs.drain() {
- let _ = sender.send(Err(Error::LostConnection));
+ let _ = sender.send(Err(Reason::LostConnection));
}
}
}
@@ -182,7 +186,7 @@ pub enum ReadControl {
JoinSet<()>,
mpsc::Sender<SupervisorCommand>,
WriteHandle,
- Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>,
+ Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>,
)>,
),
}
@@ -215,13 +219,13 @@ impl ReadControlHandle {
JoinSet<()>,
mpsc::Sender<SupervisorCommand>,
WriteHandle,
- Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>,
+ Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>,
)>,
db: Db,
sender: mpsc::Sender<UpdateMessage>,
supervisor_control: mpsc::Sender<SupervisorCommand>,
jabber_write: WriteHandle,
- pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>,
+ pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>,
) -> Self {
let (control_sender, control_receiver) = mpsc::channel(20);
@@ -252,14 +256,14 @@ impl ReadControlHandle {
JoinSet<()>,
mpsc::Sender<SupervisorCommand>,
WriteHandle,
- Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>,
+ Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>,
)>,
db: Db,
sender: mpsc::Sender<UpdateMessage>,
supervisor_control: mpsc::Sender<SupervisorCommand>,
jabber_write: WriteHandle,
tasks: JoinSet<()>,
- pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>,
+ pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>,
) -> Self {
let (control_sender, control_receiver) = mpsc::channel(20);
diff --git a/luz/src/connection/write.rs b/luz/src/connection/write.rs
index 18dba5c..70584a2 100644
--- a/luz/src/connection/write.rs
+++ b/luz/src/connection/write.rs
@@ -7,7 +7,7 @@ use tokio::{
task::JoinHandle,
};
-use crate::error::Error;
+use crate::error::{Error, Reason};
// actor that receives jabber stanzas to write, and if there is an error, sends a message back to the supervisor then aborts, so the supervisor can spawn a new stream.
pub struct Write {
@@ -19,7 +19,7 @@ pub struct Write {
pub struct WriteMessage {
pub stanza: Stanza,
- pub respond_to: oneshot::Sender<Result<(), Error>>,
+ pub respond_to: oneshot::Sender<Result<(), Reason>>,
}
pub enum WriteControl {
@@ -84,9 +84,9 @@ impl Write {
Err(e) => match &e {
peanuts::Error::ReadError(_error) => {
// if connection lost during disconnection, just send lost connection error to the write requests
- let _ = msg.respond_to.send(Err(Error::LostConnection));
+ let _ = msg.respond_to.send(Err(Reason::LostConnection));
while let Some(msg) = self.stanza_receiver.recv().await {
- let _ = msg.respond_to.send(Err(Error::LostConnection));
+ let _ = msg.respond_to.send(Err(Reason::LostConnection));
}
break;
}
diff --git a/luz/src/db/mod.rs b/luz/src/db/mod.rs
index f598fbb..7557f70 100644
--- a/luz/src/db/mod.rs
+++ b/luz/src/db/mod.rs
@@ -6,6 +6,7 @@ use uuid::Uuid;
use crate::{
chat::{Chat, Message},
+ presence::Online,
roster::Contact,
user::User,
};
@@ -315,4 +316,29 @@ impl Db {
.await?;
Ok(messages)
}
+
+ pub async fn read_cached_status(&self) -> Result<Online, Error> {
+        let online: Online = sqlx::query_as("select show, message as status from cached_status where id = 0")
+ .fetch_one(&self.db)
+ .await?;
+ Ok(online)
+ }
+
+ pub async fn upsert_cached_status(&self, status: Online) -> Result<(), Error> {
+ sqlx::query!(
+ "insert into cached_status (id, show, message) values (0, ?, ?) on conflict do update set show = ?, message = ?",
+ status.show,
+ status.status,
+ status.show,
+ status.status
+ ).execute(&self.db).await?;
+ Ok(())
+ }
+
+ pub async fn delete_cached_status(&self) -> Result<(), Error> {
+ sqlx::query!("update cached_status set show = null, message = null where id = 0")
+ .execute(&self.db)
+ .await?;
+ Ok(())
+ }
}
diff --git a/luz/src/error.rs b/luz/src/error.rs
index 16e1c6e..b9a6487 100644
--- a/luz/src/error.rs
+++ b/luz/src/error.rs
@@ -1,6 +1,11 @@
+use stanza::client::Stanza;
+use tokio::sync::oneshot::{self};
+
#[derive(Debug)]
pub enum Error {
AlreadyConnected,
+ // TODO: change to Connecting(ConnectingError)
+ Connection(ConnectionError),
Presence(Reason),
Roster(Reason),
SendMessage(Reason),
@@ -9,6 +14,42 @@ pub enum Error {
}
#[derive(Debug)]
+pub enum ConnectionError {
+ ConnectionFailed(Reason),
+ RosterRetreival(Reason),
+ SendPresence(Reason),
+ NoCachedStatus(Reason),
+}
+
+pub struct RosterError(pub Reason);
+
+impl From<RosterError> for Error {
+ fn from(e: RosterError) -> Self {
+ Self::Roster(e.0)
+ }
+}
+
+impl From<RosterError> for ConnectionError {
+ fn from(e: RosterError) -> Self {
+ Self::RosterRetreival(e.0)
+ }
+}
+
+pub struct StatusError(Reason);
+
+impl From<StatusError> for Error {
+ fn from(e: StatusError) -> Self {
+ Error::Presence(e.0)
+ }
+}
+
+impl From<StatusError> for ConnectionError {
+ fn from(e: StatusError) -> Self {
+ Self::SendPresence(e.0)
+ }
+}
+
+#[derive(Debug)]
pub enum Reason {
// TODO: organisation of errors into an internal error type
Timeout,
@@ -19,27 +60,35 @@ pub enum Reason {
SQL(sqlx::Error),
// JID(jid::ParseError),
LostConnection,
+ OneshotRecv(oneshot::error::RecvError),
+ UnexpectedStanza(Stanza),
+}
+
+impl From<oneshot::error::RecvError> for Reason {
+ fn from(e: oneshot::error::RecvError) -> Reason {
+ Self::OneshotRecv(e)
+ }
}
-impl From<peanuts::Error> for Error {
+impl From<peanuts::Error> for Reason {
fn from(e: peanuts::Error) -> Self {
Self::XML(e)
}
}
-// impl From<jid::ParseError> for Error {
+// impl From<jid::ParseError> for Reason {
// fn from(e: jid::ParseError) -> Self {
// Self::JID(e)
// }
// }
-impl From<sqlx::Error> for Error {
+impl From<sqlx::Error> for Reason {
fn from(e: sqlx::Error) -> Self {
Self::SQL(e)
}
}
-impl From<jabber::Error> for Error {
+impl From<jabber::Error> for Reason {
fn from(e: jabber::Error) -> Self {
Self::Jabber(e)
}
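
The reshuffle above separates the low-level Reason (XML, SQL, jabber, lost connection) from operation-scoped wrappers such as RosterError and StatusError, which convert into either the public Error or the new ConnectionError depending on where they surface. A condensed sketch of how the conversions compose; the enums here are simplified stand-ins, not the crate's full definitions:

```rust
// simplified stand-ins for the types above; enough to show how `?` picks the
// destination error type through the From impls
#[derive(Debug)]
enum Reason {
    Timeout,
}

#[derive(Debug)]
enum ConnectionError {
    RosterRetreival(Reason), // spelled as in error.rs
}

#[derive(Debug)]
enum Error {
    Roster(Reason),
}

struct RosterError(Reason);

impl From<RosterError> for Error {
    fn from(e: RosterError) -> Self {
        Error::Roster(e.0)
    }
}

impl From<RosterError> for ConnectionError {
    fn from(e: RosterError) -> Self {
        ConnectionError::RosterRetreival(e.0)
    }
}

// the roster fetch reports an operation-scoped RosterError...
fn fetch_roster() -> Result<Vec<String>, RosterError> {
    Err(RosterError(Reason::Timeout))
}

// ...and callers land it in their own error space just by their return type
fn roster_as_command_result() -> Result<Vec<String>, Error> {
    Ok(fetch_roster()?) // RosterError -> Error::Roster
}

fn roster_during_connect() -> Result<Vec<String>, ConnectionError> {
    Ok(fetch_roster()?) // RosterError -> ConnectionError::RosterRetreival
}

fn main() {
    println!("{:?}", roster_as_command_result());
    println!("{:?}", roster_during_connect());
}
```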
diff --git a/luz/src/lib.rs b/luz/src/lib.rs
index 4d59e61..c14bae6 100644
--- a/luz/src/lib.rs
+++ b/luz/src/lib.rs
@@ -7,6 +7,7 @@ use std::{
use chat::{Body, Chat, Message};
use connection::{write::WriteMessage, SupervisorSender};
use db::Db;
+use error::{ConnectionError, Reason, RosterError, StatusError};
use jabber::JID;
use presence::{Offline, Online, Presence};
use roster::{Contact, ContactUpdate};
@@ -19,6 +20,7 @@ use tokio::{
sync::{mpsc, oneshot, Mutex},
task::JoinSet,
};
+use tracing::info;
use user::User;
use uuid::Uuid;
@@ -35,12 +37,13 @@ mod roster;
mod user;
pub struct Luz {
+ command_sender: mpsc::Sender<CommandMessage>,
receiver: mpsc::Receiver<CommandMessage>,
jid: Arc<Mutex<JID>>,
// TODO: use a dyn passwordprovider trait to avoid storing password in memory
password: Arc<String>,
connected: Arc<Mutex<Option<(WriteHandle, SupervisorHandle)>>>,
- pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>,
+ pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>,
db: Db,
sender: mpsc::Sender<UpdateMessage>,
/// if connection was shut down due to e.g. server shutdown, supervisor must be able to mark client as disconnected
@@ -52,6 +55,7 @@ pub struct Luz {
impl Luz {
fn new(
+ command_sender: mpsc::Sender<CommandMessage>,
receiver: mpsc::Receiver<CommandMessage>,
jid: Arc<Mutex<JID>>,
password: String,
@@ -70,6 +74,7 @@ impl Luz {
tasks: JoinSet::new(),
connection_supervisor_shutdown,
pending_iqs: Arc::new(Mutex::new(HashMap::new())),
+ command_sender,
}
}
@@ -117,30 +122,127 @@ impl Luz {
self.pending_iqs.clone(),
);
self.connection_supervisor_shutdown = shutdown_recv;
- *connection_lock = Some((writer, supervisor));
- self.sender.send(UpdateMessage::Connected(todo!())).await;
+ // TODO: get roster and send initial presence
+ let (send, recv) = oneshot::channel();
+ CommandMessage::GetRoster(send)
+ .handle_online(
+ writer.clone(),
+ supervisor.sender(),
+ self.jid.clone(),
+ self.db.clone(),
+ self.sender.clone(),
+ self.pending_iqs.clone(),
+ )
+ .await;
+ let roster = recv.await;
+ match roster {
+ Ok(r) => {
+ match r {
+ Ok(roster) => {
+ let online = self.db.read_cached_status().await;
+ let online = match online {
+ Ok(online) => online,
+ Err(e) => {
+ let _ = self
+ .sender
+ .send(UpdateMessage::Error(
+ Error::Connection(
+ ConnectionError::NoCachedStatus(
+ e.into(),
+ ),
+ ),
+ ))
+ .await;
+ Online::default()
+ }
+ };
+ let (send, recv) = oneshot::channel();
+ CommandMessage::SetStatus(online.clone(), send)
+ .handle_online(
+ writer.clone(),
+ supervisor.sender(),
+ self.jid.clone(),
+ self.db.clone(),
+ self.sender.clone(),
+ self.pending_iqs.clone(),
+ )
+ .await;
+ let set_status = recv.await;
+ match set_status {
+ Ok(s) => match s {
+ Ok(()) => {
+ *connection_lock =
+ Some((writer, supervisor));
+ let _ = self
+ .sender
+ .send(UpdateMessage::Online(
+ online, roster,
+ ))
+ .await;
+ continue;
+ }
+ Err(e) => {
+ let _ = self
+ .sender
+ .send(UpdateMessage::Error(
+ Error::Connection(e.into()),
+ ))
+ .await;
+ }
+ },
+ Err(e) => {
+ let _ = self.sender.send(UpdateMessage::Error(Error::Connection(ConnectionError::SendPresence(e.into())))).await;
+ }
+ }
+ }
+ Err(e) => {
+ let _ = self
+ .sender
+ .send(UpdateMessage::Error(
+ Error::Connection(e.into()),
+ ))
+ .await;
+ }
+ }
+ }
+ Err(e) => {
+ let _ = self
+ .sender
+ .send(UpdateMessage::Error(Error::Connection(
+ ConnectionError::RosterRetreival(e.into()),
+ )))
+ .await;
+ }
+ }
}
Err(e) => {
- self.sender.send(UpdateMessage::Error(e.into()));
+ let _ =
+ self.sender.send(UpdateMessage::Error(Error::Connection(
+ ConnectionError::ConnectionFailed(e.into()),
+                            ))).await;
}
}
}
};
}
- CommandMessage::Disconnect => match self.connected.lock().await.as_mut() {
- None => {
- self.sender
- .send(UpdateMessage::Error(Error::AlreadyDisconnected))
- .await;
- }
- mut c => {
- if let Some((_write_handle, supervisor_handle)) = c.take() {
- let _ = supervisor_handle.send(SupervisorCommand::Disconnect).await;
- } else {
- unreachable!()
- };
+ CommandMessage::Disconnect(_offline) => {
+ match self.connected.lock().await.as_mut() {
+ None => {
+ let _ = self
+ .sender
+ .send(UpdateMessage::Error(Error::AlreadyDisconnected))
+ .await;
+ }
+ mut c => {
+ // TODO: send unavailable presence
+ if let Some((_write_handle, supervisor_handle)) = c.take() {
+ let _ = supervisor_handle.send(SupervisorCommand::Disconnect).await;
+ } else {
+ unreachable!()
+ };
+ }
}
- },
+ }
_ => {
match self.connected.lock().await.as_ref() {
Some((w, s)) => self.tasks.spawn(msg.handle_online(
@@ -168,9 +270,41 @@ impl CommandMessage {
mut self,
jid: Arc<Mutex<JID>>,
db: Db,
- sender: mpsc::Sender<UpdateMessage>,
+ update_sender: mpsc::Sender<UpdateMessage>,
) {
- todo!()
+ match self {
+ CommandMessage::Connect => unreachable!(),
+ CommandMessage::Disconnect(offline) => unreachable!(),
+ CommandMessage::GetRoster(sender) => {
+ let roster = db.read_cached_roster().await;
+ match roster {
+ Ok(roster) => {
+ let _ = sender.send(Ok(roster));
+ }
+ Err(e) => {
+ let _ = sender.send(Err(RosterError(e.into())));
+ }
+ }
+ }
+ CommandMessage::GetChats(sender) => todo!(),
+ CommandMessage::GetChat(jid, sender) => todo!(),
+ CommandMessage::GetMessages(jid, sender) => todo!(),
+ CommandMessage::DeleteChat(jid, sender) => todo!(),
+ CommandMessage::DeleteMessage(uuid, sender) => todo!(),
+ CommandMessage::GetUser(jid, sender) => todo!(),
+ CommandMessage::AddContact(jid, sender) => todo!(),
+ CommandMessage::BuddyRequest(jid, sender) => todo!(),
+ CommandMessage::SubscriptionRequest(jid, sender) => todo!(),
+ CommandMessage::AcceptBuddyRequest(jid, sender) => todo!(),
+ CommandMessage::AcceptSubscriptionRequest(jid, sender) => todo!(),
+ CommandMessage::UnsubscribeFromContact(jid, sender) => todo!(),
+ CommandMessage::UnsubscribeContact(jid, sender) => todo!(),
+ CommandMessage::UnfriendContact(jid, sender) => todo!(),
+ CommandMessage::DeleteContact(jid, sender) => todo!(),
+ CommandMessage::UpdateContact(jid, contact_update, sender) => todo!(),
+ CommandMessage::SetStatus(online, sender) => todo!(),
+ CommandMessage::SendMessage(jid, body, sender) => todo!(),
+ }
}
pub async fn handle_online(
@@ -181,20 +315,25 @@ impl CommandMessage {
jid: Arc<Mutex<JID>>,
db: Db,
sender: mpsc::Sender<UpdateMessage>,
- pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>,
+ pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>,
) {
match self {
CommandMessage::Connect => unreachable!(),
- CommandMessage::Disconnect => unreachable!(),
- CommandMessage::GetRoster => {
+ CommandMessage::Disconnect(_) => unreachable!(),
+ CommandMessage::GetRoster(result_sender) => {
// TODO: jid resource should probably be stored within the connection
let owned_jid: JID;
{
owned_jid = jid.lock().await.clone();
}
+ let iq_id = Uuid::new_v4().to_string();
+ let (send, iq_recv) = oneshot::channel();
+ {
+ pending_iqs.lock().await.insert(iq_id.clone(), send);
+ }
let stanza = Stanza::Iq(Iq {
from: Some(owned_jid),
- id: "getting-roster".to_string(),
+ id: iq_id.to_string(),
to: None,
r#type: IqType::Get,
lang: None,
@@ -211,13 +350,69 @@ impl CommandMessage {
respond_to: send,
})
.await;
+ // TODO: timeout
match recv.await {
- Ok(Ok(())) => println!("roster request sent"),
- e => println!("error: {:?}", e),
+ Ok(Ok(())) => info!("roster request sent"),
+ Ok(Err(e)) => {
+ // TODO: log errors if fail to send
+ let _ = result_sender.send(Err(RosterError(e.into())));
+ return;
+ }
+ Err(e) => {
+ let _ = result_sender.send(Err(RosterError(e.into())));
+ return;
+ }
};
+ // TODO: timeout
+ match iq_recv.await {
+ Ok(Ok(stanza)) => match stanza {
+ Stanza::Iq(Iq {
+ from,
+ id,
+ to,
+ r#type,
+ lang,
+ query: Some(iq::Query::Roster(stanza::roster::Query { ver, items })),
+ errors,
+ }) if id == iq_id && r#type == IqType::Result => {
+ let contacts: Vec<Contact> =
+ items.into_iter().map(|item| item.into()).collect();
+ result_sender.send(Ok(contacts));
+ return;
+ }
+ s => {
+ result_sender.send(Err(RosterError(Reason::UnexpectedStanza(s))));
+ return;
+ }
+ },
+ Ok(Err(e)) => {
+ result_sender.send(Err(RosterError(e.into())));
+ return;
+ }
+ Err(e) => {
+ result_sender.send(Err(RosterError(e.into())));
+ return;
+ }
+ }
}
- CommandMessage::SendMessage { id, to, body } => todo!(),
- _ => todo!(),
+ CommandMessage::GetChats(sender) => todo!(),
+ CommandMessage::GetChat(jid, sender) => todo!(),
+ CommandMessage::GetMessages(jid, sender) => todo!(),
+ CommandMessage::DeleteChat(jid, sender) => todo!(),
+ CommandMessage::DeleteMessage(uuid, sender) => todo!(),
+ CommandMessage::GetUser(jid, sender) => todo!(),
+ CommandMessage::AddContact(jid, sender) => todo!(),
+ CommandMessage::BuddyRequest(jid, sender) => todo!(),
+ CommandMessage::SubscriptionRequest(jid, sender) => todo!(),
+ CommandMessage::AcceptBuddyRequest(jid, sender) => todo!(),
+ CommandMessage::AcceptSubscriptionRequest(jid, sender) => todo!(),
+ CommandMessage::UnsubscribeFromContact(jid, sender) => todo!(),
+ CommandMessage::UnsubscribeContact(jid, sender) => todo!(),
+ CommandMessage::UnfriendContact(jid, sender) => todo!(),
+ CommandMessage::DeleteContact(jid, sender) => todo!(),
+ CommandMessage::UpdateContact(jid, contact_update, sender) => todo!(),
+ CommandMessage::SetStatus(online, sender) => todo!(),
+ CommandMessage::SendMessage(jid, body, sender) => todo!(),
}
}
}
@@ -254,6 +449,7 @@ impl LuzHandle {
let (sup_send, sup_recv) = oneshot::channel();
let actor = Luz::new(
+ command_sender.clone(),
command_receiver,
Arc::new(Mutex::new(jid)),
password,
@@ -280,10 +476,12 @@ pub enum CommandMessage {
/// disconnect from XMPP chat server, sending unavailable presence then closing stream.
Disconnect(Offline),
/// get the roster. if offline, retrieve the cached version from the database. should be stored in application memory
- GetRoster(oneshot::Sender<Result<Vec<Contact>, Error>>),
+ GetRoster(oneshot::Sender<Result<Vec<Contact>, RosterError>>),
/// get all chats. chat will include 10 messages in their message Vec (enough for chat previews)
// TODO: paging and filtering
GetChats(oneshot::Sender<Result<Vec<Chat>, Error>>),
+ /// get a specific chat by jid
+ GetChat(JID, oneshot::Sender<Result<Chat, Error>>),
/// get message history for chat (does appropriate mam things)
// TODO: paging and filtering
GetMessages(JID, oneshot::Sender<Result<Vec<Message>, Error>>),
@@ -315,7 +513,7 @@ pub enum CommandMessage {
/// update contact
UpdateContact(JID, ContactUpdate, oneshot::Sender<Result<(), Error>>),
/// set online status. if disconnected, will be cached so when client connects, will be sent as the initial presence.
- SetStatusMessage(Option<String>, oneshot::Sender<Result<(), Error>>),
+ SetStatus(Online, oneshot::Sender<Result<(), StatusError>>),
/// send a directed presence (usually to a non-contact).
// TODO: should probably make it so people can add non-contact auto presence sharing in the client (most likely through setting an internal setting)
/// send a message to a jid (any kind of jid that can receive a message, e.g. a user or a
@@ -326,9 +524,10 @@ pub enum CommandMessage {
#[derive(Debug)]
pub enum UpdateMessage {
Error(Error),
- Online(Online),
+ Online(Online, Vec<Contact>),
Offline(Offline),
/// received roster from jabber server (replace full app roster state with this)
+    /// TODO: is this still needed now that Online carries the roster?
FullRoster(Vec<Contact>),
/// (only update app roster state, don't replace)
RosterUpdate(Contact),
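
The nested matches in CommandMessage::Connect above implement a linear sequence: fetch the roster, read the cached status (reporting the error and falling back to a default if it is missing), send that status as initial presence, and only then store the connection and emit UpdateMessage::Online(online, roster); any hard failure becomes an Error::Connection update instead. A flattened sketch of that control flow, with stub functions standing in for the real handlers:

```rust
// flattened model of the connect sequence; stub types and functions stand in for
// the real handle_online calls and database reads (assumes tokio "macros"/"rt")
#[derive(Debug, Default)]
struct Online;
#[derive(Debug)]
struct Contact;

#[derive(Debug)]
enum ConnectionError {
    RosterRetreival, // spelled as in error.rs
    SendPresence,
    NoCachedStatus,
}

enum Update {
    Online(Online, Vec<Contact>),
    Error(ConnectionError),
}

async fn get_roster() -> Result<Vec<Contact>, ConnectionError> {
    Ok(vec![Contact])
}
async fn read_cached_status() -> Result<Online, ConnectionError> {
    Err(ConnectionError::NoCachedStatus)
}
async fn send_initial_presence(_status: &Online) -> Result<(), ConnectionError> {
    Ok(())
}

async fn connect_sequence() -> Update {
    // 1. roster first; a failure here aborts the connection setup
    let roster = match get_roster().await {
        Ok(roster) => roster,
        Err(e) => return Update::Error(e),
    };
    // 2. cached status is best effort: report it (lib.rs sends an Error update) and fall back
    let online = match read_cached_status().await {
        Ok(online) => online,
        Err(e) => {
            eprintln!("no cached status, using default: {e:?}");
            Online::default()
        }
    };
    // 3. initial presence has to go out before the client is announced as online
    if let Err(e) = send_initial_presence(&online).await {
        return Update::Error(e);
    }
    Update::Online(online, roster)
}

#[tokio::main]
async fn main() {
    match connect_sequence().await {
        Update::Online(status, roster) => {
            println!("online as {status:?} with {} contacts", roster.len())
        }
        Update::Error(e) => println!("connection setup failed: {e:?}"),
    }
}
```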
diff --git a/luz/src/main.rs b/luz/src/main.rs
index 7b3815f..5e9cd13 100644
--- a/luz/src/main.rs
+++ b/luz/src/main.rs
@@ -18,6 +18,5 @@ async fn main() {
});
luz.send(CommandMessage::Connect).await.unwrap();
- luz.send(CommandMessage::GetRoster).await.unwrap();
tokio::time::sleep(Duration::from_secs(15)).await;
}
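
With GetRoster now carrying a oneshot sender, the fire-and-forget call removed from main.rs would instead look roughly like this (a usage sketch only, assuming the surrounding setup of the existing main; offline it resolves from the cached roster, online from the roster IQ result matched by id):

```rust
// inside the existing async main, after CommandMessage::Connect has been sent
let (send, recv) = tokio::sync::oneshot::channel();
luz.send(CommandMessage::GetRoster(send)).await.unwrap();

match recv.await {
    Ok(Ok(contacts)) => println!("{} contacts in roster", contacts.len()),
    Ok(Err(_roster_error)) => eprintln!("roster retrieval failed"),
    Err(_) => eprintln!("luz dropped the request before answering"),
}
```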
diff --git a/luz/src/presence.rs b/luz/src/presence.rs
index b7ebe1d..fac1bb4 100644
--- a/luz/src/presence.rs
+++ b/luz/src/presence.rs
@@ -1,12 +1,57 @@
-use stanza::client::presence::Show;
+use sqlx::Sqlite;
-#[derive(Debug, Default)]
+#[derive(Debug, Default, sqlx::FromRow, Clone)]
pub struct Online {
- show: Option<Show>,
- status: Option<String>,
+ pub show: Option<Show>,
+ pub status: Option<String>,
+ #[sqlx(skip)]
priority: Option<i8>,
}
+#[derive(Debug, Clone, Copy)]
+pub enum Show {
+ Away,
+ Chat,
+ DoNotDisturb,
+ ExtendedAway,
+}
+
+impl sqlx::Type<Sqlite> for Show {
+ fn type_info() -> <Sqlite as sqlx::Database>::TypeInfo {
+ <&str as sqlx::Type<Sqlite>>::type_info()
+ }
+}
+
+impl sqlx::Decode<'_, Sqlite> for Show {
+ fn decode(
+ value: <Sqlite as sqlx::Database>::ValueRef<'_>,
+ ) -> Result<Self, sqlx::error::BoxDynError> {
+ let value = <&str as sqlx::Decode<Sqlite>>::decode(value)?;
+ match value {
+ "away" => Ok(Self::Away),
+ "chat" => Ok(Self::Chat),
+ "do-not-disturb" => Ok(Self::DoNotDisturb),
+ "extended-away" => Ok(Self::ExtendedAway),
+ _ => unreachable!(),
+ }
+ }
+}
+
+impl sqlx::Encode<'_, Sqlite> for Show {
+ fn encode_by_ref(
+ &self,
+ buf: &mut <Sqlite as sqlx::Database>::ArgumentBuffer<'_>,
+ ) -> Result<sqlx::encode::IsNull, sqlx::error::BoxDynError> {
+ let value = match self {
+ Show::Away => "away",
+ Show::Chat => "chat",
+ Show::DoNotDisturb => "do-not-disturb",
+ Show::ExtendedAway => "extended-away",
+ };
+ <&str as sqlx::Encode<Sqlite>>::encode(value, buf)
+ }
+}
+
#[derive(Debug, Default)]
pub struct Offline {
status: Option<String>,
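
One caveat worth keeping in mind: the strings stored in sqlite above (do-not-disturb, extended-away) are the storage spelling, not the wire spelling; RFC 6121 uses dnd and xa for the presence show element. A small sketch of the mapping that will be needed when the cached status is turned into an outgoing presence (the target type in the stanza crate is assumed, so plain strings are used here):

```rust
// local storage values (see the Encode/Decode impls above) versus the values the
// <show/> element takes in an XMPP presence stanza (RFC 6121 §4.7.2.1)
#[derive(Debug, Clone, Copy)]
enum Show {
    Away,
    Chat,
    DoNotDisturb,
    ExtendedAway,
}

fn wire_value(show: Show) -> &'static str {
    match show {
        Show::Away => "away",
        Show::Chat => "chat",
        Show::DoNotDisturb => "dnd",
        Show::ExtendedAway => "xa",
    }
}

fn main() {
    for show in [Show::Away, Show::Chat, Show::DoNotDisturb, Show::ExtendedAway] {
        println!("{show:?} -> <show>{}</show>", wire_value(show));
    }
}
```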
diff --git a/luz/src/roster.rs b/luz/src/roster.rs
index 2f50eb6..2e3de7e 100644
--- a/luz/src/roster.rs
+++ b/luz/src/roster.rs
@@ -24,7 +24,7 @@ pub struct Contact {
}
#[derive(Debug)]
-enum Subscription {
+pub enum Subscription {
None,
PendingOut,
PendingIn,
@@ -80,3 +80,46 @@ impl sqlx::Encode<'_, Sqlite> for Subscription {
<&str as sqlx::Encode<Sqlite>>::encode(value, buf)
}
}
+
+// none
+// >
+// >>
+// <
+// <<
+// ><
+// >><
+// ><<
+// >><<
+
+impl From<stanza::roster::Item> for Contact {
+ fn from(value: stanza::roster::Item) -> Self {
+ let subscription = match value.ask {
+ true => match value.subscription {
+ Some(s) => match s {
+ stanza::roster::Subscription::Both => Subscription::Buddy,
+ stanza::roster::Subscription::From => Subscription::InPendingOut,
+ stanza::roster::Subscription::None => Subscription::PendingOut,
+ stanza::roster::Subscription::Remove => Subscription::PendingOut,
+ stanza::roster::Subscription::To => Subscription::OnlyOut,
+ },
+ None => Subscription::PendingOut,
+ },
+ false => match value.subscription {
+ Some(s) => match s {
+ stanza::roster::Subscription::Both => Subscription::Buddy,
+ stanza::roster::Subscription::From => Subscription::OnlyIn,
+ stanza::roster::Subscription::None => Subscription::None,
+ stanza::roster::Subscription::Remove => Subscription::None,
+ stanza::roster::Subscription::To => Subscription::OnlyOut,
+ },
+ None => Subscription::None,
+ },
+ };
+ Contact {
+ user_jid: value.jid,
+ subscription,
+ name: value.name,
+ groups: HashSet::from_iter(value.groups.into_iter().filter_map(|group| group.0)),
+ }
+ }
+}
diff --git a/stanza/src/roster.rs b/stanza/src/roster.rs
index ec83403..0181193 100644
--- a/stanza/src/roster.rs
+++ b/stanza/src/roster.rs
@@ -37,16 +37,16 @@ impl IntoElement for Query {
#[derive(Clone, Debug)]
pub struct Item {
/// signals subscription pre-approval (server only)
- approved: Option<bool>,
+ pub approved: Option<bool>,
/// signals subscription sub-states (server only)
- ask: bool,
+ pub ask: bool,
/// uniquely identifies item
- jid: JID,
+ pub jid: JID,
/// handle that is determined by user, not contact
- name: Option<String>,
+ pub name: Option<String>,
/// state of the presence subscription
- subscription: Option<Subscription>,
- groups: Vec<Group>,
+ pub subscription: Option<Subscription>,
+ pub groups: Vec<Group>,
}
impl FromElement for Item {
@@ -140,7 +140,7 @@ impl ToString for Subscription {
#[derive(Clone, Debug)]
// TODO: check if should be option or not
-pub struct Group(Option<String>);
+pub struct Group(pub Option<String>);
impl FromElement for Group {
fn from_element(mut element: peanuts::Element) -> peanuts::element::DeserializeResult<Self> {