diff options
Diffstat (limited to 'lampada/src/connection/mod.rs')
-rw-r--r-- | lampada/src/connection/mod.rs | 52 |
1 files changed, 17 insertions, 35 deletions
diff --git a/lampada/src/connection/mod.rs b/lampada/src/connection/mod.rs index 1e767b0..ffaa7a7 100644 --- a/lampada/src/connection/mod.rs +++ b/lampada/src/connection/mod.rs @@ -10,7 +10,7 @@ use std::{ use jid::JID; use luz::{connection::Tls, jabber_stream::bound_stream::BoundJabberStream}; use read::{ReadControl, ReadControlHandle, ReadState}; -use stanza::client::Stanza; +use stanza::{client::Stanza, stream_error::Error as StreamError}; use tokio::{ sync::{mpsc, oneshot, Mutex}, task::{JoinHandle, JoinSet}, @@ -28,7 +28,7 @@ pub(crate) mod write; pub struct Supervisor<Lgc> { command_recv: mpsc::Receiver<SupervisorCommand>, - reader_crash: oneshot::Receiver<ReadState>, + reader_crash: oneshot::Receiver<(Option<StreamError>, ReadState)>, writer_crash: oneshot::Receiver<(WriteMessage, WriteState)>, read_control_handle: ReadControlHandle, write_control_handle: WriteControlHandle, @@ -43,18 +43,13 @@ pub enum SupervisorCommand { Disconnect, // for if there was a stream error, require to reconnect // couldn't stream errors just cause a crash? lol - Reconnect(ChildState), -} - -pub enum ChildState { - Write(WriteState), - Read(ReadState), + Reconnect(ReadState), } impl<Lgc: Logic + Clone + Send + 'static> Supervisor<Lgc> { fn new( command_recv: mpsc::Receiver<SupervisorCommand>, - reader_crash: oneshot::Receiver<ReadState>, + reader_crash: oneshot::Receiver<(Option<StreamError>, ReadState)>, writer_crash: oneshot::Receiver<(WriteMessage, WriteState)>, read_control_handle: ReadControlHandle, write_control_handle: WriteControlHandle, @@ -104,33 +99,19 @@ impl<Lgc: Logic + Clone + Send + 'static> Supervisor<Lgc> { break; }, // TODO: Reconnect without aborting, gentle reconnect. + // the server sent a stream error SupervisorCommand::Reconnect(state) => { // TODO: please omfg // send abort to read stream, as already done, consider let (read_state, mut write_state); - match state { - ChildState::Write(receiver) => { - write_state = receiver; - let (send, recv) = oneshot::channel(); - let _ = self.read_control_handle.send(ReadControl::Abort(send)).await; - // TODO: need a tokio select, in case the state arrives from somewhere else - if let Ok(state) = recv.await { - read_state = state; - } else { - break - } - }, - ChildState::Read(read) => { - read_state = read; - let (send, recv) = oneshot::channel(); - let _ = self.write_control_handle.send(WriteControl::Abort(send)).await; - // TODO: need a tokio select, in case the state arrives from somewhere else - if let Ok(state) = recv.await { - write_state = state; - } else { - break - } - }, + read_state = state; + let (send, recv) = oneshot::channel(); + let _ = self.write_control_handle.send(WriteControl::Abort(None, send)).await; + // TODO: need a tokio select, in case the state arrives from somewhere else + if let Ok(state) = recv.await { + write_state = state; + } else { + break } let mut jid = self.connected.jid.clone(); @@ -175,7 +156,8 @@ impl<Lgc: Logic + Clone + Send + 'static> Supervisor<Lgc> { let _ = self.read_control_handle.send(ReadControl::Abort(send)).await; let read_state = tokio::select! { Ok(s) = recv => s, - Ok(s) = &mut self.reader_crash => s, + // TODO: is this okay + Ok(s) = &mut self.reader_crash => s.1, // in case, just break as irrecoverable else => break, }; @@ -215,9 +197,9 @@ impl<Lgc: Logic + Clone + Send + 'static> Supervisor<Lgc> { }, } }, - Ok(read_state) = &mut self.reader_crash => { + Ok((stream_error, read_state)) = &mut self.reader_crash => { let (send, recv) = oneshot::channel(); - let _ = self.write_control_handle.send(WriteControl::Abort(send)).await; + let _ = self.write_control_handle.send(WriteControl::Abort(stream_error, send)).await; let (retry_msg, mut write_state) = tokio::select! { Ok(s) = recv => (None, s), Ok(s) = &mut self.writer_crash => (Some(s.0), s.1), |