aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--luz/scratch87
-rw-r--r--luz/src/connection/mod.rs263
-rw-r--r--luz/src/connection/read.rs383
-rw-r--r--luz/src/connection/write.rs50
-rw-r--r--luz/src/lib.rs1854
-rw-r--r--luz/src/main.rs6
6 files changed, 1375 insertions, 1268 deletions
diff --git a/luz/scratch b/luz/scratch
new file mode 100644
index 0000000..9954aef
--- /dev/null
+++ b/luz/scratch
@@ -0,0 +1,87 @@
+# logic:
+
+- db
+- pending iqs
+- ui update sender
+
+ ## logic methods
+
+ - handle_offline: called by lamp
+ - handle_online: called by lamp
+ - handle_stanza: called by read thread
+
+ - handle_connect: called by lamp
+ - handle_disconnect: called by lamp
+
+ - handle_error?: called by lamp handle threads and read thread for error logging
+ - handle_stream_error: called by supervisor when stream needs to be reset
+
+
+# lamp:
+
+- login jid (bare or full)
+- password provider
+- lamp command receiver
+- connected session state(connected, supervisorhandle to disconnect current connected session)
+- connected state (if intended to be connected or not, for retrying reconnection every minute or something)
+- on_crash for connection supervisor
+- internal logic struct which has methods to handle logic commands
+
+# connected:
+
+- writehandle
+- current full jid for connected session
+
+# supervisor:
+
+- control_recv
+- read_thread_crash
+- write_thread_crash
+- read_control_handle
+- write_control_handle
+- on_crash
+- connected
+- password
+- logic
+
+# read:
+
+must be passed around when crash
+- supervisor_control
+- tasks
+
+can be cloned from supervisor
+- connected .
+- logic .
+
+can be recreated by supervisor
+- stream
+- disconnecting
+- disconnect_timedout
+- on_crash
+- control_recv
+
+# write:
+
+must be passed around when crash
+- stanza_recv
+
+can be recreated by supervisor
+- stream
+- on_crash
+- control_recv
+
+
+message types:
+
+command:
+
+- getroster
+- send message
+- etc.
+
+lamp commands:
+
+- connect
+- disconnect
+- command(command)
diff --git a/luz/src/connection/mod.rs b/luz/src/connection/mod.rs
index 84109bc..e209d47 100644
--- a/luz/src/connection/mod.rs
+++ b/luz/src/connection/mod.rs
@@ -9,139 +9,120 @@ use std::{
use jabber::{connection::Tls, jabber_stream::bound_stream::BoundJabberStream};
use jid::JID;
-use read::{ReadControl, ReadControlHandle};
+use read::{ReadControl, ReadControlHandle, ReadState};
use stanza::client::Stanza;
use tokio::{
sync::{mpsc, oneshot, Mutex},
task::{JoinHandle, JoinSet},
};
use tracing::info;
-use write::{WriteControl, WriteControlHandle, WriteHandle, WriteMessage};
+use write::{WriteControl, WriteControlHandle, WriteHandle, WriteMessage, WriteState};
use crate::{
db::Db,
error::{Error, ReadError, WriteError},
- UpdateMessage,
+ Connected, LogicState, UpdateMessage,
};
mod read;
pub(crate) mod write;
pub struct Supervisor {
- connection_commands: mpsc::Receiver<SupervisorCommand>,
- writer_crash: oneshot::Receiver<(WriteMessage, mpsc::Receiver<WriteMessage>)>,
- reader_crash: oneshot::Receiver<(
- Db,
- mpsc::Sender<UpdateMessage>,
- tokio::task::JoinSet<()>,
- mpsc::Sender<SupervisorCommand>,
- WriteHandle,
- Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>,
- )>,
- sender: mpsc::Sender<UpdateMessage>,
- writer_handle: WriteControlHandle,
- reader_handle: ReadControlHandle,
- on_shutdown: oneshot::Sender<()>,
- jid: Arc<Mutex<JID>>,
+ command_recv: mpsc::Receiver<SupervisorCommand>,
+ reader_crash: oneshot::Receiver<ReadState>,
+ writer_crash: oneshot::Receiver<(WriteMessage, WriteState)>,
+ read_control_handle: ReadControlHandle,
+ write_control_handle: WriteControlHandle,
+ on_crash: oneshot::Sender<()>,
+ // jid in connected stays the same over the life of the supervisor (the connection session)
+ connected: Connected,
password: Arc<String>,
+ logic: LogicState,
}
pub enum SupervisorCommand {
Disconnect,
// for if there was a stream error, require to reconnect
// couldn't stream errors just cause a crash? lol
- Reconnect(State),
+ Reconnect(ChildState),
}
-pub enum State {
- Write(mpsc::Receiver<WriteMessage>),
- Read(
- (
- Db,
- mpsc::Sender<UpdateMessage>,
- tokio::task::JoinSet<()>,
- mpsc::Sender<SupervisorCommand>,
- WriteHandle,
- Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>,
- ),
- ),
+pub enum ChildState {
+ Write(WriteState),
+ Read(ReadState),
}
impl Supervisor {
fn new(
- connection_commands: mpsc::Receiver<SupervisorCommand>,
- writer_crash: oneshot::Receiver<(WriteMessage, mpsc::Receiver<WriteMessage>)>,
- reader_crash: oneshot::Receiver<(
- Db,
- mpsc::Sender<UpdateMessage>,
- JoinSet<()>,
- mpsc::Sender<SupervisorCommand>,
- WriteHandle,
- Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>,
- )>,
- sender: mpsc::Sender<UpdateMessage>,
- writer_handle: WriteControlHandle,
- reader_handle: ReadControlHandle,
- on_shutdown: oneshot::Sender<()>,
- jid: Arc<Mutex<JID>>,
+ command_recv: mpsc::Receiver<SupervisorCommand>,
+ reader_crash: oneshot::Receiver<ReadState>,
+ writer_crash: oneshot::Receiver<(WriteMessage, WriteState)>,
+ read_control_handle: ReadControlHandle,
+ write_control_handle: WriteControlHandle,
+ on_crash: oneshot::Sender<()>,
+ connected: Connected,
password: Arc<String>,
+ logic: LogicState,
) -> Self {
Self {
- connection_commands,
- writer_crash,
- sender,
- writer_handle,
- reader_handle,
+ command_recv,
reader_crash,
- on_shutdown,
- jid,
+ writer_crash,
+ read_control_handle,
+ write_control_handle,
+ on_crash,
+ connected,
password,
+ logic,
}
}
async fn run(mut self) {
loop {
tokio::select! {
- Some(msg) = self.connection_commands.recv() => {
+ Some(msg) = self.command_recv.recv() => {
match msg {
SupervisorCommand::Disconnect => {
info!("disconnecting");
- let _ = self.writer_handle.send(WriteControl::Disconnect).await;
- let _ = self.reader_handle.send(ReadControl::Disconnect).await;
+ // TODO: do handle_disconnect here
+ let _ = self.write_control_handle.send(WriteControl::Disconnect).await;
+ let _ = self.read_control_handle.send(ReadControl::Disconnect).await;
info!("sent disconnect command");
tokio::select! {
_ = async { tokio::join!(
- async { let _ = (&mut self.writer_handle.handle).await; },
- async { let _ = (&mut self.reader_handle.handle).await; }
+ async { let _ = (&mut self.write_control_handle.handle).await; },
+ async { let _ = (&mut self.read_control_handle.handle).await; }
) } => {},
+ // TODO: config timeout
_ = async { tokio::time::sleep(Duration::from_secs(5)) } => {
- (&mut self.reader_handle.handle).abort();
- (&mut self.writer_handle.handle).abort();
+ (&mut self.read_control_handle.handle).abort();
+ (&mut self.write_control_handle.handle).abort();
}
}
info!("disconnected");
break;
},
+ // TODO: Reconnect without aborting, gentle reconnect.
SupervisorCommand::Reconnect(state) => {
// TODO: please omfg
// send abort to read stream, as already done, consider
let (read_state, mut write_state);
match state {
// TODO: proper state things for read and write thread
- State::Write(receiver) => {
+ ChildState::Write(receiver) => {
write_state = receiver;
let (send, recv) = oneshot::channel();
- let _ = self.reader_handle.send(ReadControl::Abort(send)).await;
+ let _ = self.read_control_handle.send(ReadControl::Abort(send)).await;
if let Ok(state) = recv.await {
read_state = state;
} else {
break
}
},
- State::Read(read) => {
+ ChildState::Read(read) => {
read_state = read;
let (send, recv) = oneshot::channel();
- let _ = self.writer_handle.send(WriteControl::Abort(send)).await;
+ let _ = self.write_control_handle.send(WriteControl::Abort(send)).await;
// TODO: need a tokio select, in case the state arrives from somewhere else
if let Ok(state) = recv.await {
write_state = state;
@@ -151,103 +132,99 @@ impl Supervisor {
},
}
- let mut jid = self.jid.lock().await;
+ let mut jid = self.connected.jid.clone();
let mut domain = jid.domainpart.clone();
+ // TODO: make sure connect_and_login does not modify the jid, but instead returns a jid. or something like that
let connection = jabber::connect_and_login(&mut jid, &*self.password, &mut domain).await;
match connection {
Ok(c) => {
-
let (read, write) = c.split();
let (send, recv) = oneshot::channel();
self.writer_crash = recv;
- self.writer_handle =
- WriteControlHandle::reconnect(write, send, write_state);
+ self.write_control_handle =
+ WriteControlHandle::reconnect(write, send, write_state.stanza_recv);
let (send, recv) = oneshot::channel();
self.reader_crash = recv;
- let (db, update_sender, tasks, supervisor_command, write_sender, pending_iqs) = read_state;
- self.reader_handle = ReadControlHandle::reconnect(
+ self.read_control_handle = ReadControlHandle::reconnect(
read,
+ read_state.tasks,
+ self.connected.clone(),
+ self.logic.clone(),
+ read_state.supervisor_control,
send,
- db,
- update_sender,
- supervisor_command,
- write_sender,
- tasks,
- pending_iqs,
);
},
Err(e) => {
// if reconnection failure, respond to all current write messages with lost connection error. the received processes should complete themselves.
- write_state.close();
- while let Some(msg) = write_state.recv().await {
+ write_state.stanza_recv.close();
+ while let Some(msg) = write_state.stanza_recv.recv().await {
let _ = msg.respond_to.send(Err(WriteError::LostConnection));
}
// TODO: is this the correct error?
- let _ = self.sender.send(UpdateMessage::Error(Error::LostConnection)).await;
+ let _ = self.logic.update_sender.send(UpdateMessage::Error(Error::LostConnection)).await;
break;
},
}
},
}
},
- Ok((write_msg, mut write_recv)) = &mut self.writer_crash => {
+ Ok((write_msg, mut write_state)) = &mut self.writer_crash => {
// consider awaiting/aborting the read and write threads
let (send, recv) = oneshot::channel();
- let _ = self.reader_handle.send(ReadControl::Abort(send)).await;
- let (db, update_sender, tasks, supervisor_command, write_sender, pending_iqs) = tokio::select! {
+ let _ = self.read_control_handle.send(ReadControl::Abort(send)).await;
+ let read_state = tokio::select! {
Ok(s) = recv => s,
Ok(s) = &mut self.reader_crash => s,
// in case, just break as irrecoverable
else => break,
};
- let mut jid = self.jid.lock().await;
+ let mut jid = self.connected.jid.clone();
let mut domain = jid.domainpart.clone();
+ // TODO: same here
let connection = jabber::connect_and_login(&mut jid, &*self.password, &mut domain).await;
match connection {
Ok(c) => {
let (read, write) = c.split();
let (send, recv) = oneshot::channel();
self.writer_crash = recv;
- self.writer_handle =
- WriteControlHandle::reconnect_retry(write, send, write_msg, write_recv);
+ self.write_control_handle =
+ WriteControlHandle::reconnect_retry(write, send, write_state.stanza_recv, write_msg);
let (send, recv) = oneshot::channel();
self.reader_crash = recv;
- self.reader_handle = ReadControlHandle::reconnect(
+ self.read_control_handle = ReadControlHandle::reconnect(
read,
+ read_state.tasks,
+ self.connected.clone(),
+ self.logic.clone(),
+ read_state.supervisor_control,
send,
- db,
- update_sender,
- supervisor_command,
- write_sender,
- tasks,
- pending_iqs,
);
},
Err(e) => {
// if reconnection failure, respond to all current write messages with lost connection error. the received processes should complete themselves.
- write_recv.close();
+ write_state.stanza_recv.close();
let _ = write_msg.respond_to.send(Err(WriteError::LostConnection));
- while let Some(msg) = write_recv.recv().await {
+ while let Some(msg) = write_state.stanza_recv.recv().await {
let _ = msg.respond_to.send(Err(WriteError::LostConnection));
}
// TODO: is this the correct error to send?
- let _ = self.sender.send(UpdateMessage::Error(Error::LostConnection)).await;
+ let _ = self.logic.update_sender.send(UpdateMessage::Error(Error::LostConnection)).await;
break;
},
}
},
- Ok((db, update_sender, tasks, supervisor_control, write_handle, pending_iqs)) = &mut self.reader_crash => {
+ Ok(read_state) = &mut self.reader_crash => {
let (send, recv) = oneshot::channel();
- let _ = self.writer_handle.send(WriteControl::Abort(send)).await;
- let (retry_msg, mut write_receiver) = tokio::select! {
+ let _ = self.write_control_handle.send(WriteControl::Abort(send)).await;
+ let (retry_msg, mut write_state) = tokio::select! {
Ok(s) = recv => (None, s),
Ok(s) = &mut self.writer_crash => (Some(s.0), s.1),
// in case, just break as irrecoverable
else => break,
};
- let mut jid = self.jid.lock().await;
+ let mut jid = self.connected.jid.clone();
let mut domain = jid.domainpart.clone();
let connection = jabber::connect_and_login(&mut jid, &*self.password, &mut domain).await;
match connection {
@@ -256,35 +233,33 @@ impl Supervisor {
let (send, recv) = oneshot::channel();
self.writer_crash = recv;
if let Some(msg) = retry_msg {
- self.writer_handle =
- WriteControlHandle::reconnect_retry(write, send, msg, write_receiver);
+ self.write_control_handle =
+ WriteControlHandle::reconnect_retry(write, send, write_state.stanza_recv, msg);
} else {
- self.writer_handle = WriteControlHandle::reconnect(write, send, write_receiver)
+ self.write_control_handle = WriteControlHandle::reconnect(write, send, write_state.stanza_recv)
}
let (send, recv) = oneshot::channel();
self.reader_crash = recv;
- self.reader_handle = ReadControlHandle::reconnect(
+ self.read_control_handle = ReadControlHandle::reconnect(
read,
+ read_state.tasks,
+ self.connected.clone(),
+ self.logic.clone(),
+ read_state.supervisor_control,
send,
- db,
- update_sender,
- supervisor_control,
- write_handle,
- tasks,
- pending_iqs,
);
},
Err(e) => {
// if reconnection failure, respond to all current messages with lost connection error.
- write_receiver.close();
+ write_state.stanza_recv.close();
if let Some(msg) = retry_msg {
msg.respond_to.send(Err(WriteError::LostConnection));
}
- while let Some(msg) = write_receiver.recv().await {
+ while let Some(msg) = write_state.stanza_recv.recv().await {
msg.respond_to.send(Err(WriteError::LostConnection));
}
// TODO: is this the correct error?
- let _ = self.sender.send(UpdateMessage::Error(Error::LostConnection)).await;
+ let _ = self.logic.update_sender.send(UpdateMessage::Error(Error::LostConnection)).await;
break;
},
}
@@ -292,7 +267,8 @@ impl Supervisor {
else => break,
}
}
- let _ = self.on_shutdown.send(());
+ // TODO: maybe don't just on_crash
+ let _ = self.on_crash.send(());
}
}
@@ -337,40 +313,47 @@ impl DerefMut for SupervisorSender {
impl SupervisorHandle {
pub fn new(
streams: BoundJabberStream<Tls>,
- update_sender: mpsc::Sender<UpdateMessage>,
- db: Db,
- on_shutdown: oneshot::Sender<()>,
- jid: Arc<Mutex<JID>>,
+ on_crash: oneshot::Sender<()>,
+ jid: JID,
password: Arc<String>,
- pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>,
+ logic: LogicState,
) -> (WriteHandle, Self) {
- let (command_sender, command_receiver) = mpsc::channel(20);
- let (writer_error_sender, writer_error_receiver) = oneshot::channel();
- let (reader_crash_sender, reader_crash_receiver) = oneshot::channel();
+ let (command_send, command_recv) = mpsc::channel(20);
+ let (writer_crash_send, writer_crash_recv) = oneshot::channel();
+ let (reader_crash_send, reader_crash_recv) = oneshot::channel();
+
+ let (read_stream, write_stream) = streams.split();
- let (reader, writer) = streams.split();
let (write_handle, write_control_handle) =
- WriteControlHandle::new(writer, writer_error_sender);
- let jabber_reader_control_handle = ReadControlHandle::new(
- reader,
- reader_crash_sender,
- db,
- update_sender.clone(),
- command_sender.clone(),
- write_handle.clone(),
- pending_iqs,
+ WriteControlHandle::new(write_stream, writer_crash_send);
+
+ let connected = Connected {
+ jid,
+ write_handle: write_handle.clone(),
+ };
+
+ let supervisor_sender = SupervisorSender {
+ sender: command_send,
+ };
+
+ let read_control_handle = ReadControlHandle::new(
+ read_stream,
+ connected.clone(),
+ logic.clone(),
+ supervisor_sender.clone(),
+ reader_crash_send,
);
let actor = Supervisor::new(
- command_receiver,
- writer_error_receiver,
- reader_crash_receiver,
- update_sender,
+ command_recv,
+ reader_crash_recv,
+ writer_crash_recv,
+ read_control_handle,
write_control_handle,
- jabber_reader_control_handle,
- on_shutdown,
- jid,
+ on_crash,
+ connected,
password,
+ logic,
);
let handle = tokio::spawn(async move { actor.run().await });
@@ -378,9 +361,7 @@ impl SupervisorHandle {
(
write_handle,
Self {
- sender: SupervisorSender {
- sender: command_sender,
- },
+ sender: supervisor_sender,
handle,
},
)
diff --git a/luz/src/connection/read.rs b/luz/src/connection/read.rs
index d510c5d..d310a12 100644
--- a/luz/src/connection/read.rs
+++ b/luz/src/connection/read.rs
@@ -22,66 +22,59 @@ use crate::{
error::{Error, IqError, MessageRecvError, PresenceError, ReadError, RosterError},
presence::{Offline, Online, Presence, PresenceType, Show},
roster::Contact,
- UpdateMessage,
+ Connected, LogicState, UpdateMessage,
};
-use super::{write::WriteHandle, SupervisorCommand};
+use super::{write::WriteHandle, SupervisorCommand, SupervisorSender};
+/// read actor
pub struct Read {
- control_receiver: mpsc::Receiver<ReadControl>,
stream: BoundJabberReader<Tls>,
- on_crash: oneshot::Sender<(
- Db,
- mpsc::Sender<UpdateMessage>,
- JoinSet<()>,
- mpsc::Sender<SupervisorCommand>,
- WriteHandle,
- Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>,
- )>,
- db: Db,
- update_sender: mpsc::Sender<UpdateMessage>,
- supervisor_control: mpsc::Sender<SupervisorCommand>,
- write_handle: WriteHandle,
- tasks: JoinSet<()>,
disconnecting: bool,
disconnect_timedout: oneshot::Receiver<()>,
- // TODO: use proper stanza ids
- pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>,
+
+ // all the threads spawned by the current connection session
+ tasks: JoinSet<()>,
+
+ // for handling incoming stanzas
+ // jabber server must be able to both terminate the connection from error, and ask for data from the client (such as supported XEPs)
+ connected: Connected,
+ logic: LogicState,
+ supervisor_control: SupervisorSender,
+
+ // control stuff
+ control_receiver: mpsc::Receiver<ReadControl>,
+ on_crash: oneshot::Sender<ReadState>,
+}
+
+/// when a crash/abort occurs, this gets sent back to the supervisor, so that the connection session can continue
+pub struct ReadState {
+ pub supervisor_control: SupervisorSender,
+ // TODO: when a stream dies, the iq gets from the server should not be replied to on the new stream
+ pub tasks: JoinSet<()>,
}
impl Read {
fn new(
- control_receiver: mpsc::Receiver<ReadControl>,
stream: BoundJabberReader<Tls>,
- on_crash: oneshot::Sender<(
- Db,
- mpsc::Sender<UpdateMessage>,
- JoinSet<()>,
- mpsc::Sender<SupervisorCommand>,
- WriteHandle,
- Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>,
- )>,
- db: Db,
- update_sender: mpsc::Sender<UpdateMessage>,
- // jabber server must be able to both terminate the connection from error, and ask for data from the client (such as supported XEPs)
- supervisor_control: mpsc::Sender<SupervisorCommand>,
- write_handle: WriteHandle,
tasks: JoinSet<()>,
- pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>,
+ connected: Connected,
+ logic: LogicState,
+ supervisor_control: SupervisorSender,
+ control_receiver: mpsc::Receiver<ReadControl>,
+ on_crash: oneshot::Sender<ReadState>,
) -> Self {
let (_send, recv) = oneshot::channel();
Self {
- control_receiver,
stream,
- on_crash,
- db,
- update_sender,
- supervisor_control,
- write_handle,
- tasks,
disconnecting: false,
disconnect_timedout: recv,
- pending_iqs,
+ tasks,
+ connected,
+ logic,
+ supervisor_control,
+ control_receiver,
+ on_crash,
}
}
@@ -110,7 +103,7 @@ impl Read {
})
},
ReadControl::Abort(sender) => {
- let _ = sender.send((self.db, self.update_sender, self.tasks, self.supervisor_control, self.write_handle, self.pending_iqs.clone()));
+ let _ = sender.send(ReadState { supervisor_control: self.supervisor_control, tasks: self.tasks });
break;
},
};
@@ -119,7 +112,7 @@ impl Read {
println!("read stanza");
match s {
Ok(s) => {
- self.tasks.spawn(handle_stanza(s, self.update_sender.clone(), self.db.clone(), self.supervisor_control.clone(), self.write_handle.clone(), self.pending_iqs.clone()));
+ self.tasks.spawn(self.logic.clone().handle_stanza(s, self.connected.clone(), self.supervisor_control.clone()));
},
Err(e) => {
println!("error: {:?}", e);
@@ -146,8 +139,7 @@ impl Read {
if self.disconnecting == true {
break;
} else {
- // AAAAAAAAAAAAAAAAAAAAA i should really just have this stored in the supervisor and not gaf bout passing these references around
- let _ = self.on_crash.send((self.db, self.update_sender, self.tasks, self.supervisor_control, self.write_handle, self.pending_iqs.clone()));
+ let _ = self.on_crash.send(ReadState { supervisor_control: self.supervisor_control, tasks: self.tasks });
}
break;
},
@@ -158,7 +150,7 @@ impl Read {
}
println!("stopping read thread");
// when it aborts, must clear iq map no matter what
- let mut iqs = self.pending_iqs.lock().await;
+ let mut iqs = self.logic.pending.lock().await;
for (_id, sender) in iqs.drain() {
let _ = sender.send(Err(ReadError::LostConnection));
}
@@ -170,249 +162,10 @@ impl Read {
// - access database
// - disconnect proper, reconnect
// - respond to server requests
-async fn handle_stanza(
- stanza: Stanza,
- update_sender: mpsc::Sender<UpdateMessage>,
- db: Db,
- supervisor_control: mpsc::Sender<SupervisorCommand>,
- write_handle: WriteHandle,
- pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>,
-) {
- match stanza {
- Stanza::Message(stanza_message) => {
- if let Some(mut from) = stanza_message.from {
- // TODO: don't ignore delay from. xep says SHOULD send error if incorrect.
- let timestamp = stanza_message
- .delay
- .map(|delay| delay.stamp)
- .unwrap_or_else(|| Utc::now());
- // TODO: group chat messages
- let mut message = Message {
- id: stanza_message
- .id
- // TODO: proper id storage
- .map(|id| Uuid::from_str(&id).unwrap_or_else(|_| Uuid::new_v4()))
- .unwrap_or_else(|| Uuid::new_v4()),
- from: from.clone(),
- timestamp,
- body: Body {
- // TODO: should this be an option?
- body: stanza_message
- .body
- .map(|body| body.body)
- .unwrap_or_default()
- .unwrap_or_default(),
- },
- };
- // TODO: can this be more efficient?
- let result = db
- .create_message_with_user_resource_and_chat(message.clone(), from.clone())
- .await;
- if let Err(e) = result {
- tracing::error!("messagecreate");
- let _ = update_sender
- .send(UpdateMessage::Error(Error::MessageRecv(
- MessageRecvError::MessageHistory(e.into()),
- )))
- .await;
- }
- message.from = message.from.as_bare();
- from = from.as_bare();
- let _ = update_sender
- .send(UpdateMessage::Message { to: from, message })
- .await;
- } else {
- let _ = update_sender
- .send(UpdateMessage::Error(Error::MessageRecv(
- MessageRecvError::MissingFrom,
- )))
- .await;
- }
- }
- Stanza::Presence(presence) => {
- if let Some(from) = presence.from {
- match presence.r#type {
- Some(r#type) => match r#type {
- // error processing a presence from somebody
- stanza::client::presence::PresenceType::Error => {
- // TODO: is there any other information that should go with the error? also MUST have an error, otherwise it's a different error. maybe it shoulnd't be an option.
- let _ = update_sender
- .send(UpdateMessage::Error(Error::Presence(
- // TODO: ughhhhhhhhhhhhh these stanza errors should probably just have an option, and custom display
- PresenceError::StanzaError(
- presence
- .errors
- .first()
- .cloned()
- .expect("error MUST have error"),
- ),
- )))
- .await;
- }
- // should not happen (error to server)
- stanza::client::presence::PresenceType::Probe => {
- // TODO: should probably write an error and restart stream
- let _ = update_sender
- .send(UpdateMessage::Error(Error::Presence(
- PresenceError::Unsupported,
- )))
- .await;
- }
- stanza::client::presence::PresenceType::Subscribe => {
- // may get a subscription request from somebody who is not a contact!!! therefore should be its own kind of event
- let _ = update_sender
- .send(UpdateMessage::SubscriptionRequest(from))
- .await;
- }
- stanza::client::presence::PresenceType::Unavailable => {
- let offline = Offline {
- status: presence.status.map(|status| status.status.0),
- };
- let timestamp = presence
- .delay
- .map(|delay| delay.stamp)
- .unwrap_or_else(|| Utc::now());
- let _ = update_sender
- .send(UpdateMessage::Presence {
- from,
- presence: Presence {
- timestamp,
- presence: PresenceType::Offline(offline),
- },
- })
- .await;
- }
- // for now, do nothing, as these are simply informational. will receive roster push from the server regarding the changes to do with them.
- stanza::client::presence::PresenceType::Subscribed => {}
- stanza::client::presence::PresenceType::Unsubscribe => {}
- stanza::client::presence::PresenceType::Unsubscribed => {}
- },
- None => {
- let online = Online {
- show: presence.show.map(|show| match show {
- stanza::client::presence::Show::Away => Show::Away,
- stanza::client::presence::Show::Chat => Show::Chat,
- stanza::client::presence::Show::Dnd => Show::DoNotDisturb,
- stanza::client::presence::Show::Xa => Show::ExtendedAway,
- }),
- status: presence.status.map(|status| status.status.0),
- priority: presence.priority.map(|priority| priority.0),
- };
- let timestamp = presence
- .delay
- .map(|delay| delay.stamp)
- .unwrap_or_else(|| Utc::now());
- let _ = update_sender
- .send(UpdateMessage::Presence {
- from,
- presence: Presence {
- timestamp,
- presence: PresenceType::Online(online),
- },
- })
- .await;
- }
- }
- } else {
- let _ = update_sender
- .send(UpdateMessage::Error(Error::Presence(
- PresenceError::MissingFrom,
- )))
- .await;
- }
- }
- Stanza::Iq(iq) => match iq.r#type {
- stanza::client::iq::IqType::Error | stanza::client::iq::IqType::Result => {
- let send;
- {
- send = pending_iqs.lock().await.remove(&iq.id);
- }
- if let Some(send) = send {
- send.send(Ok(Stanza::Iq(iq)));
- } else {
- let _ = update_sender
- .send(UpdateMessage::Error(Error::Iq(IqError::NoMatchingId(
- iq.id,
- ))))
- .await;
- }
- }
- // TODO: send unsupported to server
- // TODO: proper errors i am so tired please
- stanza::client::iq::IqType::Get => {}
- stanza::client::iq::IqType::Set => {
- if let Some(query) = iq.query {
- match query {
- stanza::client::iq::Query::Roster(mut query) => {
- // TODO: there should only be one
- if let Some(item) = query.items.pop() {
- match item.subscription {
- Some(stanza::roster::Subscription::Remove) => {
- db.delete_contact(item.jid.clone()).await;
- update_sender
- .send(UpdateMessage::RosterDelete(item.jid))
- .await;
- // TODO: send result
- }
- _ => {
- let contact: Contact = item.into();
- if let Err(e) = db.upsert_contact(contact.clone()).await {
- let _ = update_sender
- .send(UpdateMessage::Error(Error::Roster(
- RosterError::Cache(e.into()),
- )))
- .await;
- }
- let _ = update_sender
- .send(UpdateMessage::RosterUpdate(contact))
- .await;
- // TODO: send result
- // write_handle.write(Stanza::Iq(stanza::client::iq::Iq {
- // from: ,
- // id: todo!(),
- // to: todo!(),
- // r#type: todo!(),
- // lang: todo!(),
- // query: todo!(),
- // errors: todo!(),
- // }));
- }
- }
- }
- }
- // TODO: send unsupported to server
- _ => {}
- }
- } else {
- // TODO: send error (unsupported) to server
- }
- }
- },
- Stanza::Error(error) => {
- let _ = update_sender
- .send(UpdateMessage::Error(Error::Stream(error)))
- .await;
- // TODO: reconnect
- }
- Stanza::OtherContent(content) => {
- let _ = update_sender.send(UpdateMessage::Error(Error::UnrecognizedContent(content)));
- // TODO: send error to write_thread
- }
- }
-}
pub enum ReadControl {
Disconnect,
- Abort(
- oneshot::Sender<(
- Db,
- mpsc::Sender<UpdateMessage>,
- JoinSet<()>,
- mpsc::Sender<SupervisorCommand>,
- WriteHandle,
- Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>,
- )>,
- ),
+ Abort(oneshot::Sender<ReadState>),
}
pub struct ReadControlHandle {
@@ -437,32 +190,21 @@ impl DerefMut for ReadControlHandle {
impl ReadControlHandle {
pub fn new(
stream: BoundJabberReader<Tls>,
- on_crash: oneshot::Sender<(
- Db,
- mpsc::Sender<UpdateMessage>,
- JoinSet<()>,
- mpsc::Sender<SupervisorCommand>,
- WriteHandle,
- Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>,
- )>,
- db: Db,
- sender: mpsc::Sender<UpdateMessage>,
- supervisor_control: mpsc::Sender<SupervisorCommand>,
- jabber_write: WriteHandle,
- pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>,
+ connected: Connected,
+ logic: LogicState,
+ supervisor_control: SupervisorSender,
+ on_crash: oneshot::Sender<ReadState>,
) -> Self {
let (control_sender, control_receiver) = mpsc::channel(20);
let actor = Read::new(
- control_receiver,
stream,
- on_crash,
- db,
- sender,
- supervisor_control,
- jabber_write,
JoinSet::new(),
- pending_iqs,
+ connected,
+ logic,
+ supervisor_control,
+ control_receiver,
+ on_crash,
);
let handle = tokio::spawn(async move { actor.run().await });
@@ -474,33 +216,22 @@ impl ReadControlHandle {
pub fn reconnect(
stream: BoundJabberReader<Tls>,
- on_crash: oneshot::Sender<(
- Db,
- mpsc::Sender<UpdateMessage>,
- JoinSet<()>,
- mpsc::Sender<SupervisorCommand>,
- WriteHandle,
- Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>,
- )>,
- db: Db,
- sender: mpsc::Sender<UpdateMessage>,
- supervisor_control: mpsc::Sender<SupervisorCommand>,
- jabber_write: WriteHandle,
tasks: JoinSet<()>,
- pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>,
+ connected: Connected,
+ logic: LogicState,
+ supervisor_control: SupervisorSender,
+ on_crash: oneshot::Sender<ReadState>,
) -> Self {
let (control_sender, control_receiver) = mpsc::channel(20);
let actor = Read::new(
- control_receiver,
stream,
- on_crash,
- db,
- sender,
- supervisor_control,
- jabber_write,
tasks,
- pending_iqs,
+ connected,
+ logic,
+ supervisor_control,
+ control_receiver,
+ on_crash,
);
let handle = tokio::spawn(async move { actor.run().await });
diff --git a/luz/src/connection/write.rs b/luz/src/connection/write.rs
index 3333d38..ff78b81 100644
--- a/luz/src/connection/write.rs
+++ b/luz/src/connection/write.rs
@@ -9,12 +9,21 @@ use tokio::{
use crate::error::WriteError;
-// actor that receives jabber stanzas to write, and if there is an error, sends a message back to the supervisor then aborts, so the supervisor can spawn a new stream.
+/// actor that receives jabber stanzas to write, and if there is an error, sends a message back to the supervisor then aborts, so the supervisor can spawn a new stream.
pub struct Write {
+ stream: BoundJabberWriter<Tls>,
+
+ /// connection session write queue
stanza_receiver: mpsc::Receiver<WriteMessage>,
+
+ // control stuff
control_receiver: mpsc::Receiver<WriteControl>,
- stream: BoundJabberWriter<Tls>,
- on_crash: oneshot::Sender<(WriteMessage, mpsc::Receiver<WriteMessage>)>,
+ on_crash: oneshot::Sender<(WriteMessage, WriteState)>,
+}
+
+/// when a crash/abort occurs, this gets sent back to the supervisor, possibly with the current write that failed, so that the connection session can continue
+pub struct WriteState {
+ pub stanza_recv: mpsc::Receiver<WriteMessage>,
}
#[derive(Debug)]
@@ -25,21 +34,21 @@ pub struct WriteMessage {
pub enum WriteControl {
Disconnect,
- Abort(oneshot::Sender<mpsc::Receiver<WriteMessage>>),
+ Abort(oneshot::Sender<WriteState>),
}
impl Write {
fn new(
+ stream: BoundJabberWriter<Tls>,
stanza_receiver: mpsc::Receiver<WriteMessage>,
control_receiver: mpsc::Receiver<WriteControl>,
- stream: BoundJabberWriter<Tls>,
- supervisor: oneshot::Sender<(WriteMessage, mpsc::Receiver<WriteMessage>)>,
+ on_crash: oneshot::Sender<(WriteMessage, WriteState)>,
) -> Self {
Self {
+ stream,
stanza_receiver,
control_receiver,
- stream,
- on_crash: supervisor,
+ on_crash,
}
}
@@ -55,7 +64,12 @@ impl Write {
peanuts::Error::ReadError(_error) => {
// make sure message is not lost from error, supervisor handles retry and reporting
// TODO: upon reconnect, make sure we are not stuck in a reconnection loop
- let _ = self.on_crash.send((retry_msg, self.stanza_receiver));
+ let _ = self.on_crash.send((
+ retry_msg,
+ WriteState {
+ stanza_recv: self.stanza_receiver,
+ },
+ ));
return;
}
_ => {
@@ -106,7 +120,7 @@ impl Write {
},
// in case of abort, stream is already fucked, just send the receiver ready for a reconnection at the same resource
WriteControl::Abort(sender) => {
- let _ = sender.send(self.stanza_receiver);
+ let _ = sender.send(WriteState { stanza_recv: self.stanza_receiver });
break;
},
}
@@ -117,7 +131,7 @@ impl Write {
Err(e) => match &e {
peanuts::Error::ReadError(_error) => {
// make sure message is not lost from error, supervisor handles retry and reporting
- let _ = self.on_crash.send((msg, self.stanza_receiver));
+ let _ = self.on_crash.send((msg, WriteState { stanza_recv: self.stanza_receiver }));
break;
}
_ => {
@@ -190,12 +204,12 @@ impl DerefMut for WriteControlHandle {
impl WriteControlHandle {
pub fn new(
stream: BoundJabberWriter<Tls>,
- supervisor: oneshot::Sender<(WriteMessage, mpsc::Receiver<WriteMessage>)>,
+ on_crash: oneshot::Sender<(WriteMessage, WriteState)>,
) -> (WriteHandle, Self) {
let (control_sender, control_receiver) = mpsc::channel(20);
let (stanza_sender, stanza_receiver) = mpsc::channel(20);
- let actor = Write::new(stanza_receiver, control_receiver, stream, supervisor);
+ let actor = Write::new(stream, stanza_receiver, control_receiver, on_crash);
let handle = tokio::spawn(async move { actor.run().await });
(
@@ -211,13 +225,13 @@ impl WriteControlHandle {
pub fn reconnect_retry(
stream: BoundJabberWriter<Tls>,
- supervisor: oneshot::Sender<(WriteMessage, mpsc::Receiver<WriteMessage>)>,
- retry_msg: WriteMessage,
+ on_crash: oneshot::Sender<(WriteMessage, WriteState)>,
stanza_receiver: mpsc::Receiver<WriteMessage>,
+ retry_msg: WriteMessage,
) -> Self {
let (control_sender, control_receiver) = mpsc::channel(20);
- let actor = Write::new(stanza_receiver, control_receiver, stream, supervisor);
+ let actor = Write::new(stream, stanza_receiver, control_receiver, on_crash);
let handle = tokio::spawn(async move { actor.run_reconnected(retry_msg).await });
Self {
@@ -228,12 +242,12 @@ impl WriteControlHandle {
pub fn reconnect(
stream: BoundJabberWriter<Tls>,
- supervisor: oneshot::Sender<(WriteMessage, mpsc::Receiver<WriteMessage>)>,
+ on_crash: oneshot::Sender<(WriteMessage, WriteState)>,
stanza_receiver: mpsc::Receiver<WriteMessage>,
) -> Self {
let (control_sender, control_receiver) = mpsc::channel(20);
- let actor = Write::new(stanza_receiver, control_receiver, stream, supervisor);
+ let actor = Write::new(stream, stanza_receiver, control_receiver, on_crash);
let handle = tokio::spawn(async move { actor.run().await });
Self {
diff --git a/luz/src/lib.rs b/luz/src/lib.rs
index 99c96e0..3498ff1 100644
--- a/luz/src/lib.rs
+++ b/luz/src/lib.rs
@@ -1,6 +1,7 @@
use std::{
collections::HashMap,
ops::{Deref, DerefMut},
+ str::FromStr,
sync::Arc,
time::Duration,
};
@@ -10,12 +11,12 @@ use chrono::Utc;
use connection::{write::WriteMessage, SupervisorSender};
use db::Db;
use error::{
- ActorError, CommandError, ConnectionError, DatabaseError, ReadError, RosterError, StatusError,
- WriteError,
+ ActorError, CommandError, ConnectionError, DatabaseError, IqError, MessageRecvError,
+ PresenceError, ReadError, RosterError, StatusError, WriteError,
};
use futures::{future::Fuse, FutureExt};
use jabber::JID;
-use presence::{Offline, Online, Presence, PresenceType};
+use presence::{Offline, Online, Presence, PresenceType, Show};
use roster::{Contact, ContactUpdate};
use sqlx::SqlitePool;
use stanza::client::{
@@ -43,395 +44,766 @@ pub mod presence;
pub mod roster;
pub mod user;
-pub struct Luz {
- command_sender: mpsc::Sender<CommandMessage>,
- receiver: mpsc::Receiver<CommandMessage>,
- jid: Arc<Mutex<JID>>,
- // TODO: use a dyn passwordprovider trait to avoid storing password in memory
- password: Arc<String>,
- connected: Arc<Mutex<Option<(WriteHandle, SupervisorHandle)>>>,
- pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>,
- db: Db,
- sender: mpsc::Sender<UpdateMessage>,
- /// if connection was shut down due to e.g. server shutdown, supervisor must be able to mark client as disconnected
- connection_supervisor_shutdown: Fuse<oneshot::Receiver<()>>,
- // TODO: will need to have an auto reconnect state as well (e.g. in case server shut down, to try and reconnect later)
- // TODO: will grow forever at this point, maybe not required as tasks will naturally shut down anyway?
- tasks: JoinSet<()>,
+pub enum Command {
+ /// get the roster. if offline, retreive cached version from database. should be stored in application memory
+ GetRoster(oneshot::Sender<Result<Vec<Contact>, RosterError>>),
+ /// get all chats. chat will include 10 messages in their message Vec (enough for chat previews)
+ // TODO: paging and filtering
+ GetChats(oneshot::Sender<Result<Vec<Chat>, DatabaseError>>),
+ // TODO: paging and filtering
+ GetChatsOrdered(oneshot::Sender<Result<Vec<Chat>, DatabaseError>>),
+ // TODO: paging and filtering
+ GetChatsOrderedWithLatestMessages(oneshot::Sender<Result<Vec<(Chat, Message)>, DatabaseError>>),
+ /// get a specific chat by jid
+ GetChat(JID, oneshot::Sender<Result<Chat, DatabaseError>>),
+ /// get message history for chat (does appropriate mam things)
+ // TODO: paging and filtering
+ GetMessages(JID, oneshot::Sender<Result<Vec<Message>, DatabaseError>>),
+ /// delete a chat from your chat history, along with all the corresponding messages
+ DeleteChat(JID, oneshot::Sender<Result<(), DatabaseError>>),
+ /// delete a message from your chat history
+ DeleteMessage(Uuid, oneshot::Sender<Result<(), DatabaseError>>),
+ /// get a user from your users database
+ GetUser(JID, oneshot::Sender<Result<User, DatabaseError>>),
+ /// add a contact to your roster, with a status of none, no subscriptions.
+ AddContact(JID, oneshot::Sender<Result<(), RosterError>>),
+ /// send a friend request i.e. a subscription request with a subscription pre-approval. if not already added to roster server adds to roster.
+ BuddyRequest(JID, oneshot::Sender<Result<(), WriteError>>),
+ /// send a subscription request, without pre-approval. if not already added to roster server adds to roster.
+ SubscriptionRequest(JID, oneshot::Sender<Result<(), WriteError>>),
+ /// accept a friend request by accepting a pending subscription and sending a subscription request back. if not already added to roster adds to roster.
+ AcceptBuddyRequest(JID, oneshot::Sender<Result<(), WriteError>>),
+ /// accept a pending subscription and doesn't send a subscription request back. if not already added to roster adds to roster.
+ AcceptSubscriptionRequest(JID, oneshot::Sender<Result<(), WriteError>>),
+ /// unsubscribe to a contact, but don't remove their subscription.
+ UnsubscribeFromContact(JID, oneshot::Sender<Result<(), WriteError>>),
+ /// stop a contact from being subscribed, but stay subscribed to the contact.
+ UnsubscribeContact(JID, oneshot::Sender<Result<(), WriteError>>),
+ /// remove subscriptions to and from contact, but keep in roster.
+ UnfriendContact(JID, oneshot::Sender<Result<(), WriteError>>),
+ /// remove a contact from the contact list. will remove subscriptions if not already done then delete contact from roster.
+ DeleteContact(JID, oneshot::Sender<Result<(), RosterError>>),
+ /// update contact. contact details will be overwritten with the contents of the contactupdate struct.
+ UpdateContact(JID, ContactUpdate, oneshot::Sender<Result<(), RosterError>>),
+ /// set online status. if disconnected, will be cached so when client connects, will be sent as the initial presence.
+ SetStatus(Online, oneshot::Sender<Result<(), StatusError>>),
+ /// send presence stanza
+ // TODO: cache presence stanza
+ SendPresence(
+ Option<JID>,
+ PresenceType,
+ oneshot::Sender<Result<(), WriteError>>,
+ ),
+ /// send a directed presence (usually to a non-contact).
+ // TODO: should probably make it so people can add non-contact auto presence sharing in the client (most likely through setting an internal setting)
+ /// send a message to a jid (any kind of jid that can receive a message, e.g. a user or a
+ /// chatroom). if disconnected, will be cached so when client connects, message will be sent.
+ SendMessage(JID, Body, oneshot::Sender<Result<(), WriteError>>),
}
-impl Luz {
- fn new(
- command_sender: mpsc::Sender<CommandMessage>,
- receiver: mpsc::Receiver<CommandMessage>,
- jid: Arc<Mutex<JID>>,
- password: String,
- connected: Arc<Mutex<Option<(WriteHandle, SupervisorHandle)>>>,
- connection_supervisor_shutdown: Fuse<oneshot::Receiver<()>>,
- db: Db,
- sender: mpsc::Sender<UpdateMessage>,
- ) -> Self {
+#[derive(Debug)]
+pub struct Client {
+ sender: mpsc::Sender<LuzMessage>,
+ timeout: Duration,
+}
+
+impl Clone for Client {
+ fn clone(&self) -> Self {
Self {
- jid,
- password: Arc::new(password),
- connected,
- db,
- receiver,
- sender,
- tasks: JoinSet::new(),
- connection_supervisor_shutdown,
- pending_iqs: Arc::new(Mutex::new(HashMap::new())),
- command_sender,
+ sender: self.sender.clone(),
+ timeout: self.timeout,
}
}
+}
- async fn run(mut self) {
- loop {
- let msg = tokio::select! {
- // this is okay, as when created the supervisor (and connection) doesn't exist, but a bit messy
- // THIS IS NOT OKAY LOLLLL - apparently fusing is the best option???
- _ = &mut self.connection_supervisor_shutdown => {
- *self.connected.lock().await = None;
- continue;
- }
- Some(msg) = self.receiver.recv() => {
- msg
- },
- else => break,
- };
- // TODO: consider separating disconnect/connect and commands apart from commandmessage
- // TODO: dispatch commands separate tasks
- match msg {
- CommandMessage::Connect => {
- let mut connection_lock = self.connected.lock().await;
- match connection_lock.as_ref() {
- Some(_) => {
- self.sender
- .send(UpdateMessage::Error(Error::AlreadyConnected))
+impl Deref for Client {
+ type Target = mpsc::Sender<LuzMessage>;
+
+ fn deref(&self) -> &Self::Target {
+ &self.sender
+ }
+}
+
+impl DerefMut for Client {
+ fn deref_mut(&mut self) -> &mut Self::Target {
+ &mut self.sender
+ }
+}
+
+impl Client {
+ pub async fn connect(&self) -> Result<(), ActorError> {
+ self.send(LuzMessage::Connect).await?;
+ Ok(())
+ }
+
+ pub async fn disconnect(&self, offline: Offline) -> Result<(), ActorError> {
+ self.send(LuzMessage::Disconnect).await?;
+ Ok(())
+ }
+
+ pub fn new(jid: JID, password: String, db: Db) -> (Self, mpsc::Receiver<UpdateMessage>) {
+ let (command_sender, command_receiver) = mpsc::channel(20);
+ let (update_send, update_recv) = mpsc::channel(20);
+
+ // might be bad, first supervisor shutdown notification oneshot is never used (disgusting)
+ let (_sup_send, sup_recv) = oneshot::channel();
+ let sup_recv = sup_recv.fuse();
+
+ let logic = LogicState {
+ db,
+ pending: Arc::new(Mutex::new(HashMap::new())),
+ update_sender: update_send,
+ };
+
+ let actor = Luz::new(jid, password, command_receiver, None, sup_recv, logic);
+ tokio::spawn(async move { actor.run().await });
+
+ (
+ Self {
+ sender: command_sender,
+ // TODO: configure timeout
+ timeout: Duration::from_secs(10),
+ },
+ update_recv,
+ )
+ }
+
+ pub async fn get_roster(&self) -> Result<Vec<Contact>, CommandError<RosterError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(LuzMessage::Command(Command::GetRoster(send)))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let roster = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(roster)
+ }
+
+ pub async fn get_chats(&self) -> Result<Vec<Chat>, CommandError<DatabaseError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(LuzMessage::Command(Command::GetChats(send)))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let chats = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(chats)
+ }
+
+ pub async fn get_chats_ordered(&self) -> Result<Vec<Chat>, CommandError<DatabaseError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(LuzMessage::Command(Command::GetChatsOrdered(send)))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let chats = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(chats)
+ }
+
+ pub async fn get_chats_ordered_with_latest_messages(
+ &self,
+ ) -> Result<Vec<(Chat, Message)>, CommandError<DatabaseError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(LuzMessage::Command(
+ Command::GetChatsOrderedWithLatestMessages(send),
+ ))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let chats = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(chats)
+ }
+
+ pub async fn get_chat(&self, jid: JID) -> Result<Chat, CommandError<DatabaseError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(LuzMessage::Command(Command::GetChat(jid, send)))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let chat = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(chat)
+ }
+
+ pub async fn get_messages(
+ &self,
+ jid: JID,
+ ) -> Result<Vec<Message>, CommandError<DatabaseError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(LuzMessage::Command(Command::GetMessages(jid, send)))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let messages = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(messages)
+ }
+
+ pub async fn delete_chat(&self, jid: JID) -> Result<(), CommandError<DatabaseError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(LuzMessage::Command(Command::DeleteChat(jid, send)))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let result = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(result)
+ }
+
+ pub async fn delete_message(&self, id: Uuid) -> Result<(), CommandError<DatabaseError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(LuzMessage::Command(Command::DeleteMessage(id, send)))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let result = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(result)
+ }
+
+ pub async fn get_user(&self, jid: JID) -> Result<User, CommandError<DatabaseError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(LuzMessage::Command(Command::GetUser(jid, send)))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let result = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(result)
+ }
+
+ pub async fn add_contact(&self, jid: JID) -> Result<(), CommandError<RosterError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(LuzMessage::Command(Command::AddContact(jid, send)))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let result = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(result)
+ }
+
+ pub async fn buddy_request(&self, jid: JID) -> Result<(), CommandError<WriteError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(LuzMessage::Command(Command::BuddyRequest(jid, send)))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let result = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(result)
+ }
+
+ pub async fn subscription_request(&self, jid: JID) -> Result<(), CommandError<WriteError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(LuzMessage::Command(Command::SubscriptionRequest(jid, send)))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let result = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(result)
+ }
+
+ pub async fn accept_buddy_request(&self, jid: JID) -> Result<(), CommandError<WriteError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(LuzMessage::Command(Command::AcceptBuddyRequest(jid, send)))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let result = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(result)
+ }
+
+ pub async fn accept_subscription_request(
+ &self,
+ jid: JID,
+ ) -> Result<(), CommandError<WriteError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(LuzMessage::Command(Command::AcceptSubscriptionRequest(
+ jid, send,
+ )))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let result = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(result)
+ }
+
+ pub async fn unsubscribe_from_contact(&self, jid: JID) -> Result<(), CommandError<WriteError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(LuzMessage::Command(Command::UnsubscribeFromContact(
+ jid, send,
+ )))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let result = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(result)
+ }
+
+ pub async fn unsubscribe_contact(&self, jid: JID) -> Result<(), CommandError<WriteError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(LuzMessage::Command(Command::UnsubscribeContact(jid, send)))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let result = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(result)
+ }
+
+ pub async fn unfriend_contact(&self, jid: JID) -> Result<(), CommandError<WriteError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(LuzMessage::Command(Command::UnfriendContact(jid, send)))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let result = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(result)
+ }
+
+ pub async fn delete_contact(&self, jid: JID) -> Result<(), CommandError<RosterError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(LuzMessage::Command(Command::DeleteContact(jid, send)))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let result = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(result)
+ }
+
+ pub async fn update_contact(
+ &self,
+ jid: JID,
+ update: ContactUpdate,
+ ) -> Result<(), CommandError<RosterError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(LuzMessage::Command(Command::UpdateContact(
+ jid, update, send,
+ )))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let result = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(result)
+ }
+
+ pub async fn set_status(&self, online: Online) -> Result<(), CommandError<StatusError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(LuzMessage::Command(Command::SetStatus(online, send)))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let result = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(result)
+ }
+
+ pub async fn send_message(&self, jid: JID, body: Body) -> Result<(), CommandError<WriteError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(LuzMessage::Command(Command::SendMessage(jid, body, send)))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let result = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(result)
+ }
+}
+
+#[derive(Clone)]
+pub struct LogicState {
+ db: Db,
+ pending: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>,
+ update_sender: mpsc::Sender<UpdateMessage>,
+}
+
+impl LogicState {
+ pub async fn handle_connect(self, connection: Connected) {
+ let (send, recv) = oneshot::channel();
+ debug!("getting roster");
+ self.clone()
+ .handle_online(Command::GetRoster(send), connection.clone())
+ .await;
+ debug!("sent roster req");
+ let roster = recv.await;
+ debug!("got roster");
+ match roster {
+ Ok(r) => match r {
+ Ok(roster) => {
+ let online = self.db.read_cached_status().await;
+ let online = match online {
+ Ok(online) => online,
+ Err(e) => {
+ let _ = self
+ .update_sender
+ .send(UpdateMessage::Error(Error::Connecting(
+ ConnectionError::StatusCacheError(e.into()),
+ )))
.await;
+ Online::default()
}
- None => {
- let streams_result;
- {
- let mut jid = self.jid.lock().await;
- let mut domain = jid.domainpart.clone();
- // TODO: check what happens upon reconnection with same resource (this is probably what one wants to do and why jid should be mutated from a bare jid to one with a resource)
- streams_result = jabber::connect_and_login(
- &mut jid,
- &*self.password,
- &mut domain,
- )
- .await;
- debug!("connected and logged in as {}", jid);
- debug!("test");
+ };
+ let (send, recv) = oneshot::channel();
+ self.clone()
+ .handle_online(
+ Command::SendPresence(None, PresenceType::Online(online.clone()), send),
+ connection,
+ )
+ .await;
+ let set_status = recv.await;
+ match set_status {
+ Ok(s) => match s {
+ Ok(()) => {
+ let _ = self
+ .update_sender
+ .send(UpdateMessage::Online(online, roster))
+ .await;
}
- match streams_result {
- Ok(s) => {
- debug!("ok stream result");
- let (shutdown_send, shutdown_recv) = oneshot::channel::<()>();
- let (writer, supervisor) = SupervisorHandle::new(
- s,
- self.sender.clone(),
- self.db.clone(),
- shutdown_send,
- self.jid.clone(),
- self.password.clone(),
- self.pending_iqs.clone(),
- );
- let shutdown_recv = shutdown_recv.fuse();
- self.connection_supervisor_shutdown = shutdown_recv;
- // TODO: get roster and send initial presence
- let (send, recv) = oneshot::channel();
- debug!("getting roster");
- CommandMessage::GetRoster(send)
- .handle_online(
- writer.clone(),
- supervisor.sender(),
- self.jid.clone(),
- self.db.clone(),
- self.sender.clone(),
- self.pending_iqs.clone(),
- )
- .await;
- debug!("sent roster req");
- let roster = recv.await;
- debug!("got roster");
- match roster {
- Ok(r) => {
- match r {
- Ok(roster) => {
- let online = self.db.read_cached_status().await;
- let online = match online {
- Ok(online) => online,
- Err(e) => {
- let _ = self
- .sender
- .send(UpdateMessage::Error(
- Error::Connecting(
- ConnectionError::StatusCacheError(
- e.into(),
- ),
- ),
- ))
- .await;
- Online::default()
- }
- };
- let (send, recv) = oneshot::channel();
- CommandMessage::SendPresence(
- None,
- PresenceType::Online(online.clone()),
- send,
- )
- .handle_online(
- writer.clone(),
- supervisor.sender(),
- self.jid.clone(),
- self.db.clone(),
- self.sender.clone(),
- self.pending_iqs.clone(),
- )
- .await;
- let set_status = recv.await;
- match set_status {
- Ok(s) => match s {
- Ok(()) => {
- *connection_lock =
- Some((writer, supervisor));
- let _ = self
- .sender
- .send(UpdateMessage::Online(
- online, roster,
- ))
- .await;
- continue;
- }
- Err(e) => {
- let _ = self
- .sender
- .send(UpdateMessage::Error(
- Error::Connecting(e.into()),
- ))
- .await;
- }
- },
- Err(e) => {
- let _ = self.sender.send(UpdateMessage::Error(Error::Connecting(ConnectionError::SendPresence(WriteError::Actor(e.into()))))).await;
- }
- }
- }
- Err(e) => {
- let _ = self
- .sender
- .send(UpdateMessage::Error(
- Error::Connecting(e.into()),
- ))
- .await;
- }
- }
- }
- Err(e) => {
- let _ = self
- .sender
- .send(UpdateMessage::Error(Error::Connecting(
- ConnectionError::RosterRetreival(
- RosterError::Write(WriteError::Actor(
- e.into(),
- )),
- ),
- )))
- .await;
- }
- }
- }
- Err(e) => {
- tracing::error!("error: {}", e);
- let _ =
- self.sender.send(UpdateMessage::Error(Error::Connecting(
- ConnectionError::ConnectionFailed(e.into()),
- )));
- }
+ Err(e) => {
+ let _ = self
+ .update_sender
+ .send(UpdateMessage::Error(Error::Connecting(e.into())))
+ .await;
}
- }
- };
- }
- CommandMessage::Disconnect(offline) => {
- match self.connected.lock().await.as_mut() {
- None => {
+ },
+ Err(e) => {
let _ = self
- .sender
- .send(UpdateMessage::Error(Error::AlreadyDisconnected))
+ .update_sender
+ .send(UpdateMessage::Error(Error::Connecting(
+ ConnectionError::SendPresence(WriteError::Actor(e.into())),
+ )))
.await;
}
- mut c => {
- if let Some((write_handle, supervisor_handle)) = c.take() {
- let offline_presence: stanza::client::presence::Presence =
- offline.clone().into_stanza(None);
- let stanza = Stanza::Presence(offline_presence);
- // TODO: timeout and error check
- write_handle.write(stanza).await;
- let _ = supervisor_handle.send(SupervisorCommand::Disconnect).await;
- let _ = self.sender.send(UpdateMessage::Offline(offline)).await;
- } else {
- unreachable!()
- };
- }
}
}
- _ => {
- match self.connected.lock().await.as_ref() {
- Some((w, s)) => self.tasks.spawn(msg.handle_online(
- w.clone(),
- s.sender(),
- self.jid.clone(),
- self.db.clone(),
- self.sender.clone(),
- self.pending_iqs.clone(),
- )),
- None => self.tasks.spawn(msg.handle_offline(
- self.jid.clone(),
- self.db.clone(),
- self.sender.clone(),
- )),
- };
+ Err(e) => {
+ let _ = self
+ .update_sender
+ .send(UpdateMessage::Error(Error::Connecting(e.into())))
+ .await;
}
+ },
+ Err(e) => {
+ let _ = self
+ .update_sender
+ .send(UpdateMessage::Error(Error::Connecting(
+ ConnectionError::RosterRetreival(RosterError::Write(WriteError::Actor(
+ e.into(),
+ ))),
+ )))
+ .await;
}
}
}
-}
-impl CommandMessage {
- pub async fn handle_offline(
+ pub async fn handle_disconnect(self, connection: Connected) {
+ // TODO: be able to set offline status message
+ let offline_presence: stanza::client::presence::Presence =
+ Offline::default().into_stanza(None);
+ let stanza = Stanza::Presence(offline_presence);
+ // TODO: timeout and error check
+ connection.write_handle.write(stanza).await;
+ let _ = self
+ .update_sender
+ .send(UpdateMessage::Offline(Offline::default()))
+ .await;
+ }
+
+ /// stanza errors (recoverable)
+ pub async fn handle_error(self, error: Error) {}
+
+ pub async fn handle_stanza(
self,
- jid: Arc<Mutex<JID>>,
- db: Db,
- update_sender: mpsc::Sender<UpdateMessage>,
+ stanza: Stanza,
+ connection: Connected,
+ supervisor: SupervisorSender,
) {
- match self {
- CommandMessage::Connect => unreachable!(),
- CommandMessage::Disconnect(offline) => unreachable!(),
- CommandMessage::GetRoster(sender) => {
- let roster = db.read_cached_roster().await;
- match roster {
- Ok(roster) => {
- let _ = sender.send(Ok(roster));
- }
- Err(e) => {
- let _ = sender.send(Err(RosterError::Cache(e.into())));
+ match stanza {
+ Stanza::Message(stanza_message) => {
+ if let Some(mut from) = stanza_message.from {
+ // TODO: don't ignore delay from. xep says SHOULD send error if incorrect.
+ let timestamp = stanza_message
+ .delay
+ .map(|delay| delay.stamp)
+ .unwrap_or_else(|| Utc::now());
+ // TODO: group chat messages
+ let mut message = Message {
+ id: stanza_message
+ .id
+ // TODO: proper id storage
+ .map(|id| Uuid::from_str(&id).unwrap_or_else(|_| Uuid::new_v4()))
+ .unwrap_or_else(|| Uuid::new_v4()),
+ from: from.clone(),
+ timestamp,
+ body: Body {
+ // TODO: should this be an option?
+ body: stanza_message
+ .body
+ .map(|body| body.body)
+ .unwrap_or_default()
+ .unwrap_or_default(),
+ },
+ };
+ // TODO: can this be more efficient?
+ let result = self
+ .db
+ .create_message_with_user_resource_and_chat(message.clone(), from.clone())
+ .await;
+ if let Err(e) = result {
+ tracing::error!("messagecreate");
+ let _ = self
+ .update_sender
+ .send(UpdateMessage::Error(Error::MessageRecv(
+ MessageRecvError::MessageHistory(e.into()),
+ )))
+ .await;
}
+ message.from = message.from.as_bare();
+ from = from.as_bare();
+ let _ = self
+ .update_sender
+ .send(UpdateMessage::Message { to: from, message })
+ .await;
+ } else {
+ let _ = self
+ .update_sender
+ .send(UpdateMessage::Error(Error::MessageRecv(
+ MessageRecvError::MissingFrom,
+ )))
+ .await;
}
}
- CommandMessage::GetChats(sender) => {
- let chats = db.read_chats().await.map_err(|e| e.into());
- sender.send(chats);
- }
- CommandMessage::GetChatsOrdered(sender) => {
- let chats = db.read_chats_ordered().await.map_err(|e| e.into());
- sender.send(chats);
- }
- CommandMessage::GetChatsOrderedWithLatestMessages(sender) => {
- let chats = db
- .read_chats_ordered_with_latest_messages()
- .await
- .map_err(|e| e.into());
- sender.send(chats);
- }
- CommandMessage::GetChat(jid, sender) => {
- let chats = db.read_chat(jid).await.map_err(|e| e.into());
- sender.send(chats);
- }
- CommandMessage::GetMessages(jid, sender) => {
- let messages = db.read_message_history(jid).await.map_err(|e| e.into());
- sender.send(messages);
- }
- CommandMessage::DeleteChat(jid, sender) => {
- let result = db.delete_chat(jid).await.map_err(|e| e.into());
- sender.send(result);
- }
- CommandMessage::DeleteMessage(uuid, sender) => {
- let result = db.delete_message(uuid).await.map_err(|e| e.into());
- sender.send(result);
- }
- CommandMessage::GetUser(jid, sender) => {
- let user = db.read_user(jid).await.map_err(|e| e.into());
- sender.send(user);
- }
- // TODO: offline queue to modify roster
- CommandMessage::AddContact(jid, sender) => {
- sender.send(Err(RosterError::Write(WriteError::Disconnected)));
- }
- CommandMessage::BuddyRequest(jid, sender) => {
- sender.send(Err(WriteError::Disconnected));
- }
- CommandMessage::SubscriptionRequest(jid, sender) => {
- sender.send(Err(WriteError::Disconnected));
- }
- CommandMessage::AcceptBuddyRequest(jid, sender) => {
- sender.send(Err(WriteError::Disconnected));
- }
- CommandMessage::AcceptSubscriptionRequest(jid, sender) => {
- sender.send(Err(WriteError::Disconnected));
- }
- CommandMessage::UnsubscribeFromContact(jid, sender) => {
- sender.send(Err(WriteError::Disconnected));
- }
- CommandMessage::UnsubscribeContact(jid, sender) => {
- sender.send(Err(WriteError::Disconnected));
- }
- CommandMessage::UnfriendContact(jid, sender) => {
- sender.send(Err(WriteError::Disconnected));
- }
- CommandMessage::DeleteContact(jid, sender) => {
- sender.send(Err(RosterError::Write(WriteError::Disconnected)));
- }
- CommandMessage::UpdateContact(jid, contact_update, sender) => {
- sender.send(Err(RosterError::Write(WriteError::Disconnected)));
- }
- CommandMessage::SetStatus(online, sender) => {
- let result = db
- .upsert_cached_status(online)
- .await
- .map_err(|e| StatusError::Cache(e.into()));
- sender.send(result);
+ Stanza::Presence(presence) => {
+ if let Some(from) = presence.from {
+ match presence.r#type {
+ Some(r#type) => match r#type {
+ // error processing a presence from somebody
+ stanza::client::presence::PresenceType::Error => {
+ // TODO: is there any other information that should go with the error? also MUST have an error, otherwise it's a different error. maybe it shoulnd't be an option.
+ let _ = self
+ .update_sender
+ .send(UpdateMessage::Error(Error::Presence(
+ // TODO: ughhhhhhhhhhhhh these stanza errors should probably just have an option, and custom display
+ PresenceError::StanzaError(
+ presence
+ .errors
+ .first()
+ .cloned()
+ .expect("error MUST have error"),
+ ),
+ )))
+ .await;
+ }
+ // should not happen (error to server)
+ stanza::client::presence::PresenceType::Probe => {
+ // TODO: should probably write an error and restart stream
+ let _ = self
+ .update_sender
+ .send(UpdateMessage::Error(Error::Presence(
+ PresenceError::Unsupported,
+ )))
+ .await;
+ }
+ stanza::client::presence::PresenceType::Subscribe => {
+ // may get a subscription request from somebody who is not a contact!!! therefore should be its own kind of event
+ let _ = self
+ .update_sender
+ .send(UpdateMessage::SubscriptionRequest(from))
+ .await;
+ }
+ stanza::client::presence::PresenceType::Unavailable => {
+ let offline = Offline {
+ status: presence.status.map(|status| status.status.0),
+ };
+ let timestamp = presence
+ .delay
+ .map(|delay| delay.stamp)
+ .unwrap_or_else(|| Utc::now());
+ let _ = self
+ .update_sender
+ .send(UpdateMessage::Presence {
+ from,
+ presence: Presence {
+ timestamp,
+ presence: PresenceType::Offline(offline),
+ },
+ })
+ .await;
+ }
+ // for now, do nothing, as these are simply informational. will receive roster push from the server regarding the changes to do with them.
+ stanza::client::presence::PresenceType::Subscribed => {}
+ stanza::client::presence::PresenceType::Unsubscribe => {}
+ stanza::client::presence::PresenceType::Unsubscribed => {}
+ },
+ None => {
+ let online = Online {
+ show: presence.show.map(|show| match show {
+ stanza::client::presence::Show::Away => Show::Away,
+ stanza::client::presence::Show::Chat => Show::Chat,
+ stanza::client::presence::Show::Dnd => Show::DoNotDisturb,
+ stanza::client::presence::Show::Xa => Show::ExtendedAway,
+ }),
+ status: presence.status.map(|status| status.status.0),
+ priority: presence.priority.map(|priority| priority.0),
+ };
+ let timestamp = presence
+ .delay
+ .map(|delay| delay.stamp)
+ .unwrap_or_else(|| Utc::now());
+ let _ = self
+ .update_sender
+ .send(UpdateMessage::Presence {
+ from,
+ presence: Presence {
+ timestamp,
+ presence: PresenceType::Online(online),
+ },
+ })
+ .await;
+ }
+ }
+ } else {
+ let _ = self
+ .update_sender
+ .send(UpdateMessage::Error(Error::Presence(
+ PresenceError::MissingFrom,
+ )))
+ .await;
+ }
}
- // TODO: offline message queue
- CommandMessage::SendMessage(jid, body, sender) => {
- sender.send(Err(WriteError::Disconnected));
+ Stanza::Iq(iq) => match iq.r#type {
+ stanza::client::iq::IqType::Error | stanza::client::iq::IqType::Result => {
+ let send;
+ {
+ send = self.pending.lock().await.remove(&iq.id);
+ }
+ if let Some(send) = send {
+ send.send(Ok(Stanza::Iq(iq)));
+ } else {
+ let _ = self
+ .update_sender
+ .send(UpdateMessage::Error(Error::Iq(IqError::NoMatchingId(
+ iq.id,
+ ))))
+ .await;
+ }
+ }
+ // TODO: send unsupported to server
+ // TODO: proper errors i am so tired please
+ stanza::client::iq::IqType::Get => {}
+ stanza::client::iq::IqType::Set => {
+ if let Some(query) = iq.query {
+ match query {
+ stanza::client::iq::Query::Roster(mut query) => {
+ // TODO: there should only be one
+ if let Some(item) = query.items.pop() {
+ match item.subscription {
+ Some(stanza::roster::Subscription::Remove) => {
+ self.db.delete_contact(item.jid.clone()).await;
+ self.update_sender
+ .send(UpdateMessage::RosterDelete(item.jid))
+ .await;
+ // TODO: send result
+ }
+ _ => {
+ let contact: Contact = item.into();
+ if let Err(e) =
+ self.db.upsert_contact(contact.clone()).await
+ {
+ let _ = self
+ .update_sender
+ .send(UpdateMessage::Error(Error::Roster(
+ RosterError::Cache(e.into()),
+ )))
+ .await;
+ }
+ let _ = self
+ .update_sender
+ .send(UpdateMessage::RosterUpdate(contact))
+ .await;
+ // TODO: send result
+ // write_handle.write(Stanza::Iq(stanza::client::iq::Iq {
+ // from: ,
+ // id: todo!(),
+ // to: todo!(),
+ // r#type: todo!(),
+ // lang: todo!(),
+ // query: todo!(),
+ // errors: todo!(),
+ // }));
+ }
+ }
+ }
+ }
+ // TODO: send unsupported to server
+ _ => {}
+ }
+ } else {
+ // TODO: send error (unsupported) to server
+ }
+ }
+ },
+ Stanza::Error(error) => {
+ let _ = self
+ .update_sender
+ .send(UpdateMessage::Error(Error::Stream(error)))
+ .await;
+ // TODO: reconnect
}
- CommandMessage::SendPresence(jid, presence, sender) => {
- sender.send(Err(WriteError::Disconnected));
+ Stanza::OtherContent(content) => {
+ let _ = self
+ .update_sender
+ .send(UpdateMessage::Error(Error::UnrecognizedContent(content)));
+ // TODO: send error to write_thread
}
}
}
- pub async fn handle_online(
- mut self,
- write_handle: WriteHandle,
- supervisor_control: SupervisorSender,
- // TODO: jid could lose resource by the end
- client_jid: Arc<Mutex<JID>>,
- db: Db,
- update_sender: mpsc::Sender<UpdateMessage>,
- pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>,
- ) {
- match self {
- CommandMessage::Connect => unreachable!(),
- CommandMessage::Disconnect(_) => unreachable!(),
- CommandMessage::GetRoster(result_sender) => {
+ // pub async fn handle_stream_error(self, error) {}
+ pub async fn handle_online(self, command: Command, connection: Connected) {
+ match command {
+ Command::GetRoster(result_sender) => {
// TODO: jid resource should probably be stored within the connection
- let owned_jid: JID;
debug!("before client_jid lock");
- {
- owned_jid = client_jid.lock().await.clone();
- }
debug!("after client_jid lock");
let iq_id = Uuid::new_v4().to_string();
let (send, iq_recv) = oneshot::channel();
{
- pending_iqs.lock().await.insert(iq_id.clone(), send);
+ self.pending.lock().await.insert(iq_id.clone(), send);
}
let stanza = Stanza::Iq(Iq {
- from: Some(owned_jid),
+ from: Some(connection.jid),
id: iq_id.to_string(),
to: None,
r#type: IqType::Get,
@@ -443,7 +815,8 @@ impl CommandMessage {
errors: Vec::new(),
});
let (send, recv) = oneshot::channel();
- let _ = write_handle
+ let _ = connection
+ .write_handle
.send(WriteMessage {
stanza,
respond_to: send,
@@ -477,8 +850,8 @@ impl CommandMessage {
}) if id == iq_id && r#type == IqType::Result => {
let contacts: Vec<Contact> =
items.into_iter().map(|item| item.into()).collect();
- if let Err(e) = db.replace_cached_roster(contacts.clone()).await {
- update_sender
+ if let Err(e) = self.db.replace_cached_roster(contacts.clone()).await {
+ self.update_sender
.send(UpdateMessage::Error(Error::Roster(RosterError::Cache(
e.into(),
))))
@@ -518,50 +891,51 @@ impl CommandMessage {
}
}
}
- CommandMessage::GetChats(sender) => {
- let chats = db.read_chats().await.map_err(|e| e.into());
+ Command::GetChats(sender) => {
+ let chats = self.db.read_chats().await.map_err(|e| e.into());
sender.send(chats);
}
- CommandMessage::GetChatsOrdered(sender) => {
- let chats = db.read_chats_ordered().await.map_err(|e| e.into());
+ Command::GetChatsOrdered(sender) => {
+ let chats = self.db.read_chats_ordered().await.map_err(|e| e.into());
sender.send(chats);
}
- CommandMessage::GetChatsOrderedWithLatestMessages(sender) => {
- let chats = db
+ Command::GetChatsOrderedWithLatestMessages(sender) => {
+ let chats = self
+ .db
.read_chats_ordered_with_latest_messages()
.await
.map_err(|e| e.into());
sender.send(chats);
}
- CommandMessage::GetChat(jid, sender) => {
- let chats = db.read_chat(jid).await.map_err(|e| e.into());
+ Command::GetChat(jid, sender) => {
+ let chats = self.db.read_chat(jid).await.map_err(|e| e.into());
sender.send(chats);
}
- CommandMessage::GetMessages(jid, sender) => {
- let messages = db.read_message_history(jid).await.map_err(|e| e.into());
+ Command::GetMessages(jid, sender) => {
+ let messages = self
+ .db
+ .read_message_history(jid)
+ .await
+ .map_err(|e| e.into());
sender.send(messages);
}
- CommandMessage::DeleteChat(jid, sender) => {
- let result = db.delete_chat(jid).await.map_err(|e| e.into());
+ Command::DeleteChat(jid, sender) => {
+ let result = self.db.delete_chat(jid).await.map_err(|e| e.into());
sender.send(result);
}
- CommandMessage::DeleteMessage(uuid, sender) => {
- let result = db.delete_message(uuid).await.map_err(|e| e.into());
+ Command::DeleteMessage(uuid, sender) => {
+ let result = self.db.delete_message(uuid).await.map_err(|e| e.into());
sender.send(result);
}
- CommandMessage::GetUser(jid, sender) => {
- let user = db.read_user(jid).await.map_err(|e| e.into());
+ Command::GetUser(jid, sender) => {
+ let user = self.db.read_user(jid).await.map_err(|e| e.into());
sender.send(user);
}
// TODO: offline queue to modify roster
- CommandMessage::AddContact(jid, sender) => {
- let owned_jid;
- {
- owned_jid = client_jid.lock().await.clone();
- }
+ Command::AddContact(jid, sender) => {
let iq_id = Uuid::new_v4().to_string();
let set_stanza = Stanza::Iq(Iq {
- from: Some(owned_jid),
+ from: Some(connection.jid),
id: iq_id.clone(),
to: None,
r#type: IqType::Set,
@@ -581,10 +955,10 @@ impl CommandMessage {
});
let (send, recv) = oneshot::channel();
{
- pending_iqs.lock().await.insert(iq_id.clone(), send);
+ self.pending.lock().await.insert(iq_id.clone(), send);
}
// TODO: write_handle send helper function
- let result = write_handle.write(set_stanza).await;
+ let result = connection.write_handle.write(set_stanza).await;
if let Err(e) = result {
sender.send(Err(RosterError::Write(e)));
return;
@@ -637,7 +1011,7 @@ impl CommandMessage {
}
}
}
- CommandMessage::BuddyRequest(jid, sender) => {
+ Command::BuddyRequest(jid, sender) => {
let presence = Stanza::Presence(stanza::client::presence::Presence {
from: None,
id: None,
@@ -650,7 +1024,7 @@ impl CommandMessage {
errors: Vec::new(),
delay: None,
});
- let result = write_handle.write(presence).await;
+ let result = connection.write_handle.write(presence).await;
match result {
Err(_) => {
let _ = sender.send(result);
@@ -668,12 +1042,12 @@ impl CommandMessage {
errors: Vec::new(),
delay: None,
});
- let result = write_handle.write(presence).await;
+ let result = connection.write_handle.write(presence).await;
let _ = sender.send(result);
}
}
}
- CommandMessage::SubscriptionRequest(jid, sender) => {
+ Command::SubscriptionRequest(jid, sender) => {
// TODO: i should probably have builders
let presence = Stanza::Presence(stanza::client::presence::Presence {
from: None,
@@ -687,10 +1061,10 @@ impl CommandMessage {
errors: Vec::new(),
delay: None,
});
- let result = write_handle.write(presence).await;
+ let result = connection.write_handle.write(presence).await;
let _ = sender.send(result);
}
- CommandMessage::AcceptBuddyRequest(jid, sender) => {
+ Command::AcceptBuddyRequest(jid, sender) => {
let presence = Stanza::Presence(stanza::client::presence::Presence {
from: None,
id: None,
@@ -703,7 +1077,7 @@ impl CommandMessage {
errors: Vec::new(),
delay: None,
});
- let result = write_handle.write(presence).await;
+ let result = connection.write_handle.write(presence).await;
match result {
Err(_) => {
let _ = sender.send(result);
@@ -721,12 +1095,12 @@ impl CommandMessage {
errors: Vec::new(),
delay: None,
});
- let result = write_handle.write(presence).await;
+ let result = connection.write_handle.write(presence).await;
let _ = sender.send(result);
}
}
}
- CommandMessage::AcceptSubscriptionRequest(jid, sender) => {
+ Command::AcceptSubscriptionRequest(jid, sender) => {
let presence = Stanza::Presence(stanza::client::presence::Presence {
from: None,
id: None,
@@ -739,10 +1113,10 @@ impl CommandMessage {
errors: Vec::new(),
delay: None,
});
- let result = write_handle.write(presence).await;
+ let result = connection.write_handle.write(presence).await;
let _ = sender.send(result);
}
- CommandMessage::UnsubscribeFromContact(jid, sender) => {
+ Command::UnsubscribeFromContact(jid, sender) => {
let presence = Stanza::Presence(stanza::client::presence::Presence {
from: None,
id: None,
@@ -755,10 +1129,10 @@ impl CommandMessage {
errors: Vec::new(),
delay: None,
});
- let result = write_handle.write(presence).await;
+ let result = connection.write_handle.write(presence).await;
let _ = sender.send(result);
}
- CommandMessage::UnsubscribeContact(jid, sender) => {
+ Command::UnsubscribeContact(jid, sender) => {
let presence = Stanza::Presence(stanza::client::presence::Presence {
from: None,
id: None,
@@ -771,10 +1145,10 @@ impl CommandMessage {
errors: Vec::new(),
delay: None,
});
- let result = write_handle.write(presence).await;
+ let result = connection.write_handle.write(presence).await;
let _ = sender.send(result);
}
- CommandMessage::UnfriendContact(jid, sender) => {
+ Command::UnfriendContact(jid, sender) => {
let presence = Stanza::Presence(stanza::client::presence::Presence {
from: None,
id: None,
@@ -787,7 +1161,7 @@ impl CommandMessage {
errors: Vec::new(),
delay: None,
});
- let result = write_handle.write(presence).await;
+ let result = connection.write_handle.write(presence).await;
match result {
Err(_) => {
let _ = sender.send(result);
@@ -805,19 +1179,15 @@ impl CommandMessage {
errors: Vec::new(),
delay: None,
});
- let result = write_handle.write(presence).await;
+ let result = connection.write_handle.write(presence).await;
let _ = sender.send(result);
}
}
}
- CommandMessage::DeleteContact(jid, sender) => {
- let owned_jid;
- {
- owned_jid = client_jid.lock().await.clone();
- }
+ Command::DeleteContact(jid, sender) => {
let iq_id = Uuid::new_v4().to_string();
let set_stanza = Stanza::Iq(Iq {
- from: Some(owned_jid),
+ from: Some(connection.jid),
id: iq_id.clone(),
to: None,
r#type: IqType::Set,
@@ -837,9 +1207,9 @@ impl CommandMessage {
});
let (send, recv) = oneshot::channel();
{
- pending_iqs.lock().await.insert(iq_id.clone(), send);
+ self.pending.lock().await.insert(iq_id.clone(), send);
}
- let result = write_handle.write(set_stanza).await;
+ let result = connection.write_handle.write(set_stanza).await;
if let Err(e) = result {
sender.send(Err(RosterError::Write(e)));
return;
@@ -892,11 +1262,7 @@ impl CommandMessage {
}
}
}
- CommandMessage::UpdateContact(jid, contact_update, sender) => {
- let owned_jid;
- {
- owned_jid = client_jid.lock().await.clone();
- }
+ Command::UpdateContact(jid, contact_update, sender) => {
let iq_id = Uuid::new_v4().to_string();
let groups = Vec::from_iter(
contact_update
@@ -905,7 +1271,7 @@ impl CommandMessage {
.map(|group| stanza::roster::Group(Some(group))),
);
let set_stanza = Stanza::Iq(Iq {
- from: Some(owned_jid),
+ from: Some(connection.jid),
id: iq_id.clone(),
to: None,
r#type: IqType::Set,
@@ -925,9 +1291,9 @@ impl CommandMessage {
});
let (send, recv) = oneshot::channel();
{
- pending_iqs.lock().await.insert(iq_id.clone(), send);
+ self.pending.lock().await.insert(iq_id.clone(), send);
}
- let result = write_handle.write(set_stanza).await;
+ let result = connection.write_handle.write(set_stanza).await;
if let Err(e) = result {
sender.send(Err(RosterError::Write(e)));
return;
@@ -980,16 +1346,18 @@ impl CommandMessage {
}
}
}
- CommandMessage::SetStatus(online, sender) => {
- let result = db.upsert_cached_status(online.clone()).await;
+ Command::SetStatus(online, sender) => {
+ let result = self.db.upsert_cached_status(online.clone()).await;
if let Err(e) = result {
- let _ = update_sender
+ let _ = self
+ .update_sender
.send(UpdateMessage::Error(Error::SetStatus(StatusError::Cache(
e.into(),
))))
.await;
}
- let result = write_handle
+ let result = connection
+ .write_handle
.write(Stanza::Presence(online.into_stanza(None)))
.await
.map_err(|e| StatusError::Write(e));
@@ -997,15 +1365,10 @@ impl CommandMessage {
let _ = sender.send(result);
}
// TODO: offline message queue
- CommandMessage::SendMessage(jid, body, sender) => {
+ Command::SendMessage(jid, body, sender) => {
let id = Uuid::new_v4();
- let owned_jid: JID;
- {
- // TODO: timeout
- owned_jid = client_jid.lock().await.clone();
- }
let message = Stanza::Message(stanza::client::message::Message {
- from: Some(owned_jid.clone()),
+ from: Some(connection.jid.clone()),
id: Some(id.to_string()),
to: Some(jid.clone()),
// TODO: specify message type
@@ -1022,17 +1385,18 @@ impl CommandMessage {
});
let _ = sender.send(Ok(()));
// let _ = sender.send(Ok(message.clone()));
- let result = write_handle.write(message).await;
+ let result = connection.write_handle.write(message).await;
match result {
Ok(_) => {
let mut message = Message {
id,
- from: owned_jid,
+ from: connection.jid,
body,
timestamp: Utc::now(),
};
info!("send message {:?}", message);
- if let Err(e) = db
+ if let Err(e) = self
+ .db
.create_message_with_self_resource_and_chat(
message.clone(),
jid.clone(),
@@ -1041,13 +1405,16 @@ impl CommandMessage {
.map_err(|e| e.into())
{
tracing::error!("{}", e);
- let _ = update_sender.send(UpdateMessage::Error(Error::MessageSend(
- error::MessageSendError::MessageHistory(e),
- )));
+ let _ =
+ self.update_sender
+ .send(UpdateMessage::Error(Error::MessageSend(
+ error::MessageSendError::MessageHistory(e),
+ )));
}
// TODO: don't do this, have separate from from details
message.from = message.from.as_bare();
- let _ = update_sender
+ let _ = self
+ .update_sender
.send(UpdateMessage::Message { to: jid, message })
.await;
}
@@ -1056,24 +1423,279 @@ impl CommandMessage {
}
}
}
- CommandMessage::SendPresence(jid, presence, sender) => {
+ Command::SendPresence(jid, presence, sender) => {
let mut presence: stanza::client::presence::Presence = presence.into();
if let Some(jid) = jid {
presence.to = Some(jid);
};
- let result = write_handle.write(Stanza::Presence(presence)).await;
+ let result = connection
+ .write_handle
+ .write(Stanza::Presence(presence))
+ .await;
// .map_err(|e| StatusError::Write(e));
let _ = sender.send(result);
}
}
}
+
+ pub async fn handle_offline(self, command: Command) {
+ match command {
+ Command::GetRoster(sender) => {
+ let roster = self.db.read_cached_roster().await;
+ match roster {
+ Ok(roster) => {
+ let _ = sender.send(Ok(roster));
+ }
+ Err(e) => {
+ let _ = sender.send(Err(RosterError::Cache(e.into())));
+ }
+ }
+ }
+ Command::GetChats(sender) => {
+ let chats = self.db.read_chats().await.map_err(|e| e.into());
+ sender.send(chats);
+ }
+ Command::GetChatsOrdered(sender) => {
+ let chats = self.db.read_chats_ordered().await.map_err(|e| e.into());
+ sender.send(chats);
+ }
+ Command::GetChatsOrderedWithLatestMessages(sender) => {
+ let chats = self
+ .db
+ .read_chats_ordered_with_latest_messages()
+ .await
+ .map_err(|e| e.into());
+ sender.send(chats);
+ }
+ Command::GetChat(jid, sender) => {
+ let chats = self.db.read_chat(jid).await.map_err(|e| e.into());
+ sender.send(chats);
+ }
+ Command::GetMessages(jid, sender) => {
+ let messages = self
+ .db
+ .read_message_history(jid)
+ .await
+ .map_err(|e| e.into());
+ sender.send(messages);
+ }
+ Command::DeleteChat(jid, sender) => {
+ let result = self.db.delete_chat(jid).await.map_err(|e| e.into());
+ sender.send(result);
+ }
+ Command::DeleteMessage(uuid, sender) => {
+ let result = self.db.delete_message(uuid).await.map_err(|e| e.into());
+ sender.send(result);
+ }
+ Command::GetUser(jid, sender) => {
+ let user = self.db.read_user(jid).await.map_err(|e| e.into());
+ sender.send(user);
+ }
+ // TODO: offline queue to modify roster
+ Command::AddContact(_jid, sender) => {
+ sender.send(Err(RosterError::Write(WriteError::Disconnected)));
+ }
+ Command::BuddyRequest(_jid, sender) => {
+ sender.send(Err(WriteError::Disconnected));
+ }
+ Command::SubscriptionRequest(_jid, sender) => {
+ sender.send(Err(WriteError::Disconnected));
+ }
+ Command::AcceptBuddyRequest(_jid, sender) => {
+ sender.send(Err(WriteError::Disconnected));
+ }
+ Command::AcceptSubscriptionRequest(_jid, sender) => {
+ sender.send(Err(WriteError::Disconnected));
+ }
+ Command::UnsubscribeFromContact(_jid, sender) => {
+ sender.send(Err(WriteError::Disconnected));
+ }
+ Command::UnsubscribeContact(_jid, sender) => {
+ sender.send(Err(WriteError::Disconnected));
+ }
+ Command::UnfriendContact(_jid, sender) => {
+ sender.send(Err(WriteError::Disconnected));
+ }
+ Command::DeleteContact(_jid, sender) => {
+ sender.send(Err(RosterError::Write(WriteError::Disconnected)));
+ }
+ Command::UpdateContact(_jid, _contact_update, sender) => {
+ sender.send(Err(RosterError::Write(WriteError::Disconnected)));
+ }
+ Command::SetStatus(online, sender) => {
+ let result = self
+ .db
+ .upsert_cached_status(online)
+ .await
+ .map_err(|e| StatusError::Cache(e.into()));
+ sender.send(result);
+ }
+ // TODO: offline message queue
+ Command::SendMessage(_jid, _body, sender) => {
+ sender.send(Err(WriteError::Disconnected));
+ }
+ Command::SendPresence(_jid, _presence, sender) => {
+ sender.send(Err(WriteError::Disconnected));
+ }
+ }
+ }
+}
+
+#[derive(Clone)]
+pub struct Connected {
+ // full jid will stay stable across reconnections
+ jid: JID,
+ write_handle: WriteHandle,
+}
+
+pub trait Logic {}
+
+pub struct Luz {
+ jid: JID,
+ password: Arc<String>,
+ receiver: mpsc::Receiver<LuzMessage>,
+ // TODO: use a dyn passwordprovider trait to avoid storing password in memory
+ connected: Option<(Connected, SupervisorHandle)>,
+ // connected_intention: bool,
+ /// if connection was shut down due to e.g. server shutdown, supervisor must be able to mark client as disconnected
+ connection_supervisor_shutdown: Fuse<oneshot::Receiver<()>>,
+ // TODO: will need to have an auto reconnect state as well (e.g. in case server shut down, to try and reconnect later)
+ // TODO: will grow forever at this point, maybe not required as tasks will naturally shut down anyway?
+ // TODO: genericize
+ logic: LogicState,
+ // config: LampConfig,
+ tasks: JoinSet<()>,
+}
+
+impl Luz {
+ fn new(
+ jid: JID,
+ password: String,
+ receiver: mpsc::Receiver<LuzMessage>,
+ connected: Option<(Connected, SupervisorHandle)>,
+ connection_supervisor_shutdown: Fuse<oneshot::Receiver<()>>,
+ logic: LogicState,
+ ) -> Self {
+ Self {
+ jid,
+ password: Arc::new(password),
+ connected,
+ receiver,
+ connection_supervisor_shutdown,
+ logic,
+ tasks: JoinSet::new(),
+ }
+ }
+
+ async fn run(mut self) {
+ loop {
+ let msg = tokio::select! {
+ // this is okay, as when created the supervisor (and connection) doesn't exist, but a bit messy
+ // THIS IS NOT OKAY LOLLLL - apparently fusing is the best option???
+ _ = &mut self.connection_supervisor_shutdown => {
+ self.connected = None;
+ continue;
+ }
+ Some(msg) = self.receiver.recv() => {
+ msg
+ },
+ else => break,
+ };
+ // TODO: consider separating disconnect/connect and commands apart from commandmessage
+ // TODO: dispatch commands separate tasks
+ match msg {
+ LuzMessage::Connect => {
+ match self.connected {
+ Some(_) => {
+ self.logic
+ .clone()
+ .update_sender
+ .send(UpdateMessage::Error(Error::AlreadyConnected))
+ .await;
+ }
+ None => {
+ let mut jid = self.jid.clone();
+ let mut domain = jid.domainpart.clone();
+ // TODO: check what happens upon reconnection with same resource (this is probably what one wants to do and why jid should be mutated from a bare jid to one with a resource)
+ let streams_result =
+ jabber::connect_and_login(&mut jid, &*self.password, &mut domain)
+ .await;
+ match streams_result {
+ Ok(s) => {
+ debug!("ok stream result");
+ let (shutdown_send, shutdown_recv) = oneshot::channel::<()>();
+ let (writer, supervisor) = SupervisorHandle::new(
+ s,
+ shutdown_send,
+ jid.clone(),
+ self.password.clone(),
+ self.logic.clone(),
+ );
+
+ let shutdown_recv = shutdown_recv.fuse();
+ self.connection_supervisor_shutdown = shutdown_recv;
+
+ let connected = Connected {
+ jid,
+ write_handle: writer,
+ };
+
+ self.logic.clone().handle_connect(connected.clone()).await;
+
+ self.connected = Some((connected, supervisor));
+ }
+ Err(e) => {
+ tracing::error!("error: {}", e);
+ let _ = self.logic.clone().update_sender.send(
+ UpdateMessage::Error(Error::Connecting(
+ ConnectionError::ConnectionFailed(e.into()),
+ )),
+ );
+ }
+ }
+ }
+ };
+ }
+ LuzMessage::Disconnect => match self.connected {
+ None => {
+ let _ = self
+ .logic
+ .update_sender
+ .send(UpdateMessage::Error(Error::AlreadyDisconnected))
+ .await;
+ }
+ ref mut c => {
+ if let Some((connected, supervisor_handle)) = c.take() {
+ // TODO: better disconnect logic, only reflect once actually disconnected
+ // TODO: call within supervisor instead
+ self.logic
+ .clone()
+ .handle_disconnect(connected.clone())
+ .await;
+ let _ = supervisor_handle.send(SupervisorCommand::Disconnect).await;
+ } else {
+ unreachable!()
+ };
+ }
+ },
+ LuzMessage::Command(command) => {
+ match self.connected.as_ref() {
+ Some((w, s)) => self
+ .tasks
+ .spawn(self.logic.clone().handle_online(command, w.clone())),
+ None => self.tasks.spawn(self.logic.clone().handle_offline(command)),
+ };
+ }
+ }
+ }
+ }
}
// TODO: separate sender and receiver, store handle to Luz process to ensure dropping
// #[derive(Clone)]
#[derive(Debug)]
pub struct LuzHandle {
- sender: mpsc::Sender<CommandMessage>,
+ sender: mpsc::Sender<LuzMessage>,
timeout: Duration,
}
@@ -1087,7 +1709,7 @@ impl Clone for LuzHandle {
}
impl Deref for LuzHandle {
- type Target = mpsc::Sender<CommandMessage>;
+ type Target = mpsc::Sender<LuzMessage>;
fn deref(&self) -> &Self::Target {
&self.sender
@@ -1102,371 +1724,43 @@ impl DerefMut for LuzHandle {
impl LuzHandle {
// TODO: database creation separate
- pub fn new(jid: JID, password: String, db: Db) -> (Self, mpsc::Receiver<UpdateMessage>) {
+ pub fn new(jid: JID, password: String, logic: LogicState) -> Self {
let (command_sender, command_receiver) = mpsc::channel(20);
- let (update_sender, update_receiver) = mpsc::channel(20);
+
// might be bad, first supervisor shutdown notification oneshot is never used (disgusting)
- let (sup_send, sup_recv) = oneshot::channel();
- let mut sup_recv = sup_recv.fuse();
-
- let actor = Luz::new(
- command_sender.clone(),
- command_receiver,
- Arc::new(Mutex::new(jid)),
- password,
- Arc::new(Mutex::new(None)),
- sup_recv,
- db,
- update_sender,
- );
+ let (_sup_send, sup_recv) = oneshot::channel();
+ let sup_recv = sup_recv.fuse();
+
+ let actor = Luz::new(jid, password, command_receiver, None, sup_recv, logic);
tokio::spawn(async move { actor.run().await });
- (
- Self {
- sender: command_sender,
- // TODO: configure timeout
- timeout: Duration::from_secs(10),
- },
- update_receiver,
- )
+ Self {
+ sender: command_sender,
+ // TODO: configure timeout
+ timeout: Duration::from_secs(10),
+ }
}
pub async fn connect(&self) -> Result<(), ActorError> {
- self.send(CommandMessage::Connect).await?;
+ self.send(LuzMessage::Connect).await?;
Ok(())
}
pub async fn disconnect(&self, offline: Offline) -> Result<(), ActorError> {
- self.send(CommandMessage::Disconnect(offline)).await?;
+ self.send(LuzMessage::Disconnect).await?;
Ok(())
}
-
- pub async fn get_roster(&self) -> Result<Vec<Contact>, CommandError<RosterError>> {
- let (send, recv) = oneshot::channel();
- self.send(CommandMessage::GetRoster(send))
- .await
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
- let roster = timeout(self.timeout, recv)
- .await
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
- Ok(roster)
- }
-
- pub async fn get_chats(&self) -> Result<Vec<Chat>, CommandError<DatabaseError>> {
- let (send, recv) = oneshot::channel();
- self.send(CommandMessage::GetChats(send))
- .await
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
- let chats = timeout(self.timeout, recv)
- .await
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
- Ok(chats)
- }
-
- pub async fn get_chats_ordered(&self) -> Result<Vec<Chat>, CommandError<DatabaseError>> {
- let (send, recv) = oneshot::channel();
- self.send(CommandMessage::GetChatsOrdered(send))
- .await
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
- let chats = timeout(self.timeout, recv)
- .await
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
- Ok(chats)
- }
-
- pub async fn get_chats_ordered_with_latest_messages(
- &self,
- ) -> Result<Vec<(Chat, Message)>, CommandError<DatabaseError>> {
- let (send, recv) = oneshot::channel();
- self.send(CommandMessage::GetChatsOrderedWithLatestMessages(send))
- .await
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
- let chats = timeout(self.timeout, recv)
- .await
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
- Ok(chats)
- }
-
- pub async fn get_chat(&self, jid: JID) -> Result<Chat, CommandError<DatabaseError>> {
- let (send, recv) = oneshot::channel();
- self.send(CommandMessage::GetChat(jid, send))
- .await
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
- let chat = timeout(self.timeout, recv)
- .await
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
- Ok(chat)
- }
-
- pub async fn get_messages(
- &self,
- jid: JID,
- ) -> Result<Vec<Message>, CommandError<DatabaseError>> {
- let (send, recv) = oneshot::channel();
- self.send(CommandMessage::GetMessages(jid, send))
- .await
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
- let messages = timeout(self.timeout, recv)
- .await
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
- Ok(messages)
- }
-
- pub async fn delete_chat(&self, jid: JID) -> Result<(), CommandError<DatabaseError>> {
- let (send, recv) = oneshot::channel();
- self.send(CommandMessage::DeleteChat(jid, send))
- .await
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
- let result = timeout(self.timeout, recv)
- .await
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
- Ok(result)
- }
-
- pub async fn delete_message(&self, id: Uuid) -> Result<(), CommandError<DatabaseError>> {
- let (send, recv) = oneshot::channel();
- self.send(CommandMessage::DeleteMessage(id, send))
- .await
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
- let result = timeout(self.timeout, recv)
- .await
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
- Ok(result)
- }
-
- pub async fn get_user(&self, jid: JID) -> Result<User, CommandError<DatabaseError>> {
- let (send, recv) = oneshot::channel();
- self.send(CommandMessage::GetUser(jid, send))
- .await
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
- let result = timeout(self.timeout, recv)
- .await
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
- Ok(result)
- }
-
- pub async fn add_contact(&self, jid: JID) -> Result<(), CommandError<RosterError>> {
- let (send, recv) = oneshot::channel();
- self.send(CommandMessage::AddContact(jid, send))
- .await
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
- let result = timeout(self.timeout, recv)
- .await
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
- Ok(result)
- }
-
- pub async fn buddy_request(&self, jid: JID) -> Result<(), CommandError<WriteError>> {
- let (send, recv) = oneshot::channel();
- self.send(CommandMessage::BuddyRequest(jid, send))
- .await
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
- let result = timeout(self.timeout, recv)
- .await
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
- Ok(result)
- }
-
- pub async fn subscription_request(&self, jid: JID) -> Result<(), CommandError<WriteError>> {
- let (send, recv) = oneshot::channel();
- self.send(CommandMessage::SubscriptionRequest(jid, send))
- .await
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
- let result = timeout(self.timeout, recv)
- .await
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
- Ok(result)
- }
-
- pub async fn accept_buddy_request(&self, jid: JID) -> Result<(), CommandError<WriteError>> {
- let (send, recv) = oneshot::channel();
- self.send(CommandMessage::AcceptBuddyRequest(jid, send))
- .await
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
- let result = timeout(self.timeout, recv)
- .await
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
- Ok(result)
- }
-
- pub async fn accept_subscription_request(
- &self,
- jid: JID,
- ) -> Result<(), CommandError<WriteError>> {
- let (send, recv) = oneshot::channel();
- self.send(CommandMessage::AcceptSubscriptionRequest(jid, send))
- .await
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
- let result = timeout(self.timeout, recv)
- .await
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
- Ok(result)
- }
-
- pub async fn unsubscribe_from_contact(&self, jid: JID) -> Result<(), CommandError<WriteError>> {
- let (send, recv) = oneshot::channel();
- self.send(CommandMessage::UnsubscribeFromContact(jid, send))
- .await
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
- let result = timeout(self.timeout, recv)
- .await
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
- Ok(result)
- }
-
- pub async fn unsubscribe_contact(&self, jid: JID) -> Result<(), CommandError<WriteError>> {
- let (send, recv) = oneshot::channel();
- self.send(CommandMessage::UnsubscribeContact(jid, send))
- .await
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
- let result = timeout(self.timeout, recv)
- .await
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
- Ok(result)
- }
-
- pub async fn unfriend_contact(&self, jid: JID) -> Result<(), CommandError<WriteError>> {
- let (send, recv) = oneshot::channel();
- self.send(CommandMessage::UnfriendContact(jid, send))
- .await
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
- let result = timeout(self.timeout, recv)
- .await
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
- Ok(result)
- }
-
- pub async fn delete_contact(&self, jid: JID) -> Result<(), CommandError<RosterError>> {
- let (send, recv) = oneshot::channel();
- self.send(CommandMessage::DeleteContact(jid, send))
- .await
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
- let result = timeout(self.timeout, recv)
- .await
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
- Ok(result)
- }
-
- pub async fn update_contact(
- &self,
- jid: JID,
- update: ContactUpdate,
- ) -> Result<(), CommandError<RosterError>> {
- let (send, recv) = oneshot::channel();
- self.send(CommandMessage::UpdateContact(jid, update, send))
- .await
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
- let result = timeout(self.timeout, recv)
- .await
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
- Ok(result)
- }
-
- pub async fn set_status(&self, online: Online) -> Result<(), CommandError<StatusError>> {
- let (send, recv) = oneshot::channel();
- self.send(CommandMessage::SetStatus(online, send))
- .await
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
- let result = timeout(self.timeout, recv)
- .await
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
- Ok(result)
- }
-
- pub async fn send_message(&self, jid: JID, body: Body) -> Result<(), CommandError<WriteError>> {
- let (send, recv) = oneshot::channel();
- self.send(CommandMessage::SendMessage(jid, body, send))
- .await
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
- let result = timeout(self.timeout, recv)
- .await
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
- Ok(result)
- }
}
// TODO: generate methods for each with a macro
-pub enum CommandMessage {
+pub enum LuzMessage {
// TODO: login invisible xep-0186
/// connect to XMPP chat server. gets roster and publishes initial presence.
Connect,
/// disconnect from XMPP chat server, sending unavailable presence then closing stream.
- Disconnect(Offline),
- /// get the roster. if offline, retreive cached version from database. should be stored in application memory
- GetRoster(oneshot::Sender<Result<Vec<Contact>, RosterError>>),
- /// get all chats. chat will include 10 messages in their message Vec (enough for chat previews)
- // TODO: paging and filtering
- GetChats(oneshot::Sender<Result<Vec<Chat>, DatabaseError>>),
- // TODO: paging and filtering
- GetChatsOrdered(oneshot::Sender<Result<Vec<Chat>, DatabaseError>>),
- // TODO: paging and filtering
- GetChatsOrderedWithLatestMessages(oneshot::Sender<Result<Vec<(Chat, Message)>, DatabaseError>>),
- /// get a specific chat by jid
- GetChat(JID, oneshot::Sender<Result<Chat, DatabaseError>>),
- /// get message history for chat (does appropriate mam things)
- // TODO: paging and filtering
- GetMessages(JID, oneshot::Sender<Result<Vec<Message>, DatabaseError>>),
- /// delete a chat from your chat history, along with all the corresponding messages
- DeleteChat(JID, oneshot::Sender<Result<(), DatabaseError>>),
- /// delete a message from your chat history
- DeleteMessage(Uuid, oneshot::Sender<Result<(), DatabaseError>>),
- /// get a user from your users database
- GetUser(JID, oneshot::Sender<Result<User, DatabaseError>>),
- /// add a contact to your roster, with a status of none, no subscriptions.
- AddContact(JID, oneshot::Sender<Result<(), RosterError>>),
- /// send a friend request i.e. a subscription request with a subscription pre-approval. if not already added to roster server adds to roster.
- BuddyRequest(JID, oneshot::Sender<Result<(), WriteError>>),
- /// send a subscription request, without pre-approval. if not already added to roster server adds to roster.
- SubscriptionRequest(JID, oneshot::Sender<Result<(), WriteError>>),
- /// accept a friend request by accepting a pending subscription and sending a subscription request back. if not already added to roster adds to roster.
- AcceptBuddyRequest(JID, oneshot::Sender<Result<(), WriteError>>),
- /// accept a pending subscription and doesn't send a subscription request back. if not already added to roster adds to roster.
- AcceptSubscriptionRequest(JID, oneshot::Sender<Result<(), WriteError>>),
- /// unsubscribe to a contact, but don't remove their subscription.
- UnsubscribeFromContact(JID, oneshot::Sender<Result<(), WriteError>>),
- /// stop a contact from being subscribed, but stay subscribed to the contact.
- UnsubscribeContact(JID, oneshot::Sender<Result<(), WriteError>>),
- /// remove subscriptions to and from contact, but keep in roster.
- UnfriendContact(JID, oneshot::Sender<Result<(), WriteError>>),
- /// remove a contact from the contact list. will remove subscriptions if not already done then delete contact from roster.
- DeleteContact(JID, oneshot::Sender<Result<(), RosterError>>),
- /// update contact. contact details will be overwritten with the contents of the contactupdate struct.
- UpdateContact(JID, ContactUpdate, oneshot::Sender<Result<(), RosterError>>),
- /// set online status. if disconnected, will be cached so when client connects, will be sent as the initial presence.
- SetStatus(Online, oneshot::Sender<Result<(), StatusError>>),
- /// send presence stanza
- // TODO: cache presence stanza
- SendPresence(
- Option<JID>,
- PresenceType,
- oneshot::Sender<Result<(), WriteError>>,
- ),
- /// send a directed presence (usually to a non-contact).
- // TODO: should probably make it so people can add non-contact auto presence sharing in the client (most likely through setting an internal setting)
- /// send a message to a jid (any kind of jid that can receive a message, e.g. a user or a
- /// chatroom). if disconnected, will be cached so when client connects, message will be sent.
- SendMessage(JID, Body, oneshot::Sender<Result<(), WriteError>>),
+ Disconnect,
+ /// TODO: generics
+ Command(Command),
}
#[derive(Debug, Clone)]
diff --git a/luz/src/main.rs b/luz/src/main.rs
index 2bfd086..5aeef14 100644
--- a/luz/src/main.rs
+++ b/luz/src/main.rs
@@ -1,7 +1,7 @@
use std::{path::Path, str::FromStr, time::Duration};
use jid::JID;
-use luz::{db::Db, CommandMessage, LuzHandle};
+use luz::{db::Db, LuzHandle, LuzMessage};
use sqlx::SqlitePool;
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
@@ -24,11 +24,11 @@ async fn main() {
}
});
- luz.send(CommandMessage::Connect).await.unwrap();
+ luz.send(LuzMessage::Connect).await.unwrap();
let (send, recv) = oneshot::channel();
tokio::time::sleep(Duration::from_secs(5)).await;
info!("sending message");
- luz.send(CommandMessage::SendMessage(
+ luz.send(LuzMessage::SendMessage(
JID::from_str("cel@blos.sm").unwrap(),
luz::chat::Body {
body: "hallo!!!".to_string(),