diff options
author | 2025-03-26 14:29:40 +0000 | |
---|---|---|
committer | 2025-03-26 14:29:40 +0000 | |
commit | 2211f324782cdc617b4b5ecd071178e372539fe4 (patch) | |
tree | a5ea5ce11d748424447dee23173d3cb8aec648ea /luz/src/connection | |
parent | 2f8671978e18c1e1e7834056ae674f32fbde3868 (diff) | |
download | luz-2211f324782cdc617b4b5ecd071178e372539fe4.tar.gz luz-2211f324782cdc617b4b5ecd071178e372539fe4.tar.bz2 luz-2211f324782cdc617b4b5ecd071178e372539fe4.zip |
refactor: rename crates and move client logic to separate crate `filament`
Diffstat (limited to 'luz/src/connection')
-rw-r--r-- | luz/src/connection/mod.rs | 373 | ||||
-rw-r--r-- | luz/src/connection/read.rs | 242 | ||||
-rw-r--r-- | luz/src/connection/write.rs | 258 |
3 files changed, 0 insertions, 873 deletions
diff --git a/luz/src/connection/mod.rs b/luz/src/connection/mod.rs deleted file mode 100644 index 288de70..0000000 --- a/luz/src/connection/mod.rs +++ /dev/null @@ -1,373 +0,0 @@ -// TODO: consider if this needs to be handled by a supervisor or could be handled by luz directly - -use std::{ - collections::HashMap, - ops::{Deref, DerefMut}, - sync::Arc, - time::Duration, -}; - -use jabber::{connection::Tls, jabber_stream::bound_stream::BoundJabberStream}; -use jid::JID; -use read::{ReadControl, ReadControlHandle, ReadState}; -use stanza::client::Stanza; -use tokio::{ - sync::{mpsc, oneshot, Mutex}, - task::{JoinHandle, JoinSet}, -}; -use tracing::info; -use write::{WriteControl, WriteControlHandle, WriteHandle, WriteMessage, WriteState}; - -use crate::{ - db::Db, - error::{ConnectionError, Error, ReadError, WriteError}, - Connected, Logic, LogicState, UpdateMessage, -}; - -mod read; -pub(crate) mod write; - -pub struct Supervisor<Lgc> { - command_recv: mpsc::Receiver<SupervisorCommand>, - reader_crash: oneshot::Receiver<ReadState>, - writer_crash: oneshot::Receiver<(WriteMessage, WriteState)>, - read_control_handle: ReadControlHandle, - write_control_handle: WriteControlHandle, - on_crash: oneshot::Sender<()>, - // jid in connected stays the same over the life of the supervisor (the connection session) - connected: Connected, - password: Arc<String>, - logic: Lgc, -} - -pub enum SupervisorCommand { - Disconnect, - // for if there was a stream error, require to reconnect - // couldn't stream errors just cause a crash? lol - Reconnect(ChildState), -} - -pub enum ChildState { - Write(WriteState), - Read(ReadState), -} - -impl<Lgc: Logic + Clone + Send + 'static> Supervisor<Lgc> { - fn new( - command_recv: mpsc::Receiver<SupervisorCommand>, - reader_crash: oneshot::Receiver<ReadState>, - writer_crash: oneshot::Receiver<(WriteMessage, WriteState)>, - read_control_handle: ReadControlHandle, - write_control_handle: WriteControlHandle, - on_crash: oneshot::Sender<()>, - connected: Connected, - password: Arc<String>, - logic: Lgc, - ) -> Self { - Self { - command_recv, - reader_crash, - writer_crash, - read_control_handle, - write_control_handle, - on_crash, - connected, - password, - logic, - } - } - - async fn run(mut self) { - loop { - tokio::select! { - Some(msg) = self.command_recv.recv() => { - match msg { - SupervisorCommand::Disconnect => { - info!("disconnecting"); - // TODO: do handle_disconnect here - let _ = self.write_control_handle.send(WriteControl::Disconnect).await; - let _ = self.read_control_handle.send(ReadControl::Disconnect).await; - info!("sent disconnect command"); - tokio::select! { - _ = async { tokio::join!( - async { let _ = (&mut self.write_control_handle.handle).await; }, - async { let _ = (&mut self.read_control_handle.handle).await; } - ) } => {}, - // TODO: config timeout - _ = async { tokio::time::sleep(Duration::from_secs(5)) } => { - (&mut self.read_control_handle.handle).abort(); - (&mut self.write_control_handle.handle).abort(); - } - } - info!("disconnected"); - break; - }, - // TODO: Reconnect without aborting, gentle reconnect. - SupervisorCommand::Reconnect(state) => { - // TODO: please omfg - // send abort to read stream, as already done, consider - let (read_state, mut write_state); - match state { - // TODO: proper state things for read and write thread - ChildState::Write(receiver) => { - write_state = receiver; - let (send, recv) = oneshot::channel(); - let _ = self.read_control_handle.send(ReadControl::Abort(send)).await; - if let Ok(state) = recv.await { - read_state = state; - } else { - break - } - }, - ChildState::Read(read) => { - read_state = read; - let (send, recv) = oneshot::channel(); - let _ = self.write_control_handle.send(WriteControl::Abort(send)).await; - // TODO: need a tokio select, in case the state arrives from somewhere else - if let Ok(state) = recv.await { - write_state = state; - } else { - break - } - }, - } - - let mut jid = self.connected.jid.clone(); - let mut domain = jid.domainpart.clone(); - // TODO: make sure connect_and_login does not modify the jid, but instead returns a jid. or something like that - let connection = jabber::connect_and_login(&mut jid, &*self.password, &mut domain).await; - match connection { - Ok(c) => { - let (read, write) = c.split(); - let (send, recv) = oneshot::channel(); - self.writer_crash = recv; - self.write_control_handle = - WriteControlHandle::reconnect(write, send, write_state.stanza_recv); - let (send, recv) = oneshot::channel(); - self.reader_crash = recv; - self.read_control_handle = ReadControlHandle::reconnect( - read, - read_state.tasks, - self.connected.clone(), - self.logic.clone(), - read_state.supervisor_control, - send, - ); - }, - Err(e) => { - // if reconnection failure, respond to all current write messages with lost connection error. the received processes should complete themselves. - write_state.stanza_recv.close(); - while let Some(msg) = write_state.stanza_recv.recv().await { - let _ = msg.respond_to.send(Err(WriteError::LostConnection)); - } - // TODO: is this the correct error? - self.logic.handle_connection_error(ConnectionError::LostConnection).await; - break; - }, - } - }, - } - }, - Ok((write_msg, mut write_state)) = &mut self.writer_crash => { - // consider awaiting/aborting the read and write threads - let (send, recv) = oneshot::channel(); - let _ = self.read_control_handle.send(ReadControl::Abort(send)).await; - let read_state = tokio::select! { - Ok(s) = recv => s, - Ok(s) = &mut self.reader_crash => s, - // in case, just break as irrecoverable - else => break, - }; - - let mut jid = self.connected.jid.clone(); - let mut domain = jid.domainpart.clone(); - // TODO: same here - let connection = jabber::connect_and_login(&mut jid, &*self.password, &mut domain).await; - match connection { - Ok(c) => { - let (read, write) = c.split(); - let (send, recv) = oneshot::channel(); - self.writer_crash = recv; - self.write_control_handle = - WriteControlHandle::reconnect_retry(write, send, write_state.stanza_recv, write_msg); - let (send, recv) = oneshot::channel(); - self.reader_crash = recv; - self.read_control_handle = ReadControlHandle::reconnect( - read, - read_state.tasks, - self.connected.clone(), - self.logic.clone(), - read_state.supervisor_control, - send, - ); - }, - Err(e) => { - // if reconnection failure, respond to all current write messages with lost connection error. the received processes should complete themselves. - write_state.stanza_recv.close(); - let _ = write_msg.respond_to.send(Err(WriteError::LostConnection)); - while let Some(msg) = write_state.stanza_recv.recv().await { - let _ = msg.respond_to.send(Err(WriteError::LostConnection)); - } - // TODO: is this the correct error to send? - self.logic.handle_connection_error(ConnectionError::LostConnection).await; - break; - }, - } - }, - Ok(read_state) = &mut self.reader_crash => { - let (send, recv) = oneshot::channel(); - let _ = self.write_control_handle.send(WriteControl::Abort(send)).await; - let (retry_msg, mut write_state) = tokio::select! { - Ok(s) = recv => (None, s), - Ok(s) = &mut self.writer_crash => (Some(s.0), s.1), - // in case, just break as irrecoverable - else => break, - }; - - let mut jid = self.connected.jid.clone(); - let mut domain = jid.domainpart.clone(); - let connection = jabber::connect_and_login(&mut jid, &*self.password, &mut domain).await; - match connection { - Ok(c) => { - let (read, write) = c.split(); - let (send, recv) = oneshot::channel(); - self.writer_crash = recv; - if let Some(msg) = retry_msg { - self.write_control_handle = - WriteControlHandle::reconnect_retry(write, send, write_state.stanza_recv, msg); - } else { - self.write_control_handle = WriteControlHandle::reconnect(write, send, write_state.stanza_recv) - } - let (send, recv) = oneshot::channel(); - self.reader_crash = recv; - self.read_control_handle = ReadControlHandle::reconnect( - read, - read_state.tasks, - self.connected.clone(), - self.logic.clone(), - read_state.supervisor_control, - send, - ); - }, - Err(e) => { - // if reconnection failure, respond to all current messages with lost connection error. - write_state.stanza_recv.close(); - if let Some(msg) = retry_msg { - msg.respond_to.send(Err(WriteError::LostConnection)); - } - while let Some(msg) = write_state.stanza_recv.recv().await { - msg.respond_to.send(Err(WriteError::LostConnection)); - } - // TODO: is this the correct error? - self.logic.handle_connection_error(ConnectionError::LostConnection).await; - break; - }, - } - }, - else => break, - } - } - // TODO: maybe don't just on_crash - let _ = self.on_crash.send(()); - } -} - -pub struct SupervisorHandle { - sender: SupervisorSender, - handle: JoinHandle<()>, -} - -impl Deref for SupervisorHandle { - type Target = SupervisorSender; - - fn deref(&self) -> &Self::Target { - &self.sender - } -} - -impl DerefMut for SupervisorHandle { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.sender - } -} - -#[derive(Clone)] -pub struct SupervisorSender { - sender: mpsc::Sender<SupervisorCommand>, -} - -impl Deref for SupervisorSender { - type Target = mpsc::Sender<SupervisorCommand>; - - fn deref(&self) -> &Self::Target { - &self.sender - } -} - -impl DerefMut for SupervisorSender { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.sender - } -} - -impl SupervisorHandle { - pub fn new<Lgc: Logic + Clone + Send + 'static>( - streams: BoundJabberStream<Tls>, - on_crash: oneshot::Sender<()>, - jid: JID, - password: Arc<String>, - logic: Lgc, - ) -> (WriteHandle, Self) { - let (command_send, command_recv) = mpsc::channel(20); - let (writer_crash_send, writer_crash_recv) = oneshot::channel(); - let (reader_crash_send, reader_crash_recv) = oneshot::channel(); - - let (read_stream, write_stream) = streams.split(); - - let (write_handle, write_control_handle) = - WriteControlHandle::new(write_stream, writer_crash_send); - - let connected = Connected { - jid, - write_handle: write_handle.clone(), - }; - - let supervisor_sender = SupervisorSender { - sender: command_send, - }; - - let read_control_handle = ReadControlHandle::new( - read_stream, - connected.clone(), - logic.clone(), - supervisor_sender.clone(), - reader_crash_send, - ); - - let actor = Supervisor::new( - command_recv, - reader_crash_recv, - writer_crash_recv, - read_control_handle, - write_control_handle, - on_crash, - connected, - password, - logic, - ); - - let handle = tokio::spawn(async move { actor.run().await }); - - ( - write_handle, - Self { - sender: supervisor_sender, - handle, - }, - ) - } - - pub fn sender(&self) -> SupervisorSender { - self.sender.clone() - } -} diff --git a/luz/src/connection/read.rs b/luz/src/connection/read.rs deleted file mode 100644 index 4e55bc5..0000000 --- a/luz/src/connection/read.rs +++ /dev/null @@ -1,242 +0,0 @@ -use std::{ - collections::HashMap, - marker::PhantomData, - ops::{Deref, DerefMut}, - str::FromStr, - sync::Arc, - time::Duration, -}; - -use chrono::{DateTime, Utc}; -use jabber::{connection::Tls, jabber_stream::bound_stream::BoundJabberReader}; -use stanza::client::Stanza; -use tokio::{ - sync::{mpsc, oneshot, Mutex}, - task::{JoinHandle, JoinSet}, -}; -use tracing::info; -use uuid::Uuid; - -use crate::{ - chat::{Body, Message}, - db::Db, - error::{Error, IqError, MessageRecvError, PresenceError, ReadError, RosterError}, - presence::{Offline, Online, Presence, PresenceType, Show}, - roster::Contact, - Connected, Logic, LogicState, UpdateMessage, -}; - -use super::{write::WriteHandle, SupervisorCommand, SupervisorSender}; - -/// read actor -pub struct Read<Lgc> { - stream: BoundJabberReader<Tls>, - disconnecting: bool, - disconnect_timedout: oneshot::Receiver<()>, - - // all the threads spawned by the current connection session - tasks: JoinSet<()>, - - // for handling incoming stanzas - // jabber server must be able to both terminate the connection from error, and ask for data from the client (such as supported XEPs) - connected: Connected, - logic: Lgc, - supervisor_control: SupervisorSender, - - // control stuff - control_receiver: mpsc::Receiver<ReadControl>, - on_crash: oneshot::Sender<ReadState>, -} - -/// when a crash/abort occurs, this gets sent back to the supervisor, so that the connection session can continue -pub struct ReadState { - pub supervisor_control: SupervisorSender, - // TODO: when a stream dies, the iq gets from the server should not be replied to on the new stream - pub tasks: JoinSet<()>, -} - -impl<Lgc> Read<Lgc> { - fn new( - stream: BoundJabberReader<Tls>, - tasks: JoinSet<()>, - connected: Connected, - logic: Lgc, - supervisor_control: SupervisorSender, - control_receiver: mpsc::Receiver<ReadControl>, - on_crash: oneshot::Sender<ReadState>, - ) -> Self { - let (_send, recv) = oneshot::channel(); - Self { - stream, - disconnecting: false, - disconnect_timedout: recv, - tasks, - connected, - logic, - supervisor_control, - control_receiver, - on_crash, - } - } -} - -impl<Lgc: Clone + Logic + Send + 'static> Read<Lgc> { - async fn run(mut self) { - println!("started read thread"); - // let stanza = self.stream.read::<Stanza>().await; - // println!("{:?}", stanza); - loop { - tokio::select! { - // if still haven't received the end tag in time, just kill itself - // TODO: is this okay??? what if notification thread dies? - Ok(()) = &mut self.disconnect_timedout => { - info!("disconnect_timedout"); - break; - } - Some(msg) = self.control_receiver.recv() => { - match msg { - // when disconnect received, - ReadControl::Disconnect => { - let (send, recv) = oneshot::channel(); - self.disconnect_timedout = recv; - self.disconnecting = true; - tokio::spawn(async { - tokio::time::sleep(Duration::from_secs(10)).await; - let _ = send.send(()); - }) - }, - ReadControl::Abort(sender) => { - let _ = sender.send(ReadState { supervisor_control: self.supervisor_control, tasks: self.tasks }); - break; - }, - }; - }, - s = self.stream.read::<Stanza>() => { - println!("read stanza"); - match s { - Ok(s) => { - self.tasks.spawn(self.logic.clone().handle_stanza(s, self.connected.clone(), self.supervisor_control.clone())); - }, - Err(e) => { - println!("error: {:?}", e); - // TODO: NEXT write the correct error stanza depending on error, decide whether to reconnect or properly disconnect, depending on if disconnecting is true - // match e { - // peanuts::Error::ReadError(error) => todo!(), - // peanuts::Error::Utf8Error(utf8_error) => todo!(), - // peanuts::Error::ParseError(_) => todo!(), - // peanuts::Error::EntityProcessError(_) => todo!(), - // peanuts::Error::InvalidCharRef(_) => todo!(), - // peanuts::Error::DuplicateNameSpaceDeclaration(namespace_declaration) => todo!(), - // peanuts::Error::DuplicateAttribute(_) => todo!(), - // peanuts::Error::UnqualifiedNamespace(_) => todo!(), - // peanuts::Error::MismatchedEndTag(name, name1) => todo!(), - // peanuts::Error::NotInElement(_) => todo!(), - // peanuts::Error::ExtraData(_) => todo!(), - // peanuts::Error::UndeclaredNamespace(_) => todo!(), - // peanuts::Error::IncorrectName(name) => todo!(), - // peanuts::Error::DeserializeError(_) => todo!(), - // peanuts::Error::Deserialize(deserialize_error) => todo!(), - // peanuts::Error::RootElementEnded => todo!(), - // } - // TODO: make sure this only happens when an end tag is received - if self.disconnecting == true { - break; - } else { - let _ = self.on_crash.send(ReadState { supervisor_control: self.supervisor_control, tasks: self.tasks }); - } - break; - }, - } - }, - else => break - } - } - println!("stopping read thread"); - self.logic.on_abort().await; - } -} - -// what do stanza processes do? -// - update ui -// - access database -// - disconnect proper, reconnect -// - respond to server requests - -pub enum ReadControl { - Disconnect, - Abort(oneshot::Sender<ReadState>), -} - -pub struct ReadControlHandle { - sender: mpsc::Sender<ReadControl>, - pub(crate) handle: JoinHandle<()>, -} - -impl Deref for ReadControlHandle { - type Target = mpsc::Sender<ReadControl>; - - fn deref(&self) -> &Self::Target { - &self.sender - } -} - -impl DerefMut for ReadControlHandle { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.sender - } -} - -impl ReadControlHandle { - pub fn new<Lgc: Clone + Logic + Send + 'static>( - stream: BoundJabberReader<Tls>, - connected: Connected, - logic: Lgc, - supervisor_control: SupervisorSender, - on_crash: oneshot::Sender<ReadState>, - ) -> Self { - let (control_sender, control_receiver) = mpsc::channel(20); - - let actor = Read::new( - stream, - JoinSet::new(), - connected, - logic, - supervisor_control, - control_receiver, - on_crash, - ); - let handle = tokio::spawn(async move { actor.run().await }); - - Self { - sender: control_sender, - handle, - } - } - - pub fn reconnect<Lgc: Clone + Logic + Send + 'static>( - stream: BoundJabberReader<Tls>, - tasks: JoinSet<()>, - connected: Connected, - logic: Lgc, - supervisor_control: SupervisorSender, - on_crash: oneshot::Sender<ReadState>, - ) -> Self { - let (control_sender, control_receiver) = mpsc::channel(20); - - let actor = Read::new( - stream, - tasks, - connected, - logic, - supervisor_control, - control_receiver, - on_crash, - ); - let handle = tokio::spawn(async move { actor.run().await }); - - Self { - sender: control_sender, - handle, - } - } -} diff --git a/luz/src/connection/write.rs b/luz/src/connection/write.rs deleted file mode 100644 index ff78b81..0000000 --- a/luz/src/connection/write.rs +++ /dev/null @@ -1,258 +0,0 @@ -use std::ops::{Deref, DerefMut}; - -use jabber::{connection::Tls, jabber_stream::bound_stream::BoundJabberWriter}; -use stanza::client::Stanza; -use tokio::{ - sync::{mpsc, oneshot}, - task::JoinHandle, -}; - -use crate::error::WriteError; - -/// actor that receives jabber stanzas to write, and if there is an error, sends a message back to the supervisor then aborts, so the supervisor can spawn a new stream. -pub struct Write { - stream: BoundJabberWriter<Tls>, - - /// connection session write queue - stanza_receiver: mpsc::Receiver<WriteMessage>, - - // control stuff - control_receiver: mpsc::Receiver<WriteControl>, - on_crash: oneshot::Sender<(WriteMessage, WriteState)>, -} - -/// when a crash/abort occurs, this gets sent back to the supervisor, possibly with the current write that failed, so that the connection session can continue -pub struct WriteState { - pub stanza_recv: mpsc::Receiver<WriteMessage>, -} - -#[derive(Debug)] -pub struct WriteMessage { - pub stanza: Stanza, - pub respond_to: oneshot::Sender<Result<(), WriteError>>, -} - -pub enum WriteControl { - Disconnect, - Abort(oneshot::Sender<WriteState>), -} - -impl Write { - fn new( - stream: BoundJabberWriter<Tls>, - stanza_receiver: mpsc::Receiver<WriteMessage>, - control_receiver: mpsc::Receiver<WriteControl>, - on_crash: oneshot::Sender<(WriteMessage, WriteState)>, - ) -> Self { - Self { - stream, - stanza_receiver, - control_receiver, - on_crash, - } - } - - async fn write(&mut self, stanza: &Stanza) -> Result<(), peanuts::Error> { - Ok(self.stream.write(stanza).await?) - } - - async fn run_reconnected(mut self, retry_msg: WriteMessage) { - // try to retry sending the message that failed to send previously - let result = self.stream.write(&retry_msg.stanza).await; - match result { - Err(e) => match &e { - peanuts::Error::ReadError(_error) => { - // make sure message is not lost from error, supervisor handles retry and reporting - // TODO: upon reconnect, make sure we are not stuck in a reconnection loop - let _ = self.on_crash.send(( - retry_msg, - WriteState { - stanza_recv: self.stanza_receiver, - }, - )); - return; - } - _ => { - let _ = retry_msg.respond_to.send(Err(e.into())); - } - }, - _ => { - let _ = retry_msg.respond_to.send(Ok(())); - } - } - // return to normal loop - self.run().await - } - - async fn run(mut self) { - loop { - tokio::select! { - Some(msg) = self.control_receiver.recv() => { - match msg { - WriteControl::Disconnect => { - // close the stanza_receiver channel and drain out all of the remaining stanzas to send - self.stanza_receiver.close(); - // TODO: put this in some kind of function to avoid code duplication - while let Some(msg) = self.stanza_receiver.recv().await { - let result = self.stream.write(&msg.stanza).await; - match result { - Err(e) => match &e { - peanuts::Error::ReadError(_error) => { - // if connection lost during disconnection, just send lost connection error to the write requests - let _ = msg.respond_to.send(Err(WriteError::LostConnection)); - while let Some(msg) = self.stanza_receiver.recv().await { - let _ = msg.respond_to.send(Err(WriteError::LostConnection)); - } - break; - } - // otherwise complete sending all the stanzas currently in the queue - _ => { - let _ = msg.respond_to.send(Err(e.into())); - } - }, - _ => { - let _ = msg.respond_to.send(Ok(())); - } - } - } - let _ = self.stream.try_close().await; - break; - }, - // in case of abort, stream is already fucked, just send the receiver ready for a reconnection at the same resource - WriteControl::Abort(sender) => { - let _ = sender.send(WriteState { stanza_recv: self.stanza_receiver }); - break; - }, - } - }, - Some(msg) = self.stanza_receiver.recv() => { - let result = self.stream.write(&msg.stanza).await; - match result { - Err(e) => match &e { - peanuts::Error::ReadError(_error) => { - // make sure message is not lost from error, supervisor handles retry and reporting - let _ = self.on_crash.send((msg, WriteState { stanza_recv: self.stanza_receiver })); - break; - } - _ => { - let _ = msg.respond_to.send(Err(e.into())); - } - }, - _ => { - let _ = msg.respond_to.send(Ok(())); - } - } - }, - else => break, - } - } - } -} - -#[derive(Clone)] -pub struct WriteHandle { - sender: mpsc::Sender<WriteMessage>, -} - -impl WriteHandle { - pub async fn write(&self, stanza: Stanza) -> Result<(), WriteError> { - let (send, recv) = oneshot::channel(); - self.send(WriteMessage { - stanza, - respond_to: send, - }) - .await - .map_err(|e| WriteError::Actor(e.into()))?; - // TODO: timeout - recv.await.map_err(|e| WriteError::Actor(e.into()))? - } -} - -impl Deref for WriteHandle { - type Target = mpsc::Sender<WriteMessage>; - - fn deref(&self) -> &Self::Target { - &self.sender - } -} - -impl DerefMut for WriteHandle { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.sender - } -} - -pub struct WriteControlHandle { - sender: mpsc::Sender<WriteControl>, - pub(crate) handle: JoinHandle<()>, -} - -impl Deref for WriteControlHandle { - type Target = mpsc::Sender<WriteControl>; - - fn deref(&self) -> &Self::Target { - &self.sender - } -} - -impl DerefMut for WriteControlHandle { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.sender - } -} - -impl WriteControlHandle { - pub fn new( - stream: BoundJabberWriter<Tls>, - on_crash: oneshot::Sender<(WriteMessage, WriteState)>, - ) -> (WriteHandle, Self) { - let (control_sender, control_receiver) = mpsc::channel(20); - let (stanza_sender, stanza_receiver) = mpsc::channel(20); - - let actor = Write::new(stream, stanza_receiver, control_receiver, on_crash); - let handle = tokio::spawn(async move { actor.run().await }); - - ( - WriteHandle { - sender: stanza_sender, - }, - Self { - sender: control_sender, - handle, - }, - ) - } - - pub fn reconnect_retry( - stream: BoundJabberWriter<Tls>, - on_crash: oneshot::Sender<(WriteMessage, WriteState)>, - stanza_receiver: mpsc::Receiver<WriteMessage>, - retry_msg: WriteMessage, - ) -> Self { - let (control_sender, control_receiver) = mpsc::channel(20); - - let actor = Write::new(stream, stanza_receiver, control_receiver, on_crash); - let handle = tokio::spawn(async move { actor.run_reconnected(retry_msg).await }); - - Self { - sender: control_sender, - handle, - } - } - - pub fn reconnect( - stream: BoundJabberWriter<Tls>, - on_crash: oneshot::Sender<(WriteMessage, WriteState)>, - stanza_receiver: mpsc::Receiver<WriteMessage>, - ) -> Self { - let (control_sender, control_receiver) = mpsc::channel(20); - - let actor = Write::new(stream, stanza_receiver, control_receiver, on_crash); - let handle = tokio::spawn(async move { actor.run().await }); - - Self { - sender: control_sender, - handle, - } - } -} |