aboutsummaryrefslogtreecommitdiffstats
path: root/lampada/src/connection/mod.rs
diff options
context:
space:
mode:
Diffstat (limited to 'lampada/src/connection/mod.rs')
-rw-r--r--lampada/src/connection/mod.rs52
1 files changed, 17 insertions, 35 deletions
diff --git a/lampada/src/connection/mod.rs b/lampada/src/connection/mod.rs
index 1e767b0..ffaa7a7 100644
--- a/lampada/src/connection/mod.rs
+++ b/lampada/src/connection/mod.rs
@@ -10,7 +10,7 @@ use std::{
use jid::JID;
use luz::{connection::Tls, jabber_stream::bound_stream::BoundJabberStream};
use read::{ReadControl, ReadControlHandle, ReadState};
-use stanza::client::Stanza;
+use stanza::{client::Stanza, stream_error::Error as StreamError};
use tokio::{
sync::{mpsc, oneshot, Mutex},
task::{JoinHandle, JoinSet},
@@ -28,7 +28,7 @@ pub(crate) mod write;
pub struct Supervisor<Lgc> {
command_recv: mpsc::Receiver<SupervisorCommand>,
- reader_crash: oneshot::Receiver<ReadState>,
+ reader_crash: oneshot::Receiver<(Option<StreamError>, ReadState)>,
writer_crash: oneshot::Receiver<(WriteMessage, WriteState)>,
read_control_handle: ReadControlHandle,
write_control_handle: WriteControlHandle,
@@ -43,18 +43,13 @@ pub enum SupervisorCommand {
Disconnect,
// for if there was a stream error, require to reconnect
// couldn't stream errors just cause a crash? lol
- Reconnect(ChildState),
-}
-
-pub enum ChildState {
- Write(WriteState),
- Read(ReadState),
+ Reconnect(ReadState),
}
impl<Lgc: Logic + Clone + Send + 'static> Supervisor<Lgc> {
fn new(
command_recv: mpsc::Receiver<SupervisorCommand>,
- reader_crash: oneshot::Receiver<ReadState>,
+ reader_crash: oneshot::Receiver<(Option<StreamError>, ReadState)>,
writer_crash: oneshot::Receiver<(WriteMessage, WriteState)>,
read_control_handle: ReadControlHandle,
write_control_handle: WriteControlHandle,
@@ -104,33 +99,19 @@ impl<Lgc: Logic + Clone + Send + 'static> Supervisor<Lgc> {
break;
},
// TODO: Reconnect without aborting, gentle reconnect.
+ // the server sent a stream error
SupervisorCommand::Reconnect(state) => {
// TODO: please omfg
// send abort to read stream, as already done, consider
let (read_state, mut write_state);
- match state {
- ChildState::Write(receiver) => {
- write_state = receiver;
- let (send, recv) = oneshot::channel();
- let _ = self.read_control_handle.send(ReadControl::Abort(send)).await;
- // TODO: need a tokio select, in case the state arrives from somewhere else
- if let Ok(state) = recv.await {
- read_state = state;
- } else {
- break
- }
- },
- ChildState::Read(read) => {
- read_state = read;
- let (send, recv) = oneshot::channel();
- let _ = self.write_control_handle.send(WriteControl::Abort(send)).await;
- // TODO: need a tokio select, in case the state arrives from somewhere else
- if let Ok(state) = recv.await {
- write_state = state;
- } else {
- break
- }
- },
+ read_state = state;
+ let (send, recv) = oneshot::channel();
+ let _ = self.write_control_handle.send(WriteControl::Abort(None, send)).await;
+ // TODO: need a tokio select, in case the state arrives from somewhere else
+ if let Ok(state) = recv.await {
+ write_state = state;
+ } else {
+ break
}
let mut jid = self.connected.jid.clone();
@@ -175,7 +156,8 @@ impl<Lgc: Logic + Clone + Send + 'static> Supervisor<Lgc> {
let _ = self.read_control_handle.send(ReadControl::Abort(send)).await;
let read_state = tokio::select! {
Ok(s) = recv => s,
- Ok(s) = &mut self.reader_crash => s,
+ // TODO: is this okay
+ Ok(s) = &mut self.reader_crash => s.1,
// in case, just break as irrecoverable
else => break,
};
@@ -215,9 +197,9 @@ impl<Lgc: Logic + Clone + Send + 'static> Supervisor<Lgc> {
},
}
},
- Ok(read_state) = &mut self.reader_crash => {
+ Ok((stream_error, read_state)) = &mut self.reader_crash => {
let (send, recv) = oneshot::channel();
- let _ = self.write_control_handle.send(WriteControl::Abort(send)).await;
+ let _ = self.write_control_handle.send(WriteControl::Abort(stream_error, send)).await;
let (retry_msg, mut write_state) = tokio::select! {
Ok(s) = recv => (None, s),
Ok(s) = &mut self.writer_crash => (Some(s.0), s.1),