diff options
Diffstat (limited to 'lampada/src/connection')
-rw-r--r-- | lampada/src/connection/mod.rs | 374 | ||||
-rw-r--r-- | lampada/src/connection/read.rs | 233 | ||||
-rw-r--r-- | lampada/src/connection/write.rs | 258 |
3 files changed, 865 insertions, 0 deletions
diff --git a/lampada/src/connection/mod.rs b/lampada/src/connection/mod.rs new file mode 100644 index 0000000..1e767b0 --- /dev/null +++ b/lampada/src/connection/mod.rs @@ -0,0 +1,374 @@ +// TODO: consider if this needs to be handled by a supervisor or could be handled by luz directly + +use std::{ + collections::HashMap, + ops::{Deref, DerefMut}, + sync::Arc, + time::Duration, +}; + +use jid::JID; +use luz::{connection::Tls, jabber_stream::bound_stream::BoundJabberStream}; +use read::{ReadControl, ReadControlHandle, ReadState}; +use stanza::client::Stanza; +use tokio::{ + sync::{mpsc, oneshot, Mutex}, + task::{JoinHandle, JoinSet}, +}; +use tracing::info; +use write::{WriteControl, WriteControlHandle, WriteHandle, WriteMessage, WriteState}; + +use crate::{ + error::{ConnectionError, WriteError}, + Connected, Logic, +}; + +mod read; +pub(crate) mod write; + +pub struct Supervisor<Lgc> { + command_recv: mpsc::Receiver<SupervisorCommand>, + reader_crash: oneshot::Receiver<ReadState>, + writer_crash: oneshot::Receiver<(WriteMessage, WriteState)>, + read_control_handle: ReadControlHandle, + write_control_handle: WriteControlHandle, + on_crash: oneshot::Sender<()>, + // jid in connected stays the same over the life of the supervisor (the connection session) + connected: Connected, + password: Arc<String>, + logic: Lgc, +} + +pub enum SupervisorCommand { + Disconnect, + // for if there was a stream error, require to reconnect + // couldn't stream errors just cause a crash? lol + Reconnect(ChildState), +} + +pub enum ChildState { + Write(WriteState), + Read(ReadState), +} + +impl<Lgc: Logic + Clone + Send + 'static> Supervisor<Lgc> { + fn new( + command_recv: mpsc::Receiver<SupervisorCommand>, + reader_crash: oneshot::Receiver<ReadState>, + writer_crash: oneshot::Receiver<(WriteMessage, WriteState)>, + read_control_handle: ReadControlHandle, + write_control_handle: WriteControlHandle, + on_crash: oneshot::Sender<()>, + connected: Connected, + password: Arc<String>, + logic: Lgc, + ) -> Self { + Self { + command_recv, + reader_crash, + writer_crash, + read_control_handle, + write_control_handle, + on_crash, + connected, + password, + logic, + } + } + + async fn run(mut self) { + loop { + tokio::select! { + Some(msg) = self.command_recv.recv() => { + match msg { + SupervisorCommand::Disconnect => { + info!("disconnecting"); + self.logic + .handle_disconnect(self.connected.clone()) + .await; + let _ = self.write_control_handle.send(WriteControl::Disconnect).await; + let _ = self.read_control_handle.send(ReadControl::Disconnect).await; + info!("sent disconnect command"); + tokio::select! { + _ = async { tokio::join!( + async { let _ = (&mut self.write_control_handle.handle).await; }, + async { let _ = (&mut self.read_control_handle.handle).await; } + ) } => {}, + // TODO: config timeout + _ = async { tokio::time::sleep(Duration::from_secs(5)) } => { + (&mut self.read_control_handle.handle).abort(); + (&mut self.write_control_handle.handle).abort(); + } + } + info!("disconnected"); + break; + }, + // TODO: Reconnect without aborting, gentle reconnect. + SupervisorCommand::Reconnect(state) => { + // TODO: please omfg + // send abort to read stream, as already done, consider + let (read_state, mut write_state); + match state { + ChildState::Write(receiver) => { + write_state = receiver; + let (send, recv) = oneshot::channel(); + let _ = self.read_control_handle.send(ReadControl::Abort(send)).await; + // TODO: need a tokio select, in case the state arrives from somewhere else + if let Ok(state) = recv.await { + read_state = state; + } else { + break + } + }, + ChildState::Read(read) => { + read_state = read; + let (send, recv) = oneshot::channel(); + let _ = self.write_control_handle.send(WriteControl::Abort(send)).await; + // TODO: need a tokio select, in case the state arrives from somewhere else + if let Ok(state) = recv.await { + write_state = state; + } else { + break + } + }, + } + + let mut jid = self.connected.jid.clone(); + let mut domain = jid.domainpart.clone(); + // TODO: make sure connect_and_login does not modify the jid, but instead returns a jid. or something like that + let connection = luz::connect_and_login(&mut jid, &*self.password, &mut domain).await; + match connection { + Ok(c) => { + let (read, write) = c.split(); + let (send, recv) = oneshot::channel(); + self.writer_crash = recv; + self.write_control_handle = + WriteControlHandle::reconnect(write, send, write_state.stanza_recv); + let (send, recv) = oneshot::channel(); + self.reader_crash = recv; + self.read_control_handle = ReadControlHandle::reconnect( + read, + read_state.tasks, + self.connected.clone(), + self.logic.clone(), + read_state.supervisor_control, + send, + ); + }, + Err(e) => { + // if reconnection failure, respond to all current write messages with lost connection error. the received processes should complete themselves. + write_state.stanza_recv.close(); + while let Some(msg) = write_state.stanza_recv.recv().await { + let _ = msg.respond_to.send(Err(WriteError::LostConnection)); + } + // TODO: is this the correct error? + self.logic.handle_connection_error(ConnectionError::LostConnection).await; + break; + }, + } + }, + } + }, + Ok((write_msg, mut write_state)) = &mut self.writer_crash => { + // consider awaiting/aborting the read and write threads + let (send, recv) = oneshot::channel(); + let _ = self.read_control_handle.send(ReadControl::Abort(send)).await; + let read_state = tokio::select! { + Ok(s) = recv => s, + Ok(s) = &mut self.reader_crash => s, + // in case, just break as irrecoverable + else => break, + }; + + let mut jid = self.connected.jid.clone(); + let mut domain = jid.domainpart.clone(); + // TODO: same here + let connection = luz::connect_and_login(&mut jid, &*self.password, &mut domain).await; + match connection { + Ok(c) => { + let (read, write) = c.split(); + let (send, recv) = oneshot::channel(); + self.writer_crash = recv; + self.write_control_handle = + WriteControlHandle::reconnect_retry(write, send, write_state.stanza_recv, write_msg); + let (send, recv) = oneshot::channel(); + self.reader_crash = recv; + self.read_control_handle = ReadControlHandle::reconnect( + read, + read_state.tasks, + self.connected.clone(), + self.logic.clone(), + read_state.supervisor_control, + send, + ); + }, + Err(e) => { + // if reconnection failure, respond to all current write messages with lost connection error. the received processes should complete themselves. + write_state.stanza_recv.close(); + let _ = write_msg.respond_to.send(Err(WriteError::LostConnection)); + while let Some(msg) = write_state.stanza_recv.recv().await { + let _ = msg.respond_to.send(Err(WriteError::LostConnection)); + } + // TODO: is this the correct error to send? + self.logic.handle_connection_error(ConnectionError::LostConnection).await; + break; + }, + } + }, + Ok(read_state) = &mut self.reader_crash => { + let (send, recv) = oneshot::channel(); + let _ = self.write_control_handle.send(WriteControl::Abort(send)).await; + let (retry_msg, mut write_state) = tokio::select! { + Ok(s) = recv => (None, s), + Ok(s) = &mut self.writer_crash => (Some(s.0), s.1), + // in case, just break as irrecoverable + else => break, + }; + + let mut jid = self.connected.jid.clone(); + let mut domain = jid.domainpart.clone(); + let connection = luz::connect_and_login(&mut jid, &*self.password, &mut domain).await; + match connection { + Ok(c) => { + let (read, write) = c.split(); + let (send, recv) = oneshot::channel(); + self.writer_crash = recv; + if let Some(msg) = retry_msg { + self.write_control_handle = + WriteControlHandle::reconnect_retry(write, send, write_state.stanza_recv, msg); + } else { + self.write_control_handle = WriteControlHandle::reconnect(write, send, write_state.stanza_recv) + } + let (send, recv) = oneshot::channel(); + self.reader_crash = recv; + self.read_control_handle = ReadControlHandle::reconnect( + read, + read_state.tasks, + self.connected.clone(), + self.logic.clone(), + read_state.supervisor_control, + send, + ); + }, + Err(e) => { + // if reconnection failure, respond to all current messages with lost connection error. + write_state.stanza_recv.close(); + if let Some(msg) = retry_msg { + msg.respond_to.send(Err(WriteError::LostConnection)); + } + while let Some(msg) = write_state.stanza_recv.recv().await { + msg.respond_to.send(Err(WriteError::LostConnection)); + } + // TODO: is this the correct error? + self.logic.handle_connection_error(ConnectionError::LostConnection).await; + break; + }, + } + }, + else => break, + } + } + // TODO: maybe don't just on_crash + let _ = self.on_crash.send(()); + } +} + +pub struct SupervisorHandle { + sender: SupervisorSender, + handle: JoinHandle<()>, +} + +impl Deref for SupervisorHandle { + type Target = SupervisorSender; + + fn deref(&self) -> &Self::Target { + &self.sender + } +} + +impl DerefMut for SupervisorHandle { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.sender + } +} + +#[derive(Clone)] +pub struct SupervisorSender { + sender: mpsc::Sender<SupervisorCommand>, +} + +impl Deref for SupervisorSender { + type Target = mpsc::Sender<SupervisorCommand>; + + fn deref(&self) -> &Self::Target { + &self.sender + } +} + +impl DerefMut for SupervisorSender { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.sender + } +} + +impl SupervisorHandle { + pub fn new<Lgc: Logic + Clone + Send + 'static>( + streams: BoundJabberStream<Tls>, + on_crash: oneshot::Sender<()>, + jid: JID, + password: Arc<String>, + logic: Lgc, + ) -> (WriteHandle, Self) { + let (command_send, command_recv) = mpsc::channel(20); + let (writer_crash_send, writer_crash_recv) = oneshot::channel(); + let (reader_crash_send, reader_crash_recv) = oneshot::channel(); + + let (read_stream, write_stream) = streams.split(); + + let (write_handle, write_control_handle) = + WriteControlHandle::new(write_stream, writer_crash_send); + + let connected = Connected { + jid, + write_handle: write_handle.clone(), + }; + + let supervisor_sender = SupervisorSender { + sender: command_send, + }; + + let read_control_handle = ReadControlHandle::new( + read_stream, + connected.clone(), + logic.clone(), + supervisor_sender.clone(), + reader_crash_send, + ); + + let actor = Supervisor::new( + command_recv, + reader_crash_recv, + writer_crash_recv, + read_control_handle, + write_control_handle, + on_crash, + connected, + password, + logic, + ); + + let handle = tokio::spawn(async move { actor.run().await }); + + ( + write_handle, + Self { + sender: supervisor_sender, + handle, + }, + ) + } + + pub fn sender(&self) -> SupervisorSender { + self.sender.clone() + } +} diff --git a/lampada/src/connection/read.rs b/lampada/src/connection/read.rs new file mode 100644 index 0000000..cc69387 --- /dev/null +++ b/lampada/src/connection/read.rs @@ -0,0 +1,233 @@ +use std::{ + collections::HashMap, + marker::PhantomData, + ops::{Deref, DerefMut}, + str::FromStr, + sync::Arc, + time::Duration, +}; + +use luz::{connection::Tls, jabber_stream::bound_stream::BoundJabberReader}; +use stanza::client::Stanza; +use tokio::{ + sync::{mpsc, oneshot, Mutex}, + task::{JoinHandle, JoinSet}, +}; +use tracing::info; + +use crate::{Connected, Logic}; + +use super::{write::WriteHandle, SupervisorCommand, SupervisorSender}; + +/// read actor +pub struct Read<Lgc> { + stream: BoundJabberReader<Tls>, + disconnecting: bool, + disconnect_timedout: oneshot::Receiver<()>, + + // all the threads spawned by the current connection session + tasks: JoinSet<()>, + + // for handling incoming stanzas + // jabber server must be able to both terminate the connection from error, and ask for data from the client (such as supported XEPs) + connected: Connected, + logic: Lgc, + supervisor_control: SupervisorSender, + + // control stuff + control_receiver: mpsc::Receiver<ReadControl>, + on_crash: oneshot::Sender<ReadState>, +} + +/// when a crash/abort occurs, this gets sent back to the supervisor, so that the connection session can continue +pub struct ReadState { + pub supervisor_control: SupervisorSender, + // TODO: when a stream dies, the iq gets from the server should not be replied to on the new stream + pub tasks: JoinSet<()>, +} + +impl<Lgc> Read<Lgc> { + fn new( + stream: BoundJabberReader<Tls>, + tasks: JoinSet<()>, + connected: Connected, + logic: Lgc, + supervisor_control: SupervisorSender, + control_receiver: mpsc::Receiver<ReadControl>, + on_crash: oneshot::Sender<ReadState>, + ) -> Self { + let (_send, recv) = oneshot::channel(); + Self { + stream, + disconnecting: false, + disconnect_timedout: recv, + tasks, + connected, + logic, + supervisor_control, + control_receiver, + on_crash, + } + } +} + +impl<Lgc: Clone + Logic + Send + 'static> Read<Lgc> { + async fn run(mut self) { + println!("started read thread"); + // let stanza = self.stream.read::<Stanza>().await; + // println!("{:?}", stanza); + loop { + tokio::select! { + // if still haven't received the end tag in time, just kill itself + // TODO: is this okay??? what if notification thread dies? + Ok(()) = &mut self.disconnect_timedout => { + info!("disconnect_timedout"); + break; + } + Some(msg) = self.control_receiver.recv() => { + match msg { + // when disconnect received, + ReadControl::Disconnect => { + let (send, recv) = oneshot::channel(); + self.disconnect_timedout = recv; + self.disconnecting = true; + tokio::spawn(async { + tokio::time::sleep(Duration::from_secs(10)).await; + let _ = send.send(()); + }) + }, + ReadControl::Abort(sender) => { + let _ = sender.send(ReadState { supervisor_control: self.supervisor_control, tasks: self.tasks }); + break; + }, + }; + }, + s = self.stream.read::<Stanza>() => { + println!("read stanza"); + match s { + Ok(s) => { + self.tasks.spawn(self.logic.clone().handle_stanza(s, self.connected.clone(), self.supervisor_control.clone())); + }, + Err(e) => { + println!("error: {:?}", e); + // TODO: NEXT write the correct error stanza depending on error, decide whether to reconnect or properly disconnect, depending on if disconnecting is true + // match e { + // peanuts::Error::ReadError(error) => todo!(), + // peanuts::Error::Utf8Error(utf8_error) => todo!(), + // peanuts::Error::ParseError(_) => todo!(), + // peanuts::Error::EntityProcessError(_) => todo!(), + // peanuts::Error::InvalidCharRef(_) => todo!(), + // peanuts::Error::DuplicateNameSpaceDeclaration(namespace_declaration) => todo!(), + // peanuts::Error::DuplicateAttribute(_) => todo!(), + // peanuts::Error::UnqualifiedNamespace(_) => todo!(), + // peanuts::Error::MismatchedEndTag(name, name1) => todo!(), + // peanuts::Error::NotInElement(_) => todo!(), + // peanuts::Error::ExtraData(_) => todo!(), + // peanuts::Error::UndeclaredNamespace(_) => todo!(), + // peanuts::Error::IncorrectName(name) => todo!(), + // peanuts::Error::DeserializeError(_) => todo!(), + // peanuts::Error::Deserialize(deserialize_error) => todo!(), + // peanuts::Error::RootElementEnded => todo!(), + // } + // TODO: make sure this only happens when an end tag is received + if self.disconnecting == true { + break; + } else { + let _ = self.on_crash.send(ReadState { supervisor_control: self.supervisor_control, tasks: self.tasks }); + } + break; + }, + } + }, + else => break + } + } + println!("stopping read thread"); + self.logic.on_abort().await; + } +} + +// what do stanza processes do? +// - update ui +// - access database +// - disconnect proper, reconnect +// - respond to server requests + +pub enum ReadControl { + Disconnect, + Abort(oneshot::Sender<ReadState>), +} + +pub struct ReadControlHandle { + sender: mpsc::Sender<ReadControl>, + pub(crate) handle: JoinHandle<()>, +} + +impl Deref for ReadControlHandle { + type Target = mpsc::Sender<ReadControl>; + + fn deref(&self) -> &Self::Target { + &self.sender + } +} + +impl DerefMut for ReadControlHandle { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.sender + } +} + +impl ReadControlHandle { + pub fn new<Lgc: Clone + Logic + Send + 'static>( + stream: BoundJabberReader<Tls>, + connected: Connected, + logic: Lgc, + supervisor_control: SupervisorSender, + on_crash: oneshot::Sender<ReadState>, + ) -> Self { + let (control_sender, control_receiver) = mpsc::channel(20); + + let actor = Read::new( + stream, + JoinSet::new(), + connected, + logic, + supervisor_control, + control_receiver, + on_crash, + ); + let handle = tokio::spawn(async move { actor.run().await }); + + Self { + sender: control_sender, + handle, + } + } + + pub fn reconnect<Lgc: Clone + Logic + Send + 'static>( + stream: BoundJabberReader<Tls>, + tasks: JoinSet<()>, + connected: Connected, + logic: Lgc, + supervisor_control: SupervisorSender, + on_crash: oneshot::Sender<ReadState>, + ) -> Self { + let (control_sender, control_receiver) = mpsc::channel(20); + + let actor = Read::new( + stream, + tasks, + connected, + logic, + supervisor_control, + control_receiver, + on_crash, + ); + let handle = tokio::spawn(async move { actor.run().await }); + + Self { + sender: control_sender, + handle, + } + } +} diff --git a/lampada/src/connection/write.rs b/lampada/src/connection/write.rs new file mode 100644 index 0000000..8f0c34b --- /dev/null +++ b/lampada/src/connection/write.rs @@ -0,0 +1,258 @@ +use std::ops::{Deref, DerefMut}; + +use luz::{connection::Tls, jabber_stream::bound_stream::BoundJabberWriter}; +use stanza::client::Stanza; +use tokio::{ + sync::{mpsc, oneshot}, + task::JoinHandle, +}; + +use crate::error::WriteError; + +/// actor that receives jabber stanzas to write, and if there is an error, sends a message back to the supervisor then aborts, so the supervisor can spawn a new stream. +pub struct Write { + stream: BoundJabberWriter<Tls>, + + /// connection session write queue + stanza_receiver: mpsc::Receiver<WriteMessage>, + + // control stuff + control_receiver: mpsc::Receiver<WriteControl>, + on_crash: oneshot::Sender<(WriteMessage, WriteState)>, +} + +/// when a crash/abort occurs, this gets sent back to the supervisor, possibly with the current write that failed, so that the connection session can continue +pub struct WriteState { + pub stanza_recv: mpsc::Receiver<WriteMessage>, +} + +#[derive(Debug)] +pub struct WriteMessage { + pub stanza: Stanza, + pub respond_to: oneshot::Sender<Result<(), WriteError>>, +} + +pub enum WriteControl { + Disconnect, + Abort(oneshot::Sender<WriteState>), +} + +impl Write { + fn new( + stream: BoundJabberWriter<Tls>, + stanza_receiver: mpsc::Receiver<WriteMessage>, + control_receiver: mpsc::Receiver<WriteControl>, + on_crash: oneshot::Sender<(WriteMessage, WriteState)>, + ) -> Self { + Self { + stream, + stanza_receiver, + control_receiver, + on_crash, + } + } + + async fn write(&mut self, stanza: &Stanza) -> Result<(), peanuts::Error> { + Ok(self.stream.write(stanza).await?) + } + + async fn run_reconnected(mut self, retry_msg: WriteMessage) { + // try to retry sending the message that failed to send previously + let result = self.stream.write(&retry_msg.stanza).await; + match result { + Err(e) => match &e { + peanuts::Error::ReadError(_error) => { + // make sure message is not lost from error, supervisor handles retry and reporting + // TODO: upon reconnect, make sure we are not stuck in a reconnection loop + let _ = self.on_crash.send(( + retry_msg, + WriteState { + stanza_recv: self.stanza_receiver, + }, + )); + return; + } + _ => { + let _ = retry_msg.respond_to.send(Err(e.into())); + } + }, + _ => { + let _ = retry_msg.respond_to.send(Ok(())); + } + } + // return to normal loop + self.run().await + } + + async fn run(mut self) { + loop { + tokio::select! { + Some(msg) = self.control_receiver.recv() => { + match msg { + WriteControl::Disconnect => { + // close the stanza_receiver channel and drain out all of the remaining stanzas to send + self.stanza_receiver.close(); + // TODO: put this in some kind of function to avoid code duplication + while let Some(msg) = self.stanza_receiver.recv().await { + let result = self.stream.write(&msg.stanza).await; + match result { + Err(e) => match &e { + peanuts::Error::ReadError(_error) => { + // if connection lost during disconnection, just send lost connection error to the write requests + let _ = msg.respond_to.send(Err(WriteError::LostConnection)); + while let Some(msg) = self.stanza_receiver.recv().await { + let _ = msg.respond_to.send(Err(WriteError::LostConnection)); + } + break; + } + // otherwise complete sending all the stanzas currently in the queue + _ => { + let _ = msg.respond_to.send(Err(e.into())); + } + }, + _ => { + let _ = msg.respond_to.send(Ok(())); + } + } + } + let _ = self.stream.try_close().await; + break; + }, + // in case of abort, stream is already fucked, just send the receiver ready for a reconnection at the same resource + WriteControl::Abort(sender) => { + let _ = sender.send(WriteState { stanza_recv: self.stanza_receiver }); + break; + }, + } + }, + Some(msg) = self.stanza_receiver.recv() => { + let result = self.stream.write(&msg.stanza).await; + match result { + Err(e) => match &e { + peanuts::Error::ReadError(_error) => { + // make sure message is not lost from error, supervisor handles retry and reporting + let _ = self.on_crash.send((msg, WriteState { stanza_recv: self.stanza_receiver })); + break; + } + _ => { + let _ = msg.respond_to.send(Err(e.into())); + } + }, + _ => { + let _ = msg.respond_to.send(Ok(())); + } + } + }, + else => break, + } + } + } +} + +#[derive(Clone)] +pub struct WriteHandle { + sender: mpsc::Sender<WriteMessage>, +} + +impl WriteHandle { + pub async fn write(&self, stanza: Stanza) -> Result<(), WriteError> { + let (send, recv) = oneshot::channel(); + self.send(WriteMessage { + stanza, + respond_to: send, + }) + .await + .map_err(|e| WriteError::Actor(e.into()))?; + // TODO: timeout + recv.await.map_err(|e| WriteError::Actor(e.into()))? + } +} + +impl Deref for WriteHandle { + type Target = mpsc::Sender<WriteMessage>; + + fn deref(&self) -> &Self::Target { + &self.sender + } +} + +impl DerefMut for WriteHandle { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.sender + } +} + +pub struct WriteControlHandle { + sender: mpsc::Sender<WriteControl>, + pub(crate) handle: JoinHandle<()>, +} + +impl Deref for WriteControlHandle { + type Target = mpsc::Sender<WriteControl>; + + fn deref(&self) -> &Self::Target { + &self.sender + } +} + +impl DerefMut for WriteControlHandle { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.sender + } +} + +impl WriteControlHandle { + pub fn new( + stream: BoundJabberWriter<Tls>, + on_crash: oneshot::Sender<(WriteMessage, WriteState)>, + ) -> (WriteHandle, Self) { + let (control_sender, control_receiver) = mpsc::channel(20); + let (stanza_sender, stanza_receiver) = mpsc::channel(20); + + let actor = Write::new(stream, stanza_receiver, control_receiver, on_crash); + let handle = tokio::spawn(async move { actor.run().await }); + + ( + WriteHandle { + sender: stanza_sender, + }, + Self { + sender: control_sender, + handle, + }, + ) + } + + pub fn reconnect_retry( + stream: BoundJabberWriter<Tls>, + on_crash: oneshot::Sender<(WriteMessage, WriteState)>, + stanza_receiver: mpsc::Receiver<WriteMessage>, + retry_msg: WriteMessage, + ) -> Self { + let (control_sender, control_receiver) = mpsc::channel(20); + + let actor = Write::new(stream, stanza_receiver, control_receiver, on_crash); + let handle = tokio::spawn(async move { actor.run_reconnected(retry_msg).await }); + + Self { + sender: control_sender, + handle, + } + } + + pub fn reconnect( + stream: BoundJabberWriter<Tls>, + on_crash: oneshot::Sender<(WriteMessage, WriteState)>, + stanza_receiver: mpsc::Receiver<WriteMessage>, + ) -> Self { + let (control_sender, control_receiver) = mpsc::channel(20); + + let actor = Write::new(stream, stanza_receiver, control_receiver, on_crash); + let handle = tokio::spawn(async move { actor.run().await }); + + Self { + sender: control_sender, + handle, + } + } +} |