diff options
Diffstat (limited to 'lampada/src/connection/write.rs')
-rw-r--r-- | lampada/src/connection/write.rs | 258 |
1 files changed, 258 insertions, 0 deletions
diff --git a/lampada/src/connection/write.rs b/lampada/src/connection/write.rs new file mode 100644 index 0000000..8f0c34b --- /dev/null +++ b/lampada/src/connection/write.rs @@ -0,0 +1,258 @@ +use std::ops::{Deref, DerefMut}; + +use luz::{connection::Tls, jabber_stream::bound_stream::BoundJabberWriter}; +use stanza::client::Stanza; +use tokio::{ + sync::{mpsc, oneshot}, + task::JoinHandle, +}; + +use crate::error::WriteError; + +/// actor that receives jabber stanzas to write, and if there is an error, sends a message back to the supervisor then aborts, so the supervisor can spawn a new stream. +pub struct Write { + stream: BoundJabberWriter<Tls>, + + /// connection session write queue + stanza_receiver: mpsc::Receiver<WriteMessage>, + + // control stuff + control_receiver: mpsc::Receiver<WriteControl>, + on_crash: oneshot::Sender<(WriteMessage, WriteState)>, +} + +/// when a crash/abort occurs, this gets sent back to the supervisor, possibly with the current write that failed, so that the connection session can continue +pub struct WriteState { + pub stanza_recv: mpsc::Receiver<WriteMessage>, +} + +#[derive(Debug)] +pub struct WriteMessage { + pub stanza: Stanza, + pub respond_to: oneshot::Sender<Result<(), WriteError>>, +} + +pub enum WriteControl { + Disconnect, + Abort(oneshot::Sender<WriteState>), +} + +impl Write { + fn new( + stream: BoundJabberWriter<Tls>, + stanza_receiver: mpsc::Receiver<WriteMessage>, + control_receiver: mpsc::Receiver<WriteControl>, + on_crash: oneshot::Sender<(WriteMessage, WriteState)>, + ) -> Self { + Self { + stream, + stanza_receiver, + control_receiver, + on_crash, + } + } + + async fn write(&mut self, stanza: &Stanza) -> Result<(), peanuts::Error> { + Ok(self.stream.write(stanza).await?) + } + + async fn run_reconnected(mut self, retry_msg: WriteMessage) { + // try to retry sending the message that failed to send previously + let result = self.stream.write(&retry_msg.stanza).await; + match result { + Err(e) => match &e { + peanuts::Error::ReadError(_error) => { + // make sure message is not lost from error, supervisor handles retry and reporting + // TODO: upon reconnect, make sure we are not stuck in a reconnection loop + let _ = self.on_crash.send(( + retry_msg, + WriteState { + stanza_recv: self.stanza_receiver, + }, + )); + return; + } + _ => { + let _ = retry_msg.respond_to.send(Err(e.into())); + } + }, + _ => { + let _ = retry_msg.respond_to.send(Ok(())); + } + } + // return to normal loop + self.run().await + } + + async fn run(mut self) { + loop { + tokio::select! { + Some(msg) = self.control_receiver.recv() => { + match msg { + WriteControl::Disconnect => { + // close the stanza_receiver channel and drain out all of the remaining stanzas to send + self.stanza_receiver.close(); + // TODO: put this in some kind of function to avoid code duplication + while let Some(msg) = self.stanza_receiver.recv().await { + let result = self.stream.write(&msg.stanza).await; + match result { + Err(e) => match &e { + peanuts::Error::ReadError(_error) => { + // if connection lost during disconnection, just send lost connection error to the write requests + let _ = msg.respond_to.send(Err(WriteError::LostConnection)); + while let Some(msg) = self.stanza_receiver.recv().await { + let _ = msg.respond_to.send(Err(WriteError::LostConnection)); + } + break; + } + // otherwise complete sending all the stanzas currently in the queue + _ => { + let _ = msg.respond_to.send(Err(e.into())); + } + }, + _ => { + let _ = msg.respond_to.send(Ok(())); + } + } + } + let _ = self.stream.try_close().await; + break; + }, + // in case of abort, stream is already fucked, just send the receiver ready for a reconnection at the same resource + WriteControl::Abort(sender) => { + let _ = sender.send(WriteState { stanza_recv: self.stanza_receiver }); + break; + }, + } + }, + Some(msg) = self.stanza_receiver.recv() => { + let result = self.stream.write(&msg.stanza).await; + match result { + Err(e) => match &e { + peanuts::Error::ReadError(_error) => { + // make sure message is not lost from error, supervisor handles retry and reporting + let _ = self.on_crash.send((msg, WriteState { stanza_recv: self.stanza_receiver })); + break; + } + _ => { + let _ = msg.respond_to.send(Err(e.into())); + } + }, + _ => { + let _ = msg.respond_to.send(Ok(())); + } + } + }, + else => break, + } + } + } +} + +#[derive(Clone)] +pub struct WriteHandle { + sender: mpsc::Sender<WriteMessage>, +} + +impl WriteHandle { + pub async fn write(&self, stanza: Stanza) -> Result<(), WriteError> { + let (send, recv) = oneshot::channel(); + self.send(WriteMessage { + stanza, + respond_to: send, + }) + .await + .map_err(|e| WriteError::Actor(e.into()))?; + // TODO: timeout + recv.await.map_err(|e| WriteError::Actor(e.into()))? + } +} + +impl Deref for WriteHandle { + type Target = mpsc::Sender<WriteMessage>; + + fn deref(&self) -> &Self::Target { + &self.sender + } +} + +impl DerefMut for WriteHandle { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.sender + } +} + +pub struct WriteControlHandle { + sender: mpsc::Sender<WriteControl>, + pub(crate) handle: JoinHandle<()>, +} + +impl Deref for WriteControlHandle { + type Target = mpsc::Sender<WriteControl>; + + fn deref(&self) -> &Self::Target { + &self.sender + } +} + +impl DerefMut for WriteControlHandle { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.sender + } +} + +impl WriteControlHandle { + pub fn new( + stream: BoundJabberWriter<Tls>, + on_crash: oneshot::Sender<(WriteMessage, WriteState)>, + ) -> (WriteHandle, Self) { + let (control_sender, control_receiver) = mpsc::channel(20); + let (stanza_sender, stanza_receiver) = mpsc::channel(20); + + let actor = Write::new(stream, stanza_receiver, control_receiver, on_crash); + let handle = tokio::spawn(async move { actor.run().await }); + + ( + WriteHandle { + sender: stanza_sender, + }, + Self { + sender: control_sender, + handle, + }, + ) + } + + pub fn reconnect_retry( + stream: BoundJabberWriter<Tls>, + on_crash: oneshot::Sender<(WriteMessage, WriteState)>, + stanza_receiver: mpsc::Receiver<WriteMessage>, + retry_msg: WriteMessage, + ) -> Self { + let (control_sender, control_receiver) = mpsc::channel(20); + + let actor = Write::new(stream, stanza_receiver, control_receiver, on_crash); + let handle = tokio::spawn(async move { actor.run_reconnected(retry_msg).await }); + + Self { + sender: control_sender, + handle, + } + } + + pub fn reconnect( + stream: BoundJabberWriter<Tls>, + on_crash: oneshot::Sender<(WriteMessage, WriteState)>, + stanza_receiver: mpsc::Receiver<WriteMessage>, + ) -> Self { + let (control_sender, control_receiver) = mpsc::channel(20); + + let actor = Write::new(stream, stanza_receiver, control_receiver, on_crash); + let handle = tokio::spawn(async move { actor.run().await }); + + Self { + sender: control_sender, + handle, + } + } +} |