aboutsummaryrefslogtreecommitdiffstats
path: root/lampada/src/connection
diff options
context:
space:
mode:
Diffstat (limited to 'lampada/src/connection')
-rw-r--r--lampada/src/connection/mod.rs374
-rw-r--r--lampada/src/connection/read.rs233
-rw-r--r--lampada/src/connection/write.rs258
3 files changed, 865 insertions, 0 deletions
diff --git a/lampada/src/connection/mod.rs b/lampada/src/connection/mod.rs
new file mode 100644
index 0000000..1e767b0
--- /dev/null
+++ b/lampada/src/connection/mod.rs
@@ -0,0 +1,374 @@
+// TODO: consider if this needs to be handled by a supervisor or could be handled by luz directly
+
+use std::{
+ collections::HashMap,
+ ops::{Deref, DerefMut},
+ sync::Arc,
+ time::Duration,
+};
+
+use jid::JID;
+use luz::{connection::Tls, jabber_stream::bound_stream::BoundJabberStream};
+use read::{ReadControl, ReadControlHandle, ReadState};
+use stanza::client::Stanza;
+use tokio::{
+ sync::{mpsc, oneshot, Mutex},
+ task::{JoinHandle, JoinSet},
+};
+use tracing::info;
+use write::{WriteControl, WriteControlHandle, WriteHandle, WriteMessage, WriteState};
+
+use crate::{
+ error::{ConnectionError, WriteError},
+ Connected, Logic,
+};
+
+mod read;
+pub(crate) mod write;
+
+pub struct Supervisor<Lgc> {
+ command_recv: mpsc::Receiver<SupervisorCommand>,
+ reader_crash: oneshot::Receiver<ReadState>,
+ writer_crash: oneshot::Receiver<(WriteMessage, WriteState)>,
+ read_control_handle: ReadControlHandle,
+ write_control_handle: WriteControlHandle,
+ on_crash: oneshot::Sender<()>,
+ // jid in connected stays the same over the life of the supervisor (the connection session)
+ connected: Connected,
+ password: Arc<String>,
+ logic: Lgc,
+}
+
+pub enum SupervisorCommand {
+ Disconnect,
+ // for if there was a stream error, require to reconnect
+ // couldn't stream errors just cause a crash? lol
+ Reconnect(ChildState),
+}
+
+pub enum ChildState {
+ Write(WriteState),
+ Read(ReadState),
+}
+
+impl<Lgc: Logic + Clone + Send + 'static> Supervisor<Lgc> {
+ fn new(
+ command_recv: mpsc::Receiver<SupervisorCommand>,
+ reader_crash: oneshot::Receiver<ReadState>,
+ writer_crash: oneshot::Receiver<(WriteMessage, WriteState)>,
+ read_control_handle: ReadControlHandle,
+ write_control_handle: WriteControlHandle,
+ on_crash: oneshot::Sender<()>,
+ connected: Connected,
+ password: Arc<String>,
+ logic: Lgc,
+ ) -> Self {
+ Self {
+ command_recv,
+ reader_crash,
+ writer_crash,
+ read_control_handle,
+ write_control_handle,
+ on_crash,
+ connected,
+ password,
+ logic,
+ }
+ }
+
+ async fn run(mut self) {
+ loop {
+ tokio::select! {
+ Some(msg) = self.command_recv.recv() => {
+ match msg {
+ SupervisorCommand::Disconnect => {
+ info!("disconnecting");
+ self.logic
+ .handle_disconnect(self.connected.clone())
+ .await;
+ let _ = self.write_control_handle.send(WriteControl::Disconnect).await;
+ let _ = self.read_control_handle.send(ReadControl::Disconnect).await;
+ info!("sent disconnect command");
+ tokio::select! {
+ _ = async { tokio::join!(
+ async { let _ = (&mut self.write_control_handle.handle).await; },
+ async { let _ = (&mut self.read_control_handle.handle).await; }
+ ) } => {},
+ // TODO: config timeout
+ _ = async { tokio::time::sleep(Duration::from_secs(5)) } => {
+ (&mut self.read_control_handle.handle).abort();
+ (&mut self.write_control_handle.handle).abort();
+ }
+ }
+ info!("disconnected");
+ break;
+ },
+ // TODO: Reconnect without aborting, gentle reconnect.
+ SupervisorCommand::Reconnect(state) => {
+ // TODO: please omfg
+ // send abort to read stream, as already done, consider
+ let (read_state, mut write_state);
+ match state {
+ ChildState::Write(receiver) => {
+ write_state = receiver;
+ let (send, recv) = oneshot::channel();
+ let _ = self.read_control_handle.send(ReadControl::Abort(send)).await;
+ // TODO: need a tokio select, in case the state arrives from somewhere else
+ if let Ok(state) = recv.await {
+ read_state = state;
+ } else {
+ break
+ }
+ },
+ ChildState::Read(read) => {
+ read_state = read;
+ let (send, recv) = oneshot::channel();
+ let _ = self.write_control_handle.send(WriteControl::Abort(send)).await;
+ // TODO: need a tokio select, in case the state arrives from somewhere else
+ if let Ok(state) = recv.await {
+ write_state = state;
+ } else {
+ break
+ }
+ },
+ }
+
+ let mut jid = self.connected.jid.clone();
+ let mut domain = jid.domainpart.clone();
+ // TODO: make sure connect_and_login does not modify the jid, but instead returns a jid. or something like that
+ let connection = luz::connect_and_login(&mut jid, &*self.password, &mut domain).await;
+ match connection {
+ Ok(c) => {
+ let (read, write) = c.split();
+ let (send, recv) = oneshot::channel();
+ self.writer_crash = recv;
+ self.write_control_handle =
+ WriteControlHandle::reconnect(write, send, write_state.stanza_recv);
+ let (send, recv) = oneshot::channel();
+ self.reader_crash = recv;
+ self.read_control_handle = ReadControlHandle::reconnect(
+ read,
+ read_state.tasks,
+ self.connected.clone(),
+ self.logic.clone(),
+ read_state.supervisor_control,
+ send,
+ );
+ },
+ Err(e) => {
+ // if reconnection failure, respond to all current write messages with lost connection error. the received processes should complete themselves.
+ write_state.stanza_recv.close();
+ while let Some(msg) = write_state.stanza_recv.recv().await {
+ let _ = msg.respond_to.send(Err(WriteError::LostConnection));
+ }
+ // TODO: is this the correct error?
+ self.logic.handle_connection_error(ConnectionError::LostConnection).await;
+ break;
+ },
+ }
+ },
+ }
+ },
+ Ok((write_msg, mut write_state)) = &mut self.writer_crash => {
+ // consider awaiting/aborting the read and write threads
+ let (send, recv) = oneshot::channel();
+ let _ = self.read_control_handle.send(ReadControl::Abort(send)).await;
+ let read_state = tokio::select! {
+ Ok(s) = recv => s,
+ Ok(s) = &mut self.reader_crash => s,
+ // in case, just break as irrecoverable
+ else => break,
+ };
+
+ let mut jid = self.connected.jid.clone();
+ let mut domain = jid.domainpart.clone();
+ // TODO: same here
+ let connection = luz::connect_and_login(&mut jid, &*self.password, &mut domain).await;
+ match connection {
+ Ok(c) => {
+ let (read, write) = c.split();
+ let (send, recv) = oneshot::channel();
+ self.writer_crash = recv;
+ self.write_control_handle =
+ WriteControlHandle::reconnect_retry(write, send, write_state.stanza_recv, write_msg);
+ let (send, recv) = oneshot::channel();
+ self.reader_crash = recv;
+ self.read_control_handle = ReadControlHandle::reconnect(
+ read,
+ read_state.tasks,
+ self.connected.clone(),
+ self.logic.clone(),
+ read_state.supervisor_control,
+ send,
+ );
+ },
+ Err(e) => {
+ // if reconnection failure, respond to all current write messages with lost connection error. the received processes should complete themselves.
+ write_state.stanza_recv.close();
+ let _ = write_msg.respond_to.send(Err(WriteError::LostConnection));
+ while let Some(msg) = write_state.stanza_recv.recv().await {
+ let _ = msg.respond_to.send(Err(WriteError::LostConnection));
+ }
+ // TODO: is this the correct error to send?
+ self.logic.handle_connection_error(ConnectionError::LostConnection).await;
+ break;
+ },
+ }
+ },
+ Ok(read_state) = &mut self.reader_crash => {
+ let (send, recv) = oneshot::channel();
+ let _ = self.write_control_handle.send(WriteControl::Abort(send)).await;
+ let (retry_msg, mut write_state) = tokio::select! {
+ Ok(s) = recv => (None, s),
+ Ok(s) = &mut self.writer_crash => (Some(s.0), s.1),
+ // in case, just break as irrecoverable
+ else => break,
+ };
+
+ let mut jid = self.connected.jid.clone();
+ let mut domain = jid.domainpart.clone();
+ let connection = luz::connect_and_login(&mut jid, &*self.password, &mut domain).await;
+ match connection {
+ Ok(c) => {
+ let (read, write) = c.split();
+ let (send, recv) = oneshot::channel();
+ self.writer_crash = recv;
+ if let Some(msg) = retry_msg {
+ self.write_control_handle =
+ WriteControlHandle::reconnect_retry(write, send, write_state.stanza_recv, msg);
+ } else {
+ self.write_control_handle = WriteControlHandle::reconnect(write, send, write_state.stanza_recv)
+ }
+ let (send, recv) = oneshot::channel();
+ self.reader_crash = recv;
+ self.read_control_handle = ReadControlHandle::reconnect(
+ read,
+ read_state.tasks,
+ self.connected.clone(),
+ self.logic.clone(),
+ read_state.supervisor_control,
+ send,
+ );
+ },
+ Err(e) => {
+ // if reconnection failure, respond to all current messages with lost connection error.
+ write_state.stanza_recv.close();
+ if let Some(msg) = retry_msg {
+ msg.respond_to.send(Err(WriteError::LostConnection));
+ }
+ while let Some(msg) = write_state.stanza_recv.recv().await {
+ msg.respond_to.send(Err(WriteError::LostConnection));
+ }
+ // TODO: is this the correct error?
+ self.logic.handle_connection_error(ConnectionError::LostConnection).await;
+ break;
+ },
+ }
+ },
+ else => break,
+ }
+ }
+ // TODO: maybe don't just on_crash
+ let _ = self.on_crash.send(());
+ }
+}
+
+pub struct SupervisorHandle {
+ sender: SupervisorSender,
+ handle: JoinHandle<()>,
+}
+
+impl Deref for SupervisorHandle {
+ type Target = SupervisorSender;
+
+ fn deref(&self) -> &Self::Target {
+ &self.sender
+ }
+}
+
+impl DerefMut for SupervisorHandle {
+ fn deref_mut(&mut self) -> &mut Self::Target {
+ &mut self.sender
+ }
+}
+
+#[derive(Clone)]
+pub struct SupervisorSender {
+ sender: mpsc::Sender<SupervisorCommand>,
+}
+
+impl Deref for SupervisorSender {
+ type Target = mpsc::Sender<SupervisorCommand>;
+
+ fn deref(&self) -> &Self::Target {
+ &self.sender
+ }
+}
+
+impl DerefMut for SupervisorSender {
+ fn deref_mut(&mut self) -> &mut Self::Target {
+ &mut self.sender
+ }
+}
+
+impl SupervisorHandle {
+ pub fn new<Lgc: Logic + Clone + Send + 'static>(
+ streams: BoundJabberStream<Tls>,
+ on_crash: oneshot::Sender<()>,
+ jid: JID,
+ password: Arc<String>,
+ logic: Lgc,
+ ) -> (WriteHandle, Self) {
+ let (command_send, command_recv) = mpsc::channel(20);
+ let (writer_crash_send, writer_crash_recv) = oneshot::channel();
+ let (reader_crash_send, reader_crash_recv) = oneshot::channel();
+
+ let (read_stream, write_stream) = streams.split();
+
+ let (write_handle, write_control_handle) =
+ WriteControlHandle::new(write_stream, writer_crash_send);
+
+ let connected = Connected {
+ jid,
+ write_handle: write_handle.clone(),
+ };
+
+ let supervisor_sender = SupervisorSender {
+ sender: command_send,
+ };
+
+ let read_control_handle = ReadControlHandle::new(
+ read_stream,
+ connected.clone(),
+ logic.clone(),
+ supervisor_sender.clone(),
+ reader_crash_send,
+ );
+
+ let actor = Supervisor::new(
+ command_recv,
+ reader_crash_recv,
+ writer_crash_recv,
+ read_control_handle,
+ write_control_handle,
+ on_crash,
+ connected,
+ password,
+ logic,
+ );
+
+ let handle = tokio::spawn(async move { actor.run().await });
+
+ (
+ write_handle,
+ Self {
+ sender: supervisor_sender,
+ handle,
+ },
+ )
+ }
+
+ pub fn sender(&self) -> SupervisorSender {
+ self.sender.clone()
+ }
+}
diff --git a/lampada/src/connection/read.rs b/lampada/src/connection/read.rs
new file mode 100644
index 0000000..cc69387
--- /dev/null
+++ b/lampada/src/connection/read.rs
@@ -0,0 +1,233 @@
+use std::{
+ collections::HashMap,
+ marker::PhantomData,
+ ops::{Deref, DerefMut},
+ str::FromStr,
+ sync::Arc,
+ time::Duration,
+};
+
+use luz::{connection::Tls, jabber_stream::bound_stream::BoundJabberReader};
+use stanza::client::Stanza;
+use tokio::{
+ sync::{mpsc, oneshot, Mutex},
+ task::{JoinHandle, JoinSet},
+};
+use tracing::info;
+
+use crate::{Connected, Logic};
+
+use super::{write::WriteHandle, SupervisorCommand, SupervisorSender};
+
+/// read actor
+pub struct Read<Lgc> {
+ stream: BoundJabberReader<Tls>,
+ disconnecting: bool,
+ disconnect_timedout: oneshot::Receiver<()>,
+
+ // all the threads spawned by the current connection session
+ tasks: JoinSet<()>,
+
+ // for handling incoming stanzas
+ // jabber server must be able to both terminate the connection from error, and ask for data from the client (such as supported XEPs)
+ connected: Connected,
+ logic: Lgc,
+ supervisor_control: SupervisorSender,
+
+ // control stuff
+ control_receiver: mpsc::Receiver<ReadControl>,
+ on_crash: oneshot::Sender<ReadState>,
+}
+
+/// when a crash/abort occurs, this gets sent back to the supervisor, so that the connection session can continue
+pub struct ReadState {
+ pub supervisor_control: SupervisorSender,
+ // TODO: when a stream dies, the iq gets from the server should not be replied to on the new stream
+ pub tasks: JoinSet<()>,
+}
+
+impl<Lgc> Read<Lgc> {
+ fn new(
+ stream: BoundJabberReader<Tls>,
+ tasks: JoinSet<()>,
+ connected: Connected,
+ logic: Lgc,
+ supervisor_control: SupervisorSender,
+ control_receiver: mpsc::Receiver<ReadControl>,
+ on_crash: oneshot::Sender<ReadState>,
+ ) -> Self {
+ let (_send, recv) = oneshot::channel();
+ Self {
+ stream,
+ disconnecting: false,
+ disconnect_timedout: recv,
+ tasks,
+ connected,
+ logic,
+ supervisor_control,
+ control_receiver,
+ on_crash,
+ }
+ }
+}
+
+impl<Lgc: Clone + Logic + Send + 'static> Read<Lgc> {
+ async fn run(mut self) {
+ println!("started read thread");
+ // let stanza = self.stream.read::<Stanza>().await;
+ // println!("{:?}", stanza);
+ loop {
+ tokio::select! {
+ // if still haven't received the end tag in time, just kill itself
+ // TODO: is this okay??? what if notification thread dies?
+ Ok(()) = &mut self.disconnect_timedout => {
+ info!("disconnect_timedout");
+ break;
+ }
+ Some(msg) = self.control_receiver.recv() => {
+ match msg {
+ // when disconnect received,
+ ReadControl::Disconnect => {
+ let (send, recv) = oneshot::channel();
+ self.disconnect_timedout = recv;
+ self.disconnecting = true;
+ tokio::spawn(async {
+ tokio::time::sleep(Duration::from_secs(10)).await;
+ let _ = send.send(());
+ })
+ },
+ ReadControl::Abort(sender) => {
+ let _ = sender.send(ReadState { supervisor_control: self.supervisor_control, tasks: self.tasks });
+ break;
+ },
+ };
+ },
+ s = self.stream.read::<Stanza>() => {
+ println!("read stanza");
+ match s {
+ Ok(s) => {
+ self.tasks.spawn(self.logic.clone().handle_stanza(s, self.connected.clone(), self.supervisor_control.clone()));
+ },
+ Err(e) => {
+ println!("error: {:?}", e);
+ // TODO: NEXT write the correct error stanza depending on error, decide whether to reconnect or properly disconnect, depending on if disconnecting is true
+ // match e {
+ // peanuts::Error::ReadError(error) => todo!(),
+ // peanuts::Error::Utf8Error(utf8_error) => todo!(),
+ // peanuts::Error::ParseError(_) => todo!(),
+ // peanuts::Error::EntityProcessError(_) => todo!(),
+ // peanuts::Error::InvalidCharRef(_) => todo!(),
+ // peanuts::Error::DuplicateNameSpaceDeclaration(namespace_declaration) => todo!(),
+ // peanuts::Error::DuplicateAttribute(_) => todo!(),
+ // peanuts::Error::UnqualifiedNamespace(_) => todo!(),
+ // peanuts::Error::MismatchedEndTag(name, name1) => todo!(),
+ // peanuts::Error::NotInElement(_) => todo!(),
+ // peanuts::Error::ExtraData(_) => todo!(),
+ // peanuts::Error::UndeclaredNamespace(_) => todo!(),
+ // peanuts::Error::IncorrectName(name) => todo!(),
+ // peanuts::Error::DeserializeError(_) => todo!(),
+ // peanuts::Error::Deserialize(deserialize_error) => todo!(),
+ // peanuts::Error::RootElementEnded => todo!(),
+ // }
+ // TODO: make sure this only happens when an end tag is received
+ if self.disconnecting == true {
+ break;
+ } else {
+ let _ = self.on_crash.send(ReadState { supervisor_control: self.supervisor_control, tasks: self.tasks });
+ }
+ break;
+ },
+ }
+ },
+ else => break
+ }
+ }
+ println!("stopping read thread");
+ self.logic.on_abort().await;
+ }
+}
+
+// what do stanza processes do?
+// - update ui
+// - access database
+// - disconnect proper, reconnect
+// - respond to server requests
+
+pub enum ReadControl {
+ Disconnect,
+ Abort(oneshot::Sender<ReadState>),
+}
+
+pub struct ReadControlHandle {
+ sender: mpsc::Sender<ReadControl>,
+ pub(crate) handle: JoinHandle<()>,
+}
+
+impl Deref for ReadControlHandle {
+ type Target = mpsc::Sender<ReadControl>;
+
+ fn deref(&self) -> &Self::Target {
+ &self.sender
+ }
+}
+
+impl DerefMut for ReadControlHandle {
+ fn deref_mut(&mut self) -> &mut Self::Target {
+ &mut self.sender
+ }
+}
+
+impl ReadControlHandle {
+ pub fn new<Lgc: Clone + Logic + Send + 'static>(
+ stream: BoundJabberReader<Tls>,
+ connected: Connected,
+ logic: Lgc,
+ supervisor_control: SupervisorSender,
+ on_crash: oneshot::Sender<ReadState>,
+ ) -> Self {
+ let (control_sender, control_receiver) = mpsc::channel(20);
+
+ let actor = Read::new(
+ stream,
+ JoinSet::new(),
+ connected,
+ logic,
+ supervisor_control,
+ control_receiver,
+ on_crash,
+ );
+ let handle = tokio::spawn(async move { actor.run().await });
+
+ Self {
+ sender: control_sender,
+ handle,
+ }
+ }
+
+ pub fn reconnect<Lgc: Clone + Logic + Send + 'static>(
+ stream: BoundJabberReader<Tls>,
+ tasks: JoinSet<()>,
+ connected: Connected,
+ logic: Lgc,
+ supervisor_control: SupervisorSender,
+ on_crash: oneshot::Sender<ReadState>,
+ ) -> Self {
+ let (control_sender, control_receiver) = mpsc::channel(20);
+
+ let actor = Read::new(
+ stream,
+ tasks,
+ connected,
+ logic,
+ supervisor_control,
+ control_receiver,
+ on_crash,
+ );
+ let handle = tokio::spawn(async move { actor.run().await });
+
+ Self {
+ sender: control_sender,
+ handle,
+ }
+ }
+}
diff --git a/lampada/src/connection/write.rs b/lampada/src/connection/write.rs
new file mode 100644
index 0000000..8f0c34b
--- /dev/null
+++ b/lampada/src/connection/write.rs
@@ -0,0 +1,258 @@
+use std::ops::{Deref, DerefMut};
+
+use luz::{connection::Tls, jabber_stream::bound_stream::BoundJabberWriter};
+use stanza::client::Stanza;
+use tokio::{
+ sync::{mpsc, oneshot},
+ task::JoinHandle,
+};
+
+use crate::error::WriteError;
+
+/// actor that receives jabber stanzas to write, and if there is an error, sends a message back to the supervisor then aborts, so the supervisor can spawn a new stream.
+pub struct Write {
+ stream: BoundJabberWriter<Tls>,
+
+ /// connection session write queue
+ stanza_receiver: mpsc::Receiver<WriteMessage>,
+
+ // control stuff
+ control_receiver: mpsc::Receiver<WriteControl>,
+ on_crash: oneshot::Sender<(WriteMessage, WriteState)>,
+}
+
+/// when a crash/abort occurs, this gets sent back to the supervisor, possibly with the current write that failed, so that the connection session can continue
+pub struct WriteState {
+ pub stanza_recv: mpsc::Receiver<WriteMessage>,
+}
+
+#[derive(Debug)]
+pub struct WriteMessage {
+ pub stanza: Stanza,
+ pub respond_to: oneshot::Sender<Result<(), WriteError>>,
+}
+
+pub enum WriteControl {
+ Disconnect,
+ Abort(oneshot::Sender<WriteState>),
+}
+
+impl Write {
+ fn new(
+ stream: BoundJabberWriter<Tls>,
+ stanza_receiver: mpsc::Receiver<WriteMessage>,
+ control_receiver: mpsc::Receiver<WriteControl>,
+ on_crash: oneshot::Sender<(WriteMessage, WriteState)>,
+ ) -> Self {
+ Self {
+ stream,
+ stanza_receiver,
+ control_receiver,
+ on_crash,
+ }
+ }
+
+ async fn write(&mut self, stanza: &Stanza) -> Result<(), peanuts::Error> {
+ Ok(self.stream.write(stanza).await?)
+ }
+
+ async fn run_reconnected(mut self, retry_msg: WriteMessage) {
+ // try to retry sending the message that failed to send previously
+ let result = self.stream.write(&retry_msg.stanza).await;
+ match result {
+ Err(e) => match &e {
+ peanuts::Error::ReadError(_error) => {
+ // make sure message is not lost from error, supervisor handles retry and reporting
+ // TODO: upon reconnect, make sure we are not stuck in a reconnection loop
+ let _ = self.on_crash.send((
+ retry_msg,
+ WriteState {
+ stanza_recv: self.stanza_receiver,
+ },
+ ));
+ return;
+ }
+ _ => {
+ let _ = retry_msg.respond_to.send(Err(e.into()));
+ }
+ },
+ _ => {
+ let _ = retry_msg.respond_to.send(Ok(()));
+ }
+ }
+ // return to normal loop
+ self.run().await
+ }
+
+ async fn run(mut self) {
+ loop {
+ tokio::select! {
+ Some(msg) = self.control_receiver.recv() => {
+ match msg {
+ WriteControl::Disconnect => {
+ // close the stanza_receiver channel and drain out all of the remaining stanzas to send
+ self.stanza_receiver.close();
+ // TODO: put this in some kind of function to avoid code duplication
+ while let Some(msg) = self.stanza_receiver.recv().await {
+ let result = self.stream.write(&msg.stanza).await;
+ match result {
+ Err(e) => match &e {
+ peanuts::Error::ReadError(_error) => {
+ // if connection lost during disconnection, just send lost connection error to the write requests
+ let _ = msg.respond_to.send(Err(WriteError::LostConnection));
+ while let Some(msg) = self.stanza_receiver.recv().await {
+ let _ = msg.respond_to.send(Err(WriteError::LostConnection));
+ }
+ break;
+ }
+ // otherwise complete sending all the stanzas currently in the queue
+ _ => {
+ let _ = msg.respond_to.send(Err(e.into()));
+ }
+ },
+ _ => {
+ let _ = msg.respond_to.send(Ok(()));
+ }
+ }
+ }
+ let _ = self.stream.try_close().await;
+ break;
+ },
+ // in case of abort, stream is already fucked, just send the receiver ready for a reconnection at the same resource
+ WriteControl::Abort(sender) => {
+ let _ = sender.send(WriteState { stanza_recv: self.stanza_receiver });
+ break;
+ },
+ }
+ },
+ Some(msg) = self.stanza_receiver.recv() => {
+ let result = self.stream.write(&msg.stanza).await;
+ match result {
+ Err(e) => match &e {
+ peanuts::Error::ReadError(_error) => {
+ // make sure message is not lost from error, supervisor handles retry and reporting
+ let _ = self.on_crash.send((msg, WriteState { stanza_recv: self.stanza_receiver }));
+ break;
+ }
+ _ => {
+ let _ = msg.respond_to.send(Err(e.into()));
+ }
+ },
+ _ => {
+ let _ = msg.respond_to.send(Ok(()));
+ }
+ }
+ },
+ else => break,
+ }
+ }
+ }
+}
+
+#[derive(Clone)]
+pub struct WriteHandle {
+ sender: mpsc::Sender<WriteMessage>,
+}
+
+impl WriteHandle {
+ pub async fn write(&self, stanza: Stanza) -> Result<(), WriteError> {
+ let (send, recv) = oneshot::channel();
+ self.send(WriteMessage {
+ stanza,
+ respond_to: send,
+ })
+ .await
+ .map_err(|e| WriteError::Actor(e.into()))?;
+ // TODO: timeout
+ recv.await.map_err(|e| WriteError::Actor(e.into()))?
+ }
+}
+
+impl Deref for WriteHandle {
+ type Target = mpsc::Sender<WriteMessage>;
+
+ fn deref(&self) -> &Self::Target {
+ &self.sender
+ }
+}
+
+impl DerefMut for WriteHandle {
+ fn deref_mut(&mut self) -> &mut Self::Target {
+ &mut self.sender
+ }
+}
+
+pub struct WriteControlHandle {
+ sender: mpsc::Sender<WriteControl>,
+ pub(crate) handle: JoinHandle<()>,
+}
+
+impl Deref for WriteControlHandle {
+ type Target = mpsc::Sender<WriteControl>;
+
+ fn deref(&self) -> &Self::Target {
+ &self.sender
+ }
+}
+
+impl DerefMut for WriteControlHandle {
+ fn deref_mut(&mut self) -> &mut Self::Target {
+ &mut self.sender
+ }
+}
+
+impl WriteControlHandle {
+ pub fn new(
+ stream: BoundJabberWriter<Tls>,
+ on_crash: oneshot::Sender<(WriteMessage, WriteState)>,
+ ) -> (WriteHandle, Self) {
+ let (control_sender, control_receiver) = mpsc::channel(20);
+ let (stanza_sender, stanza_receiver) = mpsc::channel(20);
+
+ let actor = Write::new(stream, stanza_receiver, control_receiver, on_crash);
+ let handle = tokio::spawn(async move { actor.run().await });
+
+ (
+ WriteHandle {
+ sender: stanza_sender,
+ },
+ Self {
+ sender: control_sender,
+ handle,
+ },
+ )
+ }
+
+ pub fn reconnect_retry(
+ stream: BoundJabberWriter<Tls>,
+ on_crash: oneshot::Sender<(WriteMessage, WriteState)>,
+ stanza_receiver: mpsc::Receiver<WriteMessage>,
+ retry_msg: WriteMessage,
+ ) -> Self {
+ let (control_sender, control_receiver) = mpsc::channel(20);
+
+ let actor = Write::new(stream, stanza_receiver, control_receiver, on_crash);
+ let handle = tokio::spawn(async move { actor.run_reconnected(retry_msg).await });
+
+ Self {
+ sender: control_sender,
+ handle,
+ }
+ }
+
+ pub fn reconnect(
+ stream: BoundJabberWriter<Tls>,
+ on_crash: oneshot::Sender<(WriteMessage, WriteState)>,
+ stanza_receiver: mpsc::Receiver<WriteMessage>,
+ ) -> Self {
+ let (control_sender, control_receiver) = mpsc::channel(20);
+
+ let actor = Write::new(stream, stanza_receiver, control_receiver, on_crash);
+ let handle = tokio::spawn(async move { actor.run().await });
+
+ Self {
+ sender: control_sender,
+ handle,
+ }
+ }
+}