aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorLibravatar cel 🌸 <cel@blos.sm>2025-02-10 17:48:39 +0000
committerLibravatar cel 🌸 <cel@blos.sm>2025-02-10 17:48:39 +0000
commit41c1ba15ef5865f4513db525ed595f3ce903dd26 (patch)
tree385a6f758dbf0837e82dee9e8b011db57a93425b
parenta1d96233e816e2b8378a629c6cc9e34028f2435b (diff)
downloadluz-41c1ba15ef5865f4513db525ed595f3ce903dd26.tar.gz
luz-41c1ba15ef5865f4513db525ed595f3ce903dd26.tar.bz2
luz-41c1ba15ef5865f4513db525ed595f3ce903dd26.zip
WIP: code cleanup
-rw-r--r--luz/src/connection/mod.rs166
-rw-r--r--luz/src/connection/read.rs120
-rw-r--r--luz/src/connection/write.rs171
-rw-r--r--luz/src/error.rs32
-rw-r--r--luz/src/lib.rs577
5 files changed, 584 insertions, 482 deletions
diff --git a/luz/src/connection/mod.rs b/luz/src/connection/mod.rs
new file mode 100644
index 0000000..3ad2648
--- /dev/null
+++ b/luz/src/connection/mod.rs
@@ -0,0 +1,166 @@
+use std::ops::{Deref, DerefMut};
+
+use jabber::{connection::Tls, jabber_stream::bound_stream::BoundJabberStream};
+use read::ReadControlHandle;
+use sqlx::SqlitePool;
+use tokio::{
+ sync::{mpsc, oneshot},
+ task::{JoinHandle, JoinSet},
+};
+use write::{WriteControlHandle, WriteHandle, WriteMessage};
+
+use crate::UpdateMessage;
+
+mod read;
+pub(crate) mod write;
+
+pub struct Supervisor {
+ connection_commands: mpsc::Receiver<SupervisorCommand>,
+ writer_crash: oneshot::Receiver<(WriteMessage, mpsc::Receiver<WriteMessage>)>,
+ reader_crash: oneshot::Receiver<(
+ SqlitePool,
+ mpsc::Sender<UpdateMessage>,
+ tokio::task::JoinSet<()>,
+ )>,
+ sender: mpsc::Sender<UpdateMessage>,
+ writer_handle: WriteControlHandle,
+ reader_handle: ReadControlHandle,
+ on_shutdown: oneshot::Sender<()>,
+}
+
+pub enum SupervisorCommand {
+ Disconnect,
+ // for if there was a stream error, require to reconnect
+ Reconnect,
+}
+
+impl Supervisor {
+ fn new(
+ connection_commands: mpsc::Receiver<SupervisorCommand>,
+ writer_crash: oneshot::Receiver<(WriteMessage, mpsc::Receiver<WriteMessage>)>,
+ reader_crash: oneshot::Receiver<(SqlitePool, mpsc::Sender<UpdateMessage>, JoinSet<()>)>,
+ sender: mpsc::Sender<UpdateMessage>,
+ writer_handle: WriteControlHandle,
+ reader_handle: ReadControlHandle,
+ on_shutdown: oneshot::Sender<()>,
+ ) -> Self {
+ Self {
+ connection_commands,
+ writer_crash,
+ sender,
+ writer_handle,
+ reader_handle,
+ reader_crash,
+ on_shutdown,
+ }
+ }
+
+ async fn handle_command_message(&mut self, msg: SupervisorCommand) {}
+
+ async fn run(mut self) {
+ loop {
+ tokio::select! {
+ Some(msg) = self.connection_commands.recv() => {
+ self.handle_command_message(msg).await;
+ },
+ error = &mut self.writer_crash => {
+
+ },
+ error = &mut self.reader_crash => {
+
+ },
+ else => break,
+ }
+ }
+ self.on_shutdown.send(());
+ }
+}
+
+pub struct SupervisorHandle {
+ sender: SupervisorSender,
+ handle: JoinHandle<()>,
+}
+
+impl Deref for SupervisorHandle {
+ type Target = SupervisorSender;
+
+ fn deref(&self) -> &Self::Target {
+ &self.sender
+ }
+}
+
+impl DerefMut for SupervisorHandle {
+ fn deref_mut(&mut self) -> &mut Self::Target {
+ &mut self.sender
+ }
+}
+
+#[derive(Clone)]
+pub struct SupervisorSender {
+ sender: mpsc::Sender<SupervisorCommand>,
+}
+
+impl Deref for SupervisorSender {
+ type Target = mpsc::Sender<SupervisorCommand>;
+
+ fn deref(&self) -> &Self::Target {
+ &self.sender
+ }
+}
+
+impl DerefMut for SupervisorSender {
+ fn deref_mut(&mut self) -> &mut Self::Target {
+ &mut self.sender
+ }
+}
+
+impl SupervisorHandle {
+ pub fn new(
+ streams: BoundJabberStream<Tls>,
+ update_sender: mpsc::Sender<UpdateMessage>,
+ db: SqlitePool,
+ on_shutdown: oneshot::Sender<()>,
+ ) -> (WriteHandle, Self) {
+ let (command_sender, command_receiver) = mpsc::channel(20);
+ let (writer_error_sender, writer_error_receiver) = oneshot::channel();
+ let (reader_crash_sender, reader_crash_receiver) = oneshot::channel();
+
+ let (reader, writer) = streams.split();
+ let (write_handle, write_control_handle) =
+ WriteControlHandle::new(writer, writer_error_sender);
+ let jabber_reader_control_handle = ReadControlHandle::new(
+ reader,
+ reader_crash_sender,
+ db,
+ update_sender.clone(),
+ command_sender.clone(),
+ write_handle.clone(),
+ );
+
+ let actor = Supervisor::new(
+ command_receiver,
+ writer_error_receiver,
+ reader_crash_receiver,
+ update_sender,
+ write_control_handle,
+ jabber_reader_control_handle,
+ on_shutdown,
+ );
+
+ let handle = tokio::spawn(async move { actor.run().await });
+
+ (
+ write_handle,
+ Self {
+ sender: SupervisorSender {
+ sender: command_sender,
+ },
+ handle,
+ },
+ )
+ }
+
+ pub fn sender(&self) -> SupervisorSender {
+ self.sender.clone()
+ }
+}
diff --git a/luz/src/connection/read.rs b/luz/src/connection/read.rs
new file mode 100644
index 0000000..7800d56
--- /dev/null
+++ b/luz/src/connection/read.rs
@@ -0,0 +1,120 @@
+use jabber::{connection::Tls, jabber_stream::bound_stream::BoundJabberReader};
+use sqlx::SqlitePool;
+use stanza::client::Stanza;
+use tokio::{
+ sync::{mpsc, oneshot},
+ task::{JoinHandle, JoinSet},
+};
+
+use crate::UpdateMessage;
+
+use super::{
+ write::{WriteHandle, WriteMessage},
+ SupervisorCommand,
+};
+
+pub struct Read {
+ // TODO: place iq hashmap here
+ control_receiver: mpsc::Receiver<ReadControl>,
+ stream: BoundJabberReader<Tls>,
+ on_crash: oneshot::Sender<(SqlitePool, mpsc::Sender<UpdateMessage>, JoinSet<()>)>,
+ db: SqlitePool,
+ update_sender: mpsc::Sender<UpdateMessage>,
+ supervisor_control: mpsc::Sender<SupervisorCommand>,
+ write_handle: WriteHandle,
+ tasks: JoinSet<()>,
+}
+
+impl Read {
+ fn new(
+ control_receiver: mpsc::Receiver<ReadControl>,
+ stream: BoundJabberReader<Tls>,
+ on_crash: oneshot::Sender<(SqlitePool, mpsc::Sender<UpdateMessage>, JoinSet<()>)>,
+ db: SqlitePool,
+ update_sender: mpsc::Sender<UpdateMessage>,
+ // jabber server must be able to both terminate the connection from error, and ask for data from the client (such as supported XEPs)
+ supervisor_control: mpsc::Sender<SupervisorCommand>,
+ write_sender: WriteHandle,
+ ) -> Self {
+ Self {
+ control_receiver,
+ stream,
+ on_crash,
+ db,
+ update_sender,
+ supervisor_control,
+ write_handle: write_sender,
+ tasks: JoinSet::new(),
+ }
+ }
+
+ async fn run(mut self) {
+ loop {
+ tokio::select! {
+ Some(msg) = self.control_receiver.recv() => {
+ match msg {
+ ReadControl::Disconnect => todo!(),
+ ReadControl::Abort(sender) => todo!(),
+ };
+ },
+ stanza = self.stream.read::<Stanza>() => {
+ match stanza {
+ Ok(_) => todo!(),
+ Err(_) => todo!(),
+ }
+ self.tasks.spawn();
+ },
+ else => break
+ }
+ }
+ }
+}
+
+trait Task {
+ async fn handle();
+}
+
+impl Task for Stanza {
+ async fn handle() {
+ todo!()
+ }
+}
+
+enum ReadControl {
+ Disconnect,
+ Abort(oneshot::Sender<mpsc::Receiver<WriteMessage>>),
+}
+
+pub struct ReadControlHandle {
+ sender: mpsc::Sender<ReadControl>,
+ handle: JoinHandle<()>,
+}
+
+impl ReadControlHandle {
+ pub fn new(
+ stream: BoundJabberReader<Tls>,
+ on_crash: oneshot::Sender<(SqlitePool, mpsc::Sender<UpdateMessage>, JoinSet<()>)>,
+ db: SqlitePool,
+ sender: mpsc::Sender<UpdateMessage>,
+ supervisor_control: mpsc::Sender<SupervisorCommand>,
+ jabber_write: WriteHandle,
+ ) -> Self {
+ let (control_sender, control_receiver) = mpsc::channel(20);
+
+ let actor = Read::new(
+ control_receiver,
+ stream,
+ on_crash,
+ db,
+ sender,
+ supervisor_control,
+ jabber_write,
+ );
+ let handle = tokio::spawn(async move { actor.run().await });
+
+ Self {
+ sender: control_sender,
+ handle,
+ }
+ }
+}
diff --git a/luz/src/connection/write.rs b/luz/src/connection/write.rs
new file mode 100644
index 0000000..9c01519
--- /dev/null
+++ b/luz/src/connection/write.rs
@@ -0,0 +1,171 @@
+use std::ops::{Deref, DerefMut};
+
+use jabber::{connection::Tls, jabber_stream::bound_stream::BoundJabberWriter};
+use stanza::client::Stanza;
+use tokio::{
+ sync::{mpsc, oneshot},
+ task::JoinHandle,
+};
+
+use crate::error::Error;
+
+// actor that receives jabber stanzas to write, and if there is an error, sends a message back to the supervisor then aborts, so the supervisor can spawn a new stream.
+pub struct Write {
+ stanza_receiver: mpsc::Receiver<WriteMessage>,
+ control_receiver: mpsc::Receiver<WriteControl>,
+ stream: BoundJabberWriter<Tls>,
+ on_crash: oneshot::Sender<(WriteMessage, mpsc::Receiver<WriteMessage>)>,
+}
+
+pub struct WriteMessage {
+ stanza: Stanza,
+ respond_to: oneshot::Sender<Result<(), Error>>,
+}
+
+enum WriteControl {
+ Disconnect,
+ Abort(oneshot::Sender<mpsc::Receiver<WriteMessage>>),
+}
+
+impl Write {
+ fn new(
+ stanza_receiver: mpsc::Receiver<WriteMessage>,
+ control_receiver: mpsc::Receiver<WriteControl>,
+ stream: BoundJabberWriter<Tls>,
+ supervisor: oneshot::Sender<(WriteMessage, mpsc::Receiver<WriteMessage>)>,
+ ) -> Self {
+ Self {
+ stanza_receiver,
+ control_receiver,
+ stream,
+ on_crash: supervisor,
+ }
+ }
+
+ async fn write(&mut self, stanza: &Stanza) -> Result<(), peanuts::Error> {
+ Ok(self.stream.write(stanza).await?)
+ }
+
+ async fn run(mut self) {
+ loop {
+ tokio::select! {
+ Some(msg) = self.control_receiver.recv() => {
+ match msg {
+ WriteControl::Disconnect => {
+ // TODO: close the stanza_receiver channel and drain out all of the remaining stanzas to send
+ self.stanza_receiver.close();
+ // TODO: put this in some kind of function to avoid code duplication
+ for msg in self.stanza_receiver.recv().await {
+ let result = self.stream.write(&msg.stanza).await;
+ match result {
+ Err(e) => match &e {
+ peanuts::Error::ReadError(error) => {
+ // make sure message is not lost from error, supervisor handles retry and reporting
+ self.on_crash.send((msg, self.stanza_receiver));
+ break;
+ }
+ _ => {
+ msg.respond_to.send(Err(e.into()));
+ }
+ },
+ _ => {
+ msg.respond_to.send(Ok(()));
+ }
+ }
+ }
+ self.stream.try_close().await;
+ break;
+ },
+ WriteControl::Abort(sender) => {
+ sender.send(self.stanza_receiver);
+ break;
+ },
+ }
+ },
+ Some(msg) = self.stanza_receiver.recv() => {
+ let result = self.stream.write(&msg.stanza).await;
+ match result {
+ Err(e) => match &e {
+ peanuts::Error::ReadError(error) => {
+ // make sure message is not lost from error, supervisor handles retry and reporting
+ self.on_crash.send((msg, self.stanza_receiver));
+ break;
+ }
+ _ => {
+ msg.respond_to.send(Err(e.into()));
+ }
+ },
+ _ => {
+ msg.respond_to.send(Ok(()));
+ }
+ }
+ },
+ // TODO: check if this is ok to do
+ else => break,
+ }
+ }
+ }
+}
+
+#[derive(Clone)]
+pub struct WriteHandle {
+ sender: mpsc::Sender<WriteMessage>,
+}
+
+impl Deref for WriteHandle {
+ type Target = mpsc::Sender<WriteMessage>;
+
+ fn deref(&self) -> &Self::Target {
+ &self.sender
+ }
+}
+
+impl DerefMut for WriteHandle {
+ fn deref_mut(&mut self) -> &mut Self::Target {
+ &mut self.sender
+ }
+}
+
+pub struct WriteControlHandle {
+ sender: mpsc::Sender<WriteControl>,
+ handle: JoinHandle<()>,
+}
+
+impl WriteControlHandle {
+ pub fn new(
+ stream: BoundJabberWriter<Tls>,
+ supervisor: oneshot::Sender<(WriteMessage, mpsc::Receiver<WriteMessage>)>,
+ ) -> (WriteHandle, Self) {
+ let (control_sender, control_receiver) = mpsc::channel(20);
+ let (stanza_sender, stanza_receiver) = mpsc::channel(20);
+
+ let actor = Write::new(stanza_receiver, control_receiver, stream, supervisor);
+ let handle = tokio::spawn(async move { actor.run().await });
+
+ (
+ WriteHandle {
+ sender: stanza_sender,
+ },
+ Self {
+ sender: control_sender,
+ handle,
+ },
+ )
+ }
+
+ pub fn reconnect(
+ stream: BoundJabberWriter<Tls>,
+ supervisor: oneshot::Sender<(WriteMessage, mpsc::Receiver<WriteMessage>)>,
+ stanza_receiver: mpsc::Receiver<WriteMessage>,
+ ) -> Self {
+ let (control_sender, control_receiver) = mpsc::channel(20);
+
+ let actor = Write::new(stanza_receiver, control_receiver, stream, supervisor);
+ let handle = tokio::spawn(async move { actor.run().await });
+
+ Self {
+ sender: control_sender,
+ handle,
+ }
+ }
+}
diff --git a/luz/src/error.rs b/luz/src/error.rs
new file mode 100644
index 0000000..d9dfaba
--- /dev/null
+++ b/luz/src/error.rs
@@ -0,0 +1,32 @@
+pub enum Error {
+ AlreadyConnected,
+ Jabber(jabber::Error),
+ XML(peanuts::Error),
+ SQL(sqlx::Error),
+ JID(jid::ParseError),
+ AlreadyDisconnected,
+}
+
+impl From<peanuts::Error> for Error {
+ fn from(e: peanuts::Error) -> Self {
+ Self::XML(e)
+ }
+}
+
+impl From<jid::ParseError> for Error {
+ fn from(e: jid::ParseError) -> Self {
+ Self::JID(e)
+ }
+}
+
+impl From<sqlx::Error> for Error {
+ fn from(e: sqlx::Error) -> Self {
+ Self::SQL(e)
+ }
+}
+
+impl From<jabber::Error> for Error {
+ fn from(e: jabber::Error) -> Self {
+ Self::Jabber(e)
+ }
+}
diff --git a/luz/src/lib.rs b/luz/src/lib.rs
index 6372fe0..0dfc30c 100644
--- a/luz/src/lib.rs
+++ b/luz/src/lib.rs
@@ -1,301 +1,33 @@
-use std::{
- collections::{HashMap, HashSet, VecDeque},
- fmt::Pointer,
- pin::pin,
- sync::{atomic::AtomicBool, Arc},
- task::{ready, Poll},
-};
+use std::sync::Arc;
-use futures::{
- stream::{SplitSink, SplitStream},
- Sink, SinkExt, Stream, StreamExt,
-};
-use jabber::{
- connection::Tls,
- jabber_stream::bound_stream::{BoundJabberReader, BoundJabberStream, BoundJabberWriter},
- JID,
-};
-use sqlx::{query, Pool, Sqlite, SqlitePool};
-use stanza::{
- client::{
- iq::{Iq, IqType, Query},
- Stanza,
- },
- roster,
-};
+use connection::SupervisorSender;
+use jabber::JID;
+use sqlx::SqlitePool;
+use stanza::roster;
use tokio::{
- io::AsyncRead,
- select,
- sync::{
- mpsc::{self, Receiver, Sender},
- oneshot, Mutex,
- },
- task::{JoinHandle, JoinSet},
+ sync::{mpsc, oneshot, Mutex},
+ task::JoinSet,
};
-use tokio_stream::wrappers::ReceiverStream;
-use tokio_util::sync::{PollSendError, PollSender};
-
-// actor that receives jabber stanzas to write, and if there is an error, sends a message back to the supervisor then aborts, so the supervisor can spawn a new stream.
-pub struct JabberWriter {
- stanza_receiver: mpsc::Receiver<JabberWrite>,
- control_receiver: mpsc::Receiver<JabberWriterControl>,
- stream: BoundJabberWriter<Tls>,
- on_crash: oneshot::Sender<(JabberWrite, mpsc::Receiver<JabberWrite>)>,
-}
-
-struct JabberWrite {
- stanza: Stanza,
- respond_to: oneshot::Sender<Result<(), Error>>,
-}
-
-enum JabberWriterControl {
- Disconnect,
- Abort(oneshot::Sender<mpsc::Receiver<JabberWrite>>),
-}
-
-impl JabberWriter {
- fn new(
- stanza_receiver: mpsc::Receiver<JabberWrite>,
- control_receiver: mpsc::Receiver<JabberWriterControl>,
- stream: BoundJabberWriter<Tls>,
- supervisor: oneshot::Sender<(JabberWrite, mpsc::Receiver<JabberWrite>)>,
- ) -> Self {
- Self {
- stanza_receiver,
- control_receiver,
- stream,
- on_crash: supervisor,
- }
- }
-
- async fn write(&mut self, stanza: &Stanza) -> Result<(), peanuts::Error> {
- Ok(self.stream.write(stanza).await?)
- }
-
- async fn run(mut self) {
- loop {
- tokio::select! {
- Some(msg) = self.control_receiver.recv() => {
- match msg {
- JabberWriterControl::Disconnect => {
- // TODO: close the stanza_receiver channel and drain out all of the remaining stanzas to send
- self.stanza_receiver.close();
- // TODO: put this in some kind of function to avoid code duplication
- for msg in self.stanza_receiver.recv().await {
- let result = self.stream.write(&msg.stanza).await;
- match result {
- Err(e) => match &e {
- peanuts::Error::ReadError(error) => {
- // make sure message is not lost from error, supervisor handles retry and reporting
- self.on_crash.send((msg, self.stanza_receiver));
- break;
- }
- _ => {
- msg.respond_to.send(Err(e.into()));
- }
- },
- _ => {
- msg.respond_to.send(Ok(()));
- }
- }
- }
- self.stream.try_close().await;
- break;
- },
- JabberWriterControl::Abort(sender) => {
- sender.send(self.stanza_receiver);
- break;
- },
- }
- },
- Some(msg) = self.stanza_receiver.recv() => {
- let result = self.stream.write(&msg.stanza).await;
- match result {
- Err(e) => match &e {
- peanuts::Error::ReadError(error) => {
- // make sure message is not lost from error, supervisor handles retry and reporting
- self.on_crash.send((msg, self.stanza_receiver));
- break;
- }
- _ => {
- msg.respond_to.send(Err(e.into()));
- }
- },
- _ => {
- msg.respond_to.send(Ok(()));
- }
- }
- },
- // TODO: check if this is ok to do
- else => break,
- }
- }
- }
-}
-
-#[derive(Clone)]
-pub struct JabberWriteHandle {
- sender: mpsc::Sender<JabberWrite>,
-}
-
-pub struct JabberWriterControlHandle {
- sender: mpsc::Sender<JabberWriterControl>,
- handle: JoinHandle<()>,
-}
-
-impl JabberWriterControlHandle {
- pub fn new(
- stream: BoundJabberWriter<Tls>,
- supervisor: oneshot::Sender<(JabberWrite, mpsc::Receiver<JabberWrite>)>,
- ) -> (JabberWriteHandle, JabberWriterControlHandle) {
- let (control_sender, control_receiver) = mpsc::channel(20);
- let (stanza_sender, stanza_receiver) = mpsc::channel(20);
-
- let actor = JabberWriter::new(stanza_receiver, control_receiver, stream, supervisor);
- let handle = tokio::spawn(async move { actor.run().await });
-
- (
- JabberWriteHandle {
- sender: stanza_sender,
- },
- Self {
- sender: control_sender,
- handle,
- },
- )
- }
-
- pub fn reconnect(
- stream: BoundJabberWriter<Tls>,
- supervisor: oneshot::Sender<(JabberWrite, mpsc::Receiver<JabberWrite>)>,
- stanza_receiver: mpsc::Receiver<JabberWrite>,
- ) -> Self {
- let (control_sender, control_receiver) = mpsc::channel(20);
-
- let actor = JabberWriter::new(stanza_receiver, control_receiver, stream, supervisor);
- let handle = tokio::spawn(async move { actor.run().await });
-
- Self {
- sender: control_sender,
- handle,
- }
- }
-}
-
-pub struct JabberReader {
- // TODO: place iq hashmap here
- control_receiver: mpsc::Receiver<JabberReaderControl>,
- stream: BoundJabberReader<Tls>,
- on_crash: oneshot::Sender<(SqlitePool, mpsc::Sender<UpdateMessage>, JoinSet<()>)>,
- db: SqlitePool,
- sender: mpsc::Sender<UpdateMessage>,
- supervisor_control: mpsc::Sender<JabberSupervisorCommand>,
- write_sender: mpsc::Sender<JabberWrite>,
- tasks: JoinSet<()>,
-}
-
-impl JabberReader {
- fn new(
- control_receiver: mpsc::Receiver<JabberReaderControl>,
- stream: BoundJabberReader<Tls>,
- on_crash: oneshot::Sender<(SqlitePool, mpsc::Sender<UpdateMessage>, JoinSet<()>)>,
- db: SqlitePool,
- sender: mpsc::Sender<UpdateMessage>,
- // jabber server must be able to both terminate the connection from error, and ask for data from the client (such as supported XEPs)
- supervisor_control: mpsc::Sender<JabberSupervisorCommand>,
- write_sender: mpsc::Sender<JabberWrite>,
- ) -> Self {
- Self {
- control_receiver,
- stream,
- on_crash,
- db,
- sender,
- supervisor_control,
- write_sender,
- tasks: JoinSet::new(),
- }
- }
-
- async fn run(mut self) {
- loop {
- tokio::select! {
- Some(msg) = self.control_receiver.recv() => {
- match msg {
- JabberReaderControl::Disconnect => todo!(),
- JabberReaderControl::Abort(sender) => todo!(),
- };
- },
- stanza = self.stream.read::<Stanza>() => {
- match stanza {
- Ok(_) => todo!(),
- Err(_) => todo!(),
- }
- self.tasks.spawn();
- },
- else => break
- }
- }
- }
-}
-
-trait Task {
- async fn handle();
-}
-
-impl Task for Stanza {
- async fn handle() {
- todo!()
- }
-}
-
-enum JabberReaderControl {
- Disconnect,
- Abort(oneshot::Sender<mpsc::Receiver<JabberWrite>>),
-}
-
-struct JabberReaderControlHandle {
- sender: mpsc::Sender<JabberReaderControl>,
- handle: JoinHandle<()>,
-}
-
-impl JabberReaderControlHandle {
- pub fn new(
- stream: BoundJabberReader<Tls>,
- on_crash: oneshot::Sender<(SqlitePool, mpsc::Sender<UpdateMessage>, JoinSet<()>)>,
- db: SqlitePool,
- sender: mpsc::Sender<UpdateMessage>,
- supervisor_control: mpsc::Sender<JabberSupervisorCommand>,
- jabber_write: mpsc::Sender<JabberWrite>,
- ) -> Self {
- let (control_sender, control_receiver) = mpsc::channel(20);
- let actor = JabberReader::new(
- control_receiver,
- stream,
- on_crash,
- db,
- sender,
- supervisor_control,
- jabber_write,
- );
- let handle = tokio::spawn(async move { actor.run().await });
+use crate::connection::write::WriteHandle;
+use crate::connection::{SupervisorCommand, SupervisorHandle};
+use crate::error::Error;
- Self {
- sender: control_sender,
- handle,
- }
- }
-}
+mod connection;
+mod error;
pub struct Luz {
receiver: mpsc::Receiver<CommandMessage>,
jid: Arc<Mutex<JID>>,
// TODO: use a dyn passwordprovider trait to avoid storing password in memory
password: String,
- connected: Arc<Mutex<Option<(JabberWriteHandle, JabberSupervisorHandle)>>>,
+ connected: Arc<Mutex<Option<(WriteHandle, SupervisorHandle)>>>,
db: SqlitePool,
sender: mpsc::Sender<UpdateMessage>,
+ /// if connection was shut down due to e.g. server shutdown, supervisor must be able to mark client as disconnected
+ connection_supervisor_shutdown: oneshot::Receiver<()>,
+ // TODO: will need to have an auto reconnect state as well (e.g. in case server shut down, to try and reconnect later)
+ // TODO: will grow forever at this point, maybe not required as tasks will naturally shut down anyway?
tasks: JoinSet<()>,
}
@@ -304,7 +36,8 @@ impl Luz {
receiver: mpsc::Receiver<CommandMessage>,
jid: Arc<Mutex<JID>>,
password: String,
- connected: Arc<Mutex<Option<(JabberWriteHandle, JabberSupervisorHandle)>>>,
+ connected: Arc<Mutex<Option<(WriteHandle, SupervisorHandle)>>>,
+ connection_supervisor_shutdown: oneshot::Receiver<()>,
db: SqlitePool,
sender: mpsc::Sender<UpdateMessage>,
) -> Self {
@@ -316,71 +49,87 @@ impl Luz {
receiver,
sender,
tasks: JoinSet::new(),
+ connection_supervisor_shutdown,
}
}
async fn run(mut self) {
- while let Some(msg) = self.receiver.recv().await {
- // TODO: consider separating disconnect/connect and commands apart from commandmessage
- match msg {
- CommandMessage::Connect => {
- match self.connected.lock().await.as_ref() {
- Some(_) => {
- self.sender
- .send(UpdateMessage::Error(Error::AlreadyConnected))
- .await;
- }
- None => {
- let mut jid = self.jid.lock().await;
- let mut domain = jid.domainpart.clone();
- let streams_result =
- jabber::connect_and_login(&mut jid, &self.password, &mut domain)
- .await;
- match streams_result {
- Ok(s) => {
- let (writer, supervisor) =
- JabberSupervisorHandle::new(s, self.sender.clone());
- *self.connected.lock().await = Some((writer, supervisor));
+ loop {
+ tokio::select! {
+ _ = &mut self.connection_supervisor_shutdown => {
+ *self.connected.lock().await = None
+ }
+ Some(msg) = self.receiver.recv() => {
+ // TODO: consider separating disconnect/connect and commands apart from commandmessage
+ // TODO: dispatch commands separate tasks
+ match msg {
+ CommandMessage::Connect => {
+ match self.connected.lock().await.as_ref() {
+ Some(_) => {
+ self.sender
+ .send(UpdateMessage::Error(Error::AlreadyConnected))
+ .await;
}
- Err(e) => {
- self.sender.send(UpdateMessage::Error(e.into()));
+ None => {
+ let mut jid = self.jid.lock().await;
+ let mut domain = jid.domainpart.clone();
+ // TODO: check what happens upon reconnection with same resource (this is probably what one wants to do and why jid should be mutated from a bare jid to one with a resource)
+ let streams_result =
+ jabber::connect_and_login(&mut jid, &self.password, &mut domain)
+ .await;
+ match streams_result {
+ Ok(s) => {
+ let (shutdown_send, shutdown_recv) = oneshot::channel::<()>();
+ let (writer, supervisor) = SupervisorHandle::new(
+ s,
+ self.sender.clone(),
+ self.db.clone(),
+ shutdown_send,
+ );
+ self.connection_supervisor_shutdown = shutdown_recv;
+ *self.connected.lock().await = Some((writer, supervisor));
+ }
+ Err(e) => {
+ self.sender.send(UpdateMessage::Error(e.into()));
+ }
+ }
}
+ };
+ }
+ CommandMessage::Disconnect => match self.connected.lock().await.as_mut() {
+ None => {
+ self.sender
+ .send(UpdateMessage::Error(Error::AlreadyDisconnected))
+ .await;
+ }
+ mut c => {
+ if let Some((_write_handle, supervisor_handle)) = c.take() {
+ let _ = supervisor_handle.send(SupervisorCommand::Disconnect).await;
+ } else {
+ unreachable!()
+ };
}
+ },
+ _ => {
+ match self.connected.lock().await.as_ref() {
+ Some((w, s)) => self.tasks.spawn(msg.handle_online(
+ w.clone(),
+ s.sender(),
+ self.jid.clone(),
+ self.db.clone(),
+ self.sender.clone(),
+ // TODO: iq hashmap
+ )),
+ None => self.tasks.spawn(msg.handle_offline(
+ self.jid.clone(),
+ self.db.clone(),
+ self.sender.clone(),
+ )),
+ };
}
- };
- }
- CommandMessage::Disconnect => match self.connected.lock().await.as_mut() {
- None => {
- self.sender
- .send(UpdateMessage::Error(Error::AlreadyDisonnected))
- .await;
- }
- mut c => {
- if let Some((_write_handle, supervisor_handle)) = c.take() {
- let _ = supervisor_handle
- .sender
- .send(JabberSupervisorCommand::Disconnect)
- .await;
- } else {
- unreachable!()
- };
}
},
- _ => {
- match self.connected.lock().await.as_ref() {
- Some((w, _)) => self.tasks.spawn(msg.handle_online(
- w.clone(),
- self.jid.clone(),
- self.db.clone(),
- self.sender.clone(),
- )),
- None => self.tasks.spawn(msg.handle_offline(
- self.jid.clone(),
- self.db.clone(),
- self.sender.clone(),
- )),
- };
- }
+ else => break,
}
}
}
@@ -398,7 +147,8 @@ impl CommandMessage {
pub async fn handle_online(
mut self,
- jabber_write_handle: JabberWriteHandle,
+ write_handle: WriteHandle,
+ supervisor_control: SupervisorSender,
// TODO: jid could lose resource by the end
jid: Arc<Mutex<JID>>,
db: SqlitePool,
@@ -418,12 +168,15 @@ impl LuzHandle {
pub fn new(jid: JID, password: String, db: SqlitePool) -> Self {
let (command_sender, command_receiver) = mpsc::channel(20);
let (update_sender, update_receiver) = mpsc::channel(20);
+ // might be bad, first supervisor shutdown notification oneshot is never used (disgusting)
+ let (sup_send, sup_recv) = oneshot::channel();
let actor = Luz::new(
command_receiver,
Arc::new(Mutex::new(jid)),
password,
Arc::new(Mutex::new(None)),
+ sup_recv,
db,
update_sender,
);
@@ -436,146 +189,6 @@ impl LuzHandle {
}
}
-pub struct JabberSupervisor {
- connection_commands: mpsc::Receiver<JabberSupervisorCommand>,
- writer_crash: oneshot::Receiver<(JabberWrite, mpsc::Receiver<JabberWrite>)>,
- reader_crash: oneshot::Receiver<(SqlitePool, mpsc::Sender<UpdateMessage>, JoinSet<()>)>,
- sender: mpsc::Sender<UpdateMessage>,
- writer_handle: JabberWriterControlHandle,
- reader_handle: JabberReaderControlHandle,
-}
-
-pub enum JabberSupervisorCommand {
- Disconnect,
-}
-
-impl JabberSupervisor {
- fn new(
- connection_commands: mpsc::Receiver<JabberSupervisorCommand>,
- writer_crash: oneshot::Receiver<(JabberWrite, mpsc::Receiver<JabberWrite>)>,
- reader_crash: oneshot::Receiver<(SqlitePool, mpsc::Sender<UpdateMessage>, JoinSet<()>)>,
- sender: mpsc::Sender<UpdateMessage>,
- writer_handle: JabberWriterControlHandle,
- reader_handle: JabberReaderControlHandle,
- ) -> Self {
- Self {
- connection_commands,
- writer_crash,
- sender,
- writer_handle,
- reader_handle,
- reader_crash,
- }
- }
-
- async fn handle_command_message(&mut self, msg: JabberSupervisorCommand) {}
-
- async fn run(mut self) {
- loop {
- tokio::select! {
- Some(msg) = self.connection_commands.recv() => {
- self.handle_command_message(msg).await;
- },
- error = self.writer_crash => {
-
- },
- error = self.reader_crash => {
-
- },
- }
- }
- }
-}
-
-pub struct JabberSupervisorHandle {
- sender: mpsc::Sender<JabberSupervisorCommand>,
- handle: JoinHandle<()>,
-}
-
-impl JabberSupervisorHandle {
- pub fn new(
- streams: BoundJabberStream<Tls>,
- sender: mpsc::Sender<UpdateMessage>,
- db: SqlitePool,
- update_sender: mpsc::Sender<UpdateMessage>,
- ) -> (JabberWriteHandle, Self) {
- let (command_sender, command_receiver) = mpsc::channel(20);
- let (writer_error_sender, writer_error_receiver) = oneshot::channel();
- let (reader_crash_sender, reader_crash_receiver) = oneshot::channel();
-
- let (reader, writer) = streams.split();
- let (jabber_write_handle, jabber_writer_control_handle) =
- JabberWriterControlHandle::new(writer, writer_error_sender);
- let jabber_reader_control_handle = JabberReaderControlHandle::new(
- reader,
- reader_crash_sender,
- db,
- update_sender,
- command_sender.clone(),
- jabber_write_handle.sender.clone(),
- );
-
- let actor = JabberSupervisor::new(
- command_receiver,
- writer_error_receiver,
- reader_crash_receiver,
- sender,
- jabber_writer_control_handle,
- jabber_reader_control_handle,
- );
-
- let handle = tokio::spawn(async move { actor.run().await });
-
- (
- jabber_write_handle,
- Self {
- sender: command_sender,
- handle,
- },
- )
- }
-}
-
-pub enum Error {
- AlreadyConnected,
- PollSend(PollSendError<CommandMessage>),
- Jabber(jabber::Error),
- XML(peanuts::Error),
- SQL(sqlx::Error),
- JID(jid::ParseError),
- AlreadyDisonnected,
-}
-
-impl From<peanuts::Error> for Error {
- fn from(e: peanuts::Error) -> Self {
- Self::XML(e)
- }
-}
-
-impl From<jid::ParseError> for Error {
- fn from(e: jid::ParseError) -> Self {
- Self::JID(e)
- }
-}
-
-impl From<sqlx::Error> for Error {
- fn from(e: sqlx::Error) -> Self {
- Self::SQL(e)
- }
-}
-
-impl From<jabber::Error> for Error {
- fn from(e: jabber::Error) -> Self {
- Self::Jabber(e)
- }
-}
-
-impl From<PollSendError<CommandMessage>> for Error {
- fn from(e: PollSendError<CommandMessage>) -> Self {
- Self::PollSend(e)
- }
-}
-
pub enum CommandMessage {
Connect,
Disconnect,