aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorLibravatar cel 🌸 <cel@bunny.garden>2025-02-11 07:04:21 +0000
committerLibravatar cel 🌸 <cel@bunny.garden>2025-02-11 07:04:21 +0000
commit1ed6317272fe819e7e12b1be6fcff62d409c8f03 (patch)
tree93701e562483adca46bc9c2118fd2e641f10881e
parent41c1ba15ef5865f4513db525ed595f3ce903dd26 (diff)
downloadluz-1ed6317272fe819e7e12b1be6fcff62d409c8f03.tar.gz
luz-1ed6317272fe819e7e12b1be6fcff62d409c8f03.tar.bz2
luz-1ed6317272fe819e7e12b1be6fcff62d409c8f03.zip
supervision and reconnection
-rw-r--r--luz/src/connection/mod.rs158
-rw-r--r--luz/src/connection/read.rs183
-rw-r--r--luz/src/connection/write.rs92
-rw-r--r--luz/src/error.rs1
-rw-r--r--luz/src/lib.rs8
5 files changed, 387 insertions, 55 deletions
diff --git a/luz/src/connection/mod.rs b/luz/src/connection/mod.rs
index 3ad2648..85cf7cc 100644
--- a/luz/src/connection/mod.rs
+++ b/luz/src/connection/mod.rs
@@ -1,15 +1,22 @@
-use std::ops::{Deref, DerefMut};
+// TODO: consider if this needs to be handled by a supervisor or could be handled by luz directly
+
+use std::{
+ ops::{Deref, DerefMut},
+ sync::Arc,
+ time::Duration,
+};
use jabber::{connection::Tls, jabber_stream::bound_stream::BoundJabberStream};
-use read::ReadControlHandle;
+use jid::JID;
+use read::{ReadControl, ReadControlHandle};
use sqlx::SqlitePool;
use tokio::{
- sync::{mpsc, oneshot},
+ sync::{mpsc, oneshot, Mutex},
task::{JoinHandle, JoinSet},
};
-use write::{WriteControlHandle, WriteHandle, WriteMessage};
+use write::{WriteControl, WriteControlHandle, WriteHandle, WriteMessage};
-use crate::UpdateMessage;
+use crate::{error::Error, UpdateMessage};
mod read;
pub(crate) mod write;
@@ -21,16 +28,21 @@ pub struct Supervisor {
SqlitePool,
mpsc::Sender<UpdateMessage>,
tokio::task::JoinSet<()>,
+ mpsc::Sender<SupervisorCommand>,
+ WriteHandle,
)>,
sender: mpsc::Sender<UpdateMessage>,
writer_handle: WriteControlHandle,
reader_handle: ReadControlHandle,
on_shutdown: oneshot::Sender<()>,
+ jid: Arc<Mutex<JID>>,
+ password: Arc<String>,
}
pub enum SupervisorCommand {
Disconnect,
// for if there was a stream error, require to reconnect
+ // couldn't stream errors just cause a crash? lol
Reconnect,
}
@@ -38,11 +50,19 @@ impl Supervisor {
fn new(
connection_commands: mpsc::Receiver<SupervisorCommand>,
writer_crash: oneshot::Receiver<(WriteMessage, mpsc::Receiver<WriteMessage>)>,
- reader_crash: oneshot::Receiver<(SqlitePool, mpsc::Sender<UpdateMessage>, JoinSet<()>)>,
+ reader_crash: oneshot::Receiver<(
+ SqlitePool,
+ mpsc::Sender<UpdateMessage>,
+ JoinSet<()>,
+ mpsc::Sender<SupervisorCommand>,
+ WriteHandle,
+ )>,
sender: mpsc::Sender<UpdateMessage>,
writer_handle: WriteControlHandle,
reader_handle: ReadControlHandle,
on_shutdown: oneshot::Sender<()>,
+ jid: Arc<Mutex<JID>>,
+ password: Arc<String>,
) -> Self {
Self {
connection_commands,
@@ -52,27 +72,137 @@ impl Supervisor {
reader_handle,
reader_crash,
on_shutdown,
+ jid,
+ password,
}
}
- async fn handle_command_message(&mut self, msg: SupervisorCommand) {}
-
async fn run(mut self) {
loop {
tokio::select! {
Some(msg) = self.connection_commands.recv() => {
- self.handle_command_message(msg).await;
+ match msg {
+ SupervisorCommand::Disconnect => {
+ let _ = self.writer_handle.send(WriteControl::Disconnect).await;
+ let _ = self.reader_handle.send(ReadControl::Disconnect).await;
+ tokio::select! {
+ _ = async { tokio::join!(
+ async { let _ = (&mut self.writer_handle.handle).await; },
+ async { let _ = (&mut self.reader_handle.handle).await; }
+ ) } => {},
+ _ = async { tokio::time::sleep(Duration::from_secs(5)) } => {
+ (&mut self.reader_handle.handle).abort();
+ (&mut self.writer_handle.handle).abort();
+ }
+ }
+ break;
+ },
+ SupervisorCommand::Reconnect => {
+ // TODO: please omfg
+ // send abort to read stream, as already done, consider
+ todo!()
+ },
+ }
},
- error = &mut self.writer_crash => {
+ Ok((write_msg, mut write_recv)) = &mut self.writer_crash => {
+ // consider awaiting/aborting the read and write threads
+ let (send, recv) = oneshot::channel();
+ let _ = self.reader_handle.send(ReadControl::Abort(send)).await;
+ let (db, update_sender, tasks, supervisor_command, write_sender) = tokio::select! {
+ Ok(s) = recv => s,
+ Ok(s) = &mut self.reader_crash => s,
+ // in case, just break as irrecoverable
+ else => break,
+ };
+ let mut jid = self.jid.lock().await;
+ let mut domain = jid.domainpart.clone();
+ let connection = jabber::connect_and_login(&mut jid, &*self.password, &mut domain).await;
+ match connection {
+ Ok(c) => {
+ let (read, write) = c.split();
+ let (send, recv) = oneshot::channel();
+ self.writer_crash = recv;
+ self.writer_handle =
+ WriteControlHandle::reconnect_retry(write, send, write_msg, write_recv);
+ let (send, recv) = oneshot::channel();
+ self.reader_crash = recv;
+ self.reader_handle = ReadControlHandle::reconnect(
+ read,
+ send,
+ db,
+ update_sender,
+ supervisor_command,
+ write_sender,
+ tasks
+ );
+ },
+ Err(e) => {
+ // if reconnection failure, respond to all current write messages with lost connection error. the received processes should complete themselves.
+ write_recv.close();
+ let _ = write_msg.respond_to.send(Err(Error::LostConnection));
+ while let Some(msg) = write_recv.recv().await {
+ let _ = msg.respond_to.send(Err(Error::LostConnection));
+ }
+ let _ = self.sender.send(UpdateMessage::Error(e.into())).await;
+ break;
+ },
+ }
},
- error = &mut self.reader_crash => {
+ Ok((db, update_sender, tasks, supervisor_control, write_handle)) = &mut self.reader_crash => {
+ let (send, recv) = oneshot::channel();
+ let _ = self.writer_handle.send(WriteControl::Abort(send)).await;
+ let (retry_msg, mut write_receiver) = tokio::select! {
+ Ok(s) = recv => (None, s),
+ Ok(s) = &mut self.writer_crash => (Some(s.0), s.1),
+ // in case, just break as irrecoverable
+ else => break,
+ };
+ let mut jid = self.jid.lock().await;
+ let mut domain = jid.domainpart.clone();
+ let connection = jabber::connect_and_login(&mut jid, &*self.password, &mut domain).await;
+ match connection {
+ Ok(c) => {
+ let (read, write) = c.split();
+ let (send, recv) = oneshot::channel();
+ self.writer_crash = recv;
+ if let Some(msg) = retry_msg {
+ self.writer_handle =
+ WriteControlHandle::reconnect_retry(write, send, msg, write_receiver);
+ } else {
+ self.writer_handle = WriteControlHandle::reconnect(write, send, write_receiver)
+ }
+ let (send, recv) = oneshot::channel();
+ self.reader_crash = recv;
+ self.reader_handle = ReadControlHandle::reconnect(
+ read,
+ send,
+ db,
+ update_sender,
+ supervisor_control,
+ write_handle,
+ tasks
+ );
+ },
+ Err(e) => {
+ // if reconnection failure, respond to all current messages with lost connection error.
+ write_receiver.close();
+ if let Some(msg) = retry_msg {
+ msg.respond_to.send(Err(Error::LostConnection));
+ }
+ while let Some(msg) = write_receiver.recv().await {
+ msg.respond_to.send(Err(Error::LostConnection));
+ }
+ let _ = self.sender.send(UpdateMessage::Error(e.into())).await;
+ break;
+ },
+ }
},
else => break,
}
}
- self.on_shutdown.send(());
+ let _ = self.on_shutdown.send(());
}
}
@@ -120,6 +250,8 @@ impl SupervisorHandle {
update_sender: mpsc::Sender<UpdateMessage>,
db: SqlitePool,
on_shutdown: oneshot::Sender<()>,
+ jid: Arc<Mutex<JID>>,
+ password: Arc<String>,
) -> (WriteHandle, Self) {
let (command_sender, command_receiver) = mpsc::channel(20);
let (writer_error_sender, writer_error_receiver) = oneshot::channel();
@@ -145,6 +277,8 @@ impl SupervisorHandle {
write_control_handle,
jabber_reader_control_handle,
on_shutdown,
+ jid,
+ password,
);
let handle = tokio::spawn(async move { actor.run().await });
diff --git a/luz/src/connection/read.rs b/luz/src/connection/read.rs
index 7800d56..edc6cdb 100644
--- a/luz/src/connection/read.rs
+++ b/luz/src/connection/read.rs
@@ -1,3 +1,8 @@
+use std::{
+ ops::{Deref, DerefMut},
+ time::Duration,
+};
+
use jabber::{connection::Tls, jabber_stream::bound_stream::BoundJabberReader};
use sqlx::SqlitePool;
use stanza::client::Stanza;
@@ -6,7 +11,7 @@ use tokio::{
task::{JoinHandle, JoinSet},
};
-use crate::UpdateMessage;
+use crate::{error::Error, UpdateMessage};
use super::{
write::{WriteHandle, WriteMessage},
@@ -17,25 +22,41 @@ pub struct Read {
// TODO: place iq hashmap here
control_receiver: mpsc::Receiver<ReadControl>,
stream: BoundJabberReader<Tls>,
- on_crash: oneshot::Sender<(SqlitePool, mpsc::Sender<UpdateMessage>, JoinSet<()>)>,
+ on_crash: oneshot::Sender<(
+ SqlitePool,
+ mpsc::Sender<UpdateMessage>,
+ JoinSet<()>,
+ mpsc::Sender<SupervisorCommand>,
+ WriteHandle,
+ )>,
db: SqlitePool,
update_sender: mpsc::Sender<UpdateMessage>,
supervisor_control: mpsc::Sender<SupervisorCommand>,
write_handle: WriteHandle,
tasks: JoinSet<()>,
+ disconnecting: bool,
+ disconnect_timedout: oneshot::Receiver<()>,
}
impl Read {
fn new(
control_receiver: mpsc::Receiver<ReadControl>,
stream: BoundJabberReader<Tls>,
- on_crash: oneshot::Sender<(SqlitePool, mpsc::Sender<UpdateMessage>, JoinSet<()>)>,
+ on_crash: oneshot::Sender<(
+ SqlitePool,
+ mpsc::Sender<UpdateMessage>,
+ JoinSet<()>,
+ mpsc::Sender<SupervisorCommand>,
+ WriteHandle,
+ )>,
db: SqlitePool,
update_sender: mpsc::Sender<UpdateMessage>,
// jabber server must be able to both terminate the connection from error, and ask for data from the client (such as supported XEPs)
supervisor_control: mpsc::Sender<SupervisorCommand>,
- write_sender: WriteHandle,
+ write_handle: WriteHandle,
+ tasks: JoinSet<()>,
) -> Self {
+ let (send, recv) = oneshot::channel();
Self {
control_receiver,
stream,
@@ -43,26 +64,73 @@ impl Read {
db,
update_sender,
supervisor_control,
- write_handle: write_sender,
- tasks: JoinSet::new(),
+ write_handle,
+ tasks,
+ disconnecting: false,
+ disconnect_timedout: recv,
}
}
async fn run(mut self) {
loop {
tokio::select! {
+ // if still haven't received the end tag in time, just kill itself
+ _ = &mut self.disconnect_timedout => {
+ break;
+ }
Some(msg) = self.control_receiver.recv() => {
match msg {
- ReadControl::Disconnect => todo!(),
- ReadControl::Abort(sender) => todo!(),
+ // when disconnect received,
+ ReadControl::Disconnect => {
+ let (send, recv) = oneshot::channel();
+ self.disconnect_timedout = recv;
+ self.disconnecting = true;
+ tokio::spawn(async {
+ tokio::time::sleep(Duration::from_secs(10)).await;
+ let _ = send.send(());
+ })
+ },
+ ReadControl::Abort(sender) => {
+ let _ = sender.send((self.db, self.update_sender, self.tasks, self.supervisor_control, self.write_handle));
+ break;
+ },
};
},
stanza = self.stream.read::<Stanza>() => {
match stanza {
- Ok(_) => todo!(),
- Err(_) => todo!(),
+ Ok(s) => {
+ self.tasks.spawn(handle_stanza(s, self.update_sender.clone(), self.db.clone(), self.supervisor_control.clone(), self.write_handle.clone()));
+ },
+ Err(e) => {
+ // TODO: NEXT write the correct error stanza depending on error, decide whether to reconnect or properly disconnect, depending on if disconnecting is true
+ // match e {
+ // peanuts::Error::ReadError(error) => todo!(),
+ // peanuts::Error::Utf8Error(utf8_error) => todo!(),
+ // peanuts::Error::ParseError(_) => todo!(),
+ // peanuts::Error::EntityProcessError(_) => todo!(),
+ // peanuts::Error::InvalidCharRef(_) => todo!(),
+ // peanuts::Error::DuplicateNameSpaceDeclaration(namespace_declaration) => todo!(),
+ // peanuts::Error::DuplicateAttribute(_) => todo!(),
+ // peanuts::Error::UnqualifiedNamespace(_) => todo!(),
+ // peanuts::Error::MismatchedEndTag(name, name1) => todo!(),
+ // peanuts::Error::NotInElement(_) => todo!(),
+ // peanuts::Error::ExtraData(_) => todo!(),
+ // peanuts::Error::UndeclaredNamespace(_) => todo!(),
+ // peanuts::Error::IncorrectName(name) => todo!(),
+ // peanuts::Error::DeserializeError(_) => todo!(),
+ // peanuts::Error::Deserialize(deserialize_error) => todo!(),
+ // peanuts::Error::RootElementEnded => todo!(),
+ // }
+ // TODO: make sure this only happens when an end tag is received
+ if self.disconnecting == true {
+ break;
+ } else {
+ // AAAAAAAAAAAAAAAAAAAAA i should really just have this stored in the supervisor and not gaf bout passing these references around
+ let _ = self.on_crash.send((self.db, self.update_sender, self.tasks, self.supervisor_control, self.write_handle));
+ }
+ break;
+ },
}
- self.tasks.spawn();
},
else => break
}
@@ -70,34 +138,102 @@ impl Read {
}
}
-trait Task {
- async fn handle();
-}
-
-impl Task for Stanza {
- async fn handle() {
- todo!()
- }
+// what do stanza processes do?
+// - update ui
+// - access database
+// - disconnect proper, reconnect
+// - respond to server requests
+async fn handle_stanza(
+ stanza: Stanza,
+ update_sender: mpsc::Sender<UpdateMessage>,
+ db: SqlitePool,
+ supervisor_control: mpsc::Sender<SupervisorCommand>,
+ write_handle: WriteHandle,
+) {
+ todo!()
}
-enum ReadControl {
+pub enum ReadControl {
Disconnect,
- Abort(oneshot::Sender<mpsc::Receiver<WriteMessage>>),
+ Abort(
+ oneshot::Sender<(
+ SqlitePool,
+ mpsc::Sender<UpdateMessage>,
+ JoinSet<()>,
+ mpsc::Sender<SupervisorCommand>,
+ WriteHandle,
+ )>,
+ ),
}
pub struct ReadControlHandle {
sender: mpsc::Sender<ReadControl>,
- handle: JoinHandle<()>,
+ pub(crate) handle: JoinHandle<()>,
+}
+
+impl Deref for ReadControlHandle {
+ type Target = mpsc::Sender<ReadControl>;
+
+ fn deref(&self) -> &Self::Target {
+ &self.sender
+ }
+}
+
+impl DerefMut for ReadControlHandle {
+ fn deref_mut(&mut self) -> &mut Self::Target {
+ &mut self.sender
+ }
}
impl ReadControlHandle {
pub fn new(
stream: BoundJabberReader<Tls>,
- on_crash: oneshot::Sender<(SqlitePool, mpsc::Sender<UpdateMessage>, JoinSet<()>)>,
+ on_crash: oneshot::Sender<(
+ SqlitePool,
+ mpsc::Sender<UpdateMessage>,
+ JoinSet<()>,
+ mpsc::Sender<SupervisorCommand>,
+ WriteHandle,
+ )>,
+ db: SqlitePool,
+ sender: mpsc::Sender<UpdateMessage>,
+ supervisor_control: mpsc::Sender<SupervisorCommand>,
+ jabber_write: WriteHandle,
+ ) -> Self {
+ let (control_sender, control_receiver) = mpsc::channel(20);
+
+ let actor = Read::new(
+ control_receiver,
+ stream,
+ on_crash,
+ db,
+ sender,
+ supervisor_control,
+ jabber_write,
+ JoinSet::new(),
+ );
+ let handle = tokio::spawn(async move { actor.run().await });
+
+ Self {
+ sender: control_sender,
+ handle,
+ }
+ }
+
+ pub fn reconnect(
+ stream: BoundJabberReader<Tls>,
+ on_crash: oneshot::Sender<(
+ SqlitePool,
+ mpsc::Sender<UpdateMessage>,
+ JoinSet<()>,
+ mpsc::Sender<SupervisorCommand>,
+ WriteHandle,
+ )>,
db: SqlitePool,
sender: mpsc::Sender<UpdateMessage>,
supervisor_control: mpsc::Sender<SupervisorCommand>,
jabber_write: WriteHandle,
+ tasks: JoinSet<()>,
) -> Self {
let (control_sender, control_receiver) = mpsc::channel(20);
@@ -109,6 +245,7 @@ impl ReadControlHandle {
sender,
supervisor_control,
jabber_write,
+ tasks,
);
let handle = tokio::spawn(async move { actor.run().await });
diff --git a/luz/src/connection/write.rs b/luz/src/connection/write.rs
index 9c01519..09638a8 100644
--- a/luz/src/connection/write.rs
+++ b/luz/src/connection/write.rs
@@ -19,10 +19,10 @@ pub struct Write {
pub struct WriteMessage {
stanza: Stanza,
- respond_to: oneshot::Sender<Result<(), Error>>,
+ pub respond_to: oneshot::Sender<Result<(), Error>>,
}
-enum WriteControl {
+pub enum WriteControl {
Disconnect,
Abort(oneshot::Sender<mpsc::Receiver<WriteMessage>>),
}
@@ -46,38 +46,66 @@ impl Write {
Ok(self.stream.write(stanza).await?)
}
+ async fn run_reconnected(mut self, retry_msg: WriteMessage) {
+ // try to retry sending the message that failed to send previously
+ let result = self.stream.write(&retry_msg.stanza).await;
+ match result {
+ Err(e) => match &e {
+ peanuts::Error::ReadError(_error) => {
+ // make sure message is not lost from error, supervisor handles retry and reporting
+ // TODO: upon reconnect, make sure we are not stuck in a reconnection loop
+ let _ = self.on_crash.send((retry_msg, self.stanza_receiver));
+ return;
+ }
+ _ => {
+ let _ = retry_msg.respond_to.send(Err(e.into()));
+ }
+ },
+ _ => {
+ let _ = retry_msg.respond_to.send(Ok(()));
+ }
+ }
+ // return to normal loop
+ self.run().await
+ }
+
async fn run(mut self) {
loop {
tokio::select! {
Some(msg) = self.control_receiver.recv() => {
match msg {
WriteControl::Disconnect => {
- // TODO: close the stanza_receiver channel and drain out all of the remaining stanzas to send
+ // close the stanza_receiver channel and drain out all of the remaining stanzas to send
self.stanza_receiver.close();
// TODO: put this in some kind of function to avoid code duplication
- for msg in self.stanza_receiver.recv().await {
+ while let Some(msg) = self.stanza_receiver.recv().await {
let result = self.stream.write(&msg.stanza).await;
match result {
Err(e) => match &e {
- peanuts::Error::ReadError(error) => {
- // make sure message is not lost from error, supervisor handles retry and reporting
- self.on_crash.send((msg, self.stanza_receiver));
+ peanuts::Error::ReadError(_error) => {
+ // if connection lost during disconnection, just send lost connection error to the write requests
+ let _ = msg.respond_to.send(Err(Error::LostConnection));
+ while let Some(msg) = self.stanza_receiver.recv().await {
+ let _ = msg.respond_to.send(Err(Error::LostConnection));
+ }
break;
}
+ // otherwise complete sending all the stanzas currently in the queue
_ => {
- msg.respond_to.send(Err(e.into()));
+ let _ = msg.respond_to.send(Err(e.into()));
}
},
_ => {
- msg.respond_to.send(Ok(()));
+ let _ = msg.respond_to.send(Ok(()));
}
}
}
- self.stream.try_close().await;
+ let _ = self.stream.try_close().await;
break;
},
+ // in case of abort, stream is already fucked, just send the receiver ready for a reconnection at the same resource
WriteControl::Abort(sender) => {
- sender.send(self.stanza_receiver);
+ let _ = sender.send(self.stanza_receiver);
break;
},
}
@@ -86,21 +114,20 @@ impl Write {
let result = self.stream.write(&msg.stanza).await;
match result {
Err(e) => match &e {
- peanuts::Error::ReadError(error) => {
+ peanuts::Error::ReadError(_error) => {
// make sure message is not lost from error, supervisor handles retry and reporting
- self.on_crash.send((msg, self.stanza_receiver));
+ let _ = self.on_crash.send((msg, self.stanza_receiver));
break;
}
_ => {
- msg.respond_to.send(Err(e.into()));
+ let _ = msg.respond_to.send(Err(e.into()));
}
},
_ => {
- msg.respond_to.send(Ok(()));
+ let _ = msg.respond_to.send(Ok(()));
}
}
},
- // TODO: check if this is ok to do
else => break,
}
}
@@ -128,7 +155,21 @@ impl DerefMut for WriteHandle {
pub struct WriteControlHandle {
sender: mpsc::Sender<WriteControl>,
- handle: JoinHandle<()>,
+ pub(crate) handle: JoinHandle<()>,
+}
+
+impl Deref for WriteControlHandle {
+ type Target = mpsc::Sender<WriteControl>;
+
+ fn deref(&self) -> &Self::Target {
+ &self.sender
+ }
+}
+
+impl DerefMut for WriteControlHandle {
+ fn deref_mut(&mut self) -> &mut Self::Target {
+ &mut self.sender
+ }
}
impl WriteControlHandle {
@@ -153,6 +194,23 @@ impl WriteControlHandle {
)
}
+ pub fn reconnect_retry(
+ stream: BoundJabberWriter<Tls>,
+ supervisor: oneshot::Sender<(WriteMessage, mpsc::Receiver<WriteMessage>)>,
+ retry_msg: WriteMessage,
+ stanza_receiver: mpsc::Receiver<WriteMessage>,
+ ) -> Self {
+ let (control_sender, control_receiver) = mpsc::channel(20);
+
+ let actor = Write::new(stanza_receiver, control_receiver, stream, supervisor);
+ let handle = tokio::spawn(async move { actor.run_reconnected(retry_msg).await });
+
+ Self {
+ sender: control_sender,
+ handle,
+ }
+ }
+
pub fn reconnect(
stream: BoundJabberWriter<Tls>,
supervisor: oneshot::Sender<(WriteMessage, mpsc::Receiver<WriteMessage>)>,
diff --git a/luz/src/error.rs b/luz/src/error.rs
index d9dfaba..2809e8d 100644
--- a/luz/src/error.rs
+++ b/luz/src/error.rs
@@ -5,6 +5,7 @@ pub enum Error {
SQL(sqlx::Error),
JID(jid::ParseError),
AlreadyDisconnected,
+ LostConnection,
}
impl From<peanuts::Error> for Error {
diff --git a/luz/src/lib.rs b/luz/src/lib.rs
index 0dfc30c..333d8eb 100644
--- a/luz/src/lib.rs
+++ b/luz/src/lib.rs
@@ -20,7 +20,7 @@ pub struct Luz {
receiver: mpsc::Receiver<CommandMessage>,
jid: Arc<Mutex<JID>>,
// TODO: use a dyn passwordprovider trait to avoid storing password in memory
- password: String,
+ password: Arc<String>,
connected: Arc<Mutex<Option<(WriteHandle, SupervisorHandle)>>>,
db: SqlitePool,
sender: mpsc::Sender<UpdateMessage>,
@@ -43,7 +43,7 @@ impl Luz {
) -> Self {
Self {
jid,
- password,
+ password: Arc::new(password),
connected,
db,
receiver,
@@ -75,7 +75,7 @@ impl Luz {
let mut domain = jid.domainpart.clone();
// TODO: check what happens upon reconnection with same resource (this is probably what one wants to do and why jid should be mutated from a bare jid to one with a resource)
let streams_result =
- jabber::connect_and_login(&mut jid, &self.password, &mut domain)
+ jabber::connect_and_login(&mut jid, &*self.password, &mut domain)
.await;
match streams_result {
Ok(s) => {
@@ -85,6 +85,8 @@ impl Luz {
self.sender.clone(),
self.db.clone(),
shutdown_send,
+ self.jid.clone(),
+ self.password.clone(),
);
self.connection_supervisor_shutdown = shutdown_recv;
*self.connected.lock().await = Some((writer, supervisor));