aboutsummaryrefslogtreecommitdiffstats
path: root/luz/src/connection
diff options
context:
space:
mode:
authorLibravatar cel 🌸 <cel@bunny.garden>2025-03-26 14:29:40 +0000
committerLibravatar cel 🌸 <cel@bunny.garden>2025-03-26 14:29:40 +0000
commit2211f324782cdc617b4b5ecd071178e372539fe4 (patch)
treea5ea5ce11d748424447dee23173d3cb8aec648ea /luz/src/connection
parent2f8671978e18c1e1e7834056ae674f32fbde3868 (diff)
downloadluz-2211f324782cdc617b4b5ecd071178e372539fe4.tar.gz
luz-2211f324782cdc617b4b5ecd071178e372539fe4.tar.bz2
luz-2211f324782cdc617b4b5ecd071178e372539fe4.zip
refactor: rename crates and move client logic to separate crate `filament`
Diffstat (limited to 'luz/src/connection')
-rw-r--r--luz/src/connection/mod.rs373
-rw-r--r--luz/src/connection/read.rs242
-rw-r--r--luz/src/connection/write.rs258
3 files changed, 0 insertions, 873 deletions
diff --git a/luz/src/connection/mod.rs b/luz/src/connection/mod.rs
deleted file mode 100644
index 288de70..0000000
--- a/luz/src/connection/mod.rs
+++ /dev/null
@@ -1,373 +0,0 @@
-// TODO: consider if this needs to be handled by a supervisor or could be handled by luz directly
-
-use std::{
- collections::HashMap,
- ops::{Deref, DerefMut},
- sync::Arc,
- time::Duration,
-};
-
-use jabber::{connection::Tls, jabber_stream::bound_stream::BoundJabberStream};
-use jid::JID;
-use read::{ReadControl, ReadControlHandle, ReadState};
-use stanza::client::Stanza;
-use tokio::{
- sync::{mpsc, oneshot, Mutex},
- task::{JoinHandle, JoinSet},
-};
-use tracing::info;
-use write::{WriteControl, WriteControlHandle, WriteHandle, WriteMessage, WriteState};
-
-use crate::{
- db::Db,
- error::{ConnectionError, Error, ReadError, WriteError},
- Connected, Logic, LogicState, UpdateMessage,
-};
-
-mod read;
-pub(crate) mod write;
-
-pub struct Supervisor<Lgc> {
- command_recv: mpsc::Receiver<SupervisorCommand>,
- reader_crash: oneshot::Receiver<ReadState>,
- writer_crash: oneshot::Receiver<(WriteMessage, WriteState)>,
- read_control_handle: ReadControlHandle,
- write_control_handle: WriteControlHandle,
- on_crash: oneshot::Sender<()>,
- // jid in connected stays the same over the life of the supervisor (the connection session)
- connected: Connected,
- password: Arc<String>,
- logic: Lgc,
-}
-
-pub enum SupervisorCommand {
- Disconnect,
- // for if there was a stream error, require to reconnect
- // couldn't stream errors just cause a crash? lol
- Reconnect(ChildState),
-}
-
-pub enum ChildState {
- Write(WriteState),
- Read(ReadState),
-}
-
-impl<Lgc: Logic + Clone + Send + 'static> Supervisor<Lgc> {
- fn new(
- command_recv: mpsc::Receiver<SupervisorCommand>,
- reader_crash: oneshot::Receiver<ReadState>,
- writer_crash: oneshot::Receiver<(WriteMessage, WriteState)>,
- read_control_handle: ReadControlHandle,
- write_control_handle: WriteControlHandle,
- on_crash: oneshot::Sender<()>,
- connected: Connected,
- password: Arc<String>,
- logic: Lgc,
- ) -> Self {
- Self {
- command_recv,
- reader_crash,
- writer_crash,
- read_control_handle,
- write_control_handle,
- on_crash,
- connected,
- password,
- logic,
- }
- }
-
- async fn run(mut self) {
- loop {
- tokio::select! {
- Some(msg) = self.command_recv.recv() => {
- match msg {
- SupervisorCommand::Disconnect => {
- info!("disconnecting");
- // TODO: do handle_disconnect here
- let _ = self.write_control_handle.send(WriteControl::Disconnect).await;
- let _ = self.read_control_handle.send(ReadControl::Disconnect).await;
- info!("sent disconnect command");
- tokio::select! {
- _ = async { tokio::join!(
- async { let _ = (&mut self.write_control_handle.handle).await; },
- async { let _ = (&mut self.read_control_handle.handle).await; }
- ) } => {},
- // TODO: config timeout
- _ = async { tokio::time::sleep(Duration::from_secs(5)) } => {
- (&mut self.read_control_handle.handle).abort();
- (&mut self.write_control_handle.handle).abort();
- }
- }
- info!("disconnected");
- break;
- },
- // TODO: Reconnect without aborting, gentle reconnect.
- SupervisorCommand::Reconnect(state) => {
- // TODO: please omfg
- // send abort to read stream, as already done, consider
- let (read_state, mut write_state);
- match state {
- // TODO: proper state things for read and write thread
- ChildState::Write(receiver) => {
- write_state = receiver;
- let (send, recv) = oneshot::channel();
- let _ = self.read_control_handle.send(ReadControl::Abort(send)).await;
- if let Ok(state) = recv.await {
- read_state = state;
- } else {
- break
- }
- },
- ChildState::Read(read) => {
- read_state = read;
- let (send, recv) = oneshot::channel();
- let _ = self.write_control_handle.send(WriteControl::Abort(send)).await;
- // TODO: need a tokio select, in case the state arrives from somewhere else
- if let Ok(state) = recv.await {
- write_state = state;
- } else {
- break
- }
- },
- }
-
- let mut jid = self.connected.jid.clone();
- let mut domain = jid.domainpart.clone();
- // TODO: make sure connect_and_login does not modify the jid, but instead returns a jid. or something like that
- let connection = jabber::connect_and_login(&mut jid, &*self.password, &mut domain).await;
- match connection {
- Ok(c) => {
- let (read, write) = c.split();
- let (send, recv) = oneshot::channel();
- self.writer_crash = recv;
- self.write_control_handle =
- WriteControlHandle::reconnect(write, send, write_state.stanza_recv);
- let (send, recv) = oneshot::channel();
- self.reader_crash = recv;
- self.read_control_handle = ReadControlHandle::reconnect(
- read,
- read_state.tasks,
- self.connected.clone(),
- self.logic.clone(),
- read_state.supervisor_control,
- send,
- );
- },
- Err(e) => {
- // if reconnection failure, respond to all current write messages with lost connection error. the received processes should complete themselves.
- write_state.stanza_recv.close();
- while let Some(msg) = write_state.stanza_recv.recv().await {
- let _ = msg.respond_to.send(Err(WriteError::LostConnection));
- }
- // TODO: is this the correct error?
- self.logic.handle_connection_error(ConnectionError::LostConnection).await;
- break;
- },
- }
- },
- }
- },
- Ok((write_msg, mut write_state)) = &mut self.writer_crash => {
- // consider awaiting/aborting the read and write threads
- let (send, recv) = oneshot::channel();
- let _ = self.read_control_handle.send(ReadControl::Abort(send)).await;
- let read_state = tokio::select! {
- Ok(s) = recv => s,
- Ok(s) = &mut self.reader_crash => s,
- // in case, just break as irrecoverable
- else => break,
- };
-
- let mut jid = self.connected.jid.clone();
- let mut domain = jid.domainpart.clone();
- // TODO: same here
- let connection = jabber::connect_and_login(&mut jid, &*self.password, &mut domain).await;
- match connection {
- Ok(c) => {
- let (read, write) = c.split();
- let (send, recv) = oneshot::channel();
- self.writer_crash = recv;
- self.write_control_handle =
- WriteControlHandle::reconnect_retry(write, send, write_state.stanza_recv, write_msg);
- let (send, recv) = oneshot::channel();
- self.reader_crash = recv;
- self.read_control_handle = ReadControlHandle::reconnect(
- read,
- read_state.tasks,
- self.connected.clone(),
- self.logic.clone(),
- read_state.supervisor_control,
- send,
- );
- },
- Err(e) => {
- // if reconnection failure, respond to all current write messages with lost connection error. the received processes should complete themselves.
- write_state.stanza_recv.close();
- let _ = write_msg.respond_to.send(Err(WriteError::LostConnection));
- while let Some(msg) = write_state.stanza_recv.recv().await {
- let _ = msg.respond_to.send(Err(WriteError::LostConnection));
- }
- // TODO: is this the correct error to send?
- self.logic.handle_connection_error(ConnectionError::LostConnection).await;
- break;
- },
- }
- },
- Ok(read_state) = &mut self.reader_crash => {
- let (send, recv) = oneshot::channel();
- let _ = self.write_control_handle.send(WriteControl::Abort(send)).await;
- let (retry_msg, mut write_state) = tokio::select! {
- Ok(s) = recv => (None, s),
- Ok(s) = &mut self.writer_crash => (Some(s.0), s.1),
- // in case, just break as irrecoverable
- else => break,
- };
-
- let mut jid = self.connected.jid.clone();
- let mut domain = jid.domainpart.clone();
- let connection = jabber::connect_and_login(&mut jid, &*self.password, &mut domain).await;
- match connection {
- Ok(c) => {
- let (read, write) = c.split();
- let (send, recv) = oneshot::channel();
- self.writer_crash = recv;
- if let Some(msg) = retry_msg {
- self.write_control_handle =
- WriteControlHandle::reconnect_retry(write, send, write_state.stanza_recv, msg);
- } else {
- self.write_control_handle = WriteControlHandle::reconnect(write, send, write_state.stanza_recv)
- }
- let (send, recv) = oneshot::channel();
- self.reader_crash = recv;
- self.read_control_handle = ReadControlHandle::reconnect(
- read,
- read_state.tasks,
- self.connected.clone(),
- self.logic.clone(),
- read_state.supervisor_control,
- send,
- );
- },
- Err(e) => {
- // if reconnection failure, respond to all current messages with lost connection error.
- write_state.stanza_recv.close();
- if let Some(msg) = retry_msg {
- msg.respond_to.send(Err(WriteError::LostConnection));
- }
- while let Some(msg) = write_state.stanza_recv.recv().await {
- msg.respond_to.send(Err(WriteError::LostConnection));
- }
- // TODO: is this the correct error?
- self.logic.handle_connection_error(ConnectionError::LostConnection).await;
- break;
- },
- }
- },
- else => break,
- }
- }
- // TODO: maybe don't just on_crash
- let _ = self.on_crash.send(());
- }
-}
-
-pub struct SupervisorHandle {
- sender: SupervisorSender,
- handle: JoinHandle<()>,
-}
-
-impl Deref for SupervisorHandle {
- type Target = SupervisorSender;
-
- fn deref(&self) -> &Self::Target {
- &self.sender
- }
-}
-
-impl DerefMut for SupervisorHandle {
- fn deref_mut(&mut self) -> &mut Self::Target {
- &mut self.sender
- }
-}
-
-#[derive(Clone)]
-pub struct SupervisorSender {
- sender: mpsc::Sender<SupervisorCommand>,
-}
-
-impl Deref for SupervisorSender {
- type Target = mpsc::Sender<SupervisorCommand>;
-
- fn deref(&self) -> &Self::Target {
- &self.sender
- }
-}
-
-impl DerefMut for SupervisorSender {
- fn deref_mut(&mut self) -> &mut Self::Target {
- &mut self.sender
- }
-}
-
-impl SupervisorHandle {
- pub fn new<Lgc: Logic + Clone + Send + 'static>(
- streams: BoundJabberStream<Tls>,
- on_crash: oneshot::Sender<()>,
- jid: JID,
- password: Arc<String>,
- logic: Lgc,
- ) -> (WriteHandle, Self) {
- let (command_send, command_recv) = mpsc::channel(20);
- let (writer_crash_send, writer_crash_recv) = oneshot::channel();
- let (reader_crash_send, reader_crash_recv) = oneshot::channel();
-
- let (read_stream, write_stream) = streams.split();
-
- let (write_handle, write_control_handle) =
- WriteControlHandle::new(write_stream, writer_crash_send);
-
- let connected = Connected {
- jid,
- write_handle: write_handle.clone(),
- };
-
- let supervisor_sender = SupervisorSender {
- sender: command_send,
- };
-
- let read_control_handle = ReadControlHandle::new(
- read_stream,
- connected.clone(),
- logic.clone(),
- supervisor_sender.clone(),
- reader_crash_send,
- );
-
- let actor = Supervisor::new(
- command_recv,
- reader_crash_recv,
- writer_crash_recv,
- read_control_handle,
- write_control_handle,
- on_crash,
- connected,
- password,
- logic,
- );
-
- let handle = tokio::spawn(async move { actor.run().await });
-
- (
- write_handle,
- Self {
- sender: supervisor_sender,
- handle,
- },
- )
- }
-
- pub fn sender(&self) -> SupervisorSender {
- self.sender.clone()
- }
-}
diff --git a/luz/src/connection/read.rs b/luz/src/connection/read.rs
deleted file mode 100644
index 4e55bc5..0000000
--- a/luz/src/connection/read.rs
+++ /dev/null
@@ -1,242 +0,0 @@
-use std::{
- collections::HashMap,
- marker::PhantomData,
- ops::{Deref, DerefMut},
- str::FromStr,
- sync::Arc,
- time::Duration,
-};
-
-use chrono::{DateTime, Utc};
-use jabber::{connection::Tls, jabber_stream::bound_stream::BoundJabberReader};
-use stanza::client::Stanza;
-use tokio::{
- sync::{mpsc, oneshot, Mutex},
- task::{JoinHandle, JoinSet},
-};
-use tracing::info;
-use uuid::Uuid;
-
-use crate::{
- chat::{Body, Message},
- db::Db,
- error::{Error, IqError, MessageRecvError, PresenceError, ReadError, RosterError},
- presence::{Offline, Online, Presence, PresenceType, Show},
- roster::Contact,
- Connected, Logic, LogicState, UpdateMessage,
-};
-
-use super::{write::WriteHandle, SupervisorCommand, SupervisorSender};
-
-/// read actor
-pub struct Read<Lgc> {
- stream: BoundJabberReader<Tls>,
- disconnecting: bool,
- disconnect_timedout: oneshot::Receiver<()>,
-
- // all the threads spawned by the current connection session
- tasks: JoinSet<()>,
-
- // for handling incoming stanzas
- // jabber server must be able to both terminate the connection from error, and ask for data from the client (such as supported XEPs)
- connected: Connected,
- logic: Lgc,
- supervisor_control: SupervisorSender,
-
- // control stuff
- control_receiver: mpsc::Receiver<ReadControl>,
- on_crash: oneshot::Sender<ReadState>,
-}
-
-/// when a crash/abort occurs, this gets sent back to the supervisor, so that the connection session can continue
-pub struct ReadState {
- pub supervisor_control: SupervisorSender,
- // TODO: when a stream dies, the iq gets from the server should not be replied to on the new stream
- pub tasks: JoinSet<()>,
-}
-
-impl<Lgc> Read<Lgc> {
- fn new(
- stream: BoundJabberReader<Tls>,
- tasks: JoinSet<()>,
- connected: Connected,
- logic: Lgc,
- supervisor_control: SupervisorSender,
- control_receiver: mpsc::Receiver<ReadControl>,
- on_crash: oneshot::Sender<ReadState>,
- ) -> Self {
- let (_send, recv) = oneshot::channel();
- Self {
- stream,
- disconnecting: false,
- disconnect_timedout: recv,
- tasks,
- connected,
- logic,
- supervisor_control,
- control_receiver,
- on_crash,
- }
- }
-}
-
-impl<Lgc: Clone + Logic + Send + 'static> Read<Lgc> {
- async fn run(mut self) {
- println!("started read thread");
- // let stanza = self.stream.read::<Stanza>().await;
- // println!("{:?}", stanza);
- loop {
- tokio::select! {
- // if still haven't received the end tag in time, just kill itself
- // TODO: is this okay??? what if notification thread dies?
- Ok(()) = &mut self.disconnect_timedout => {
- info!("disconnect_timedout");
- break;
- }
- Some(msg) = self.control_receiver.recv() => {
- match msg {
- // when disconnect received,
- ReadControl::Disconnect => {
- let (send, recv) = oneshot::channel();
- self.disconnect_timedout = recv;
- self.disconnecting = true;
- tokio::spawn(async {
- tokio::time::sleep(Duration::from_secs(10)).await;
- let _ = send.send(());
- })
- },
- ReadControl::Abort(sender) => {
- let _ = sender.send(ReadState { supervisor_control: self.supervisor_control, tasks: self.tasks });
- break;
- },
- };
- },
- s = self.stream.read::<Stanza>() => {
- println!("read stanza");
- match s {
- Ok(s) => {
- self.tasks.spawn(self.logic.clone().handle_stanza(s, self.connected.clone(), self.supervisor_control.clone()));
- },
- Err(e) => {
- println!("error: {:?}", e);
- // TODO: NEXT write the correct error stanza depending on error, decide whether to reconnect or properly disconnect, depending on if disconnecting is true
- // match e {
- // peanuts::Error::ReadError(error) => todo!(),
- // peanuts::Error::Utf8Error(utf8_error) => todo!(),
- // peanuts::Error::ParseError(_) => todo!(),
- // peanuts::Error::EntityProcessError(_) => todo!(),
- // peanuts::Error::InvalidCharRef(_) => todo!(),
- // peanuts::Error::DuplicateNameSpaceDeclaration(namespace_declaration) => todo!(),
- // peanuts::Error::DuplicateAttribute(_) => todo!(),
- // peanuts::Error::UnqualifiedNamespace(_) => todo!(),
- // peanuts::Error::MismatchedEndTag(name, name1) => todo!(),
- // peanuts::Error::NotInElement(_) => todo!(),
- // peanuts::Error::ExtraData(_) => todo!(),
- // peanuts::Error::UndeclaredNamespace(_) => todo!(),
- // peanuts::Error::IncorrectName(name) => todo!(),
- // peanuts::Error::DeserializeError(_) => todo!(),
- // peanuts::Error::Deserialize(deserialize_error) => todo!(),
- // peanuts::Error::RootElementEnded => todo!(),
- // }
- // TODO: make sure this only happens when an end tag is received
- if self.disconnecting == true {
- break;
- } else {
- let _ = self.on_crash.send(ReadState { supervisor_control: self.supervisor_control, tasks: self.tasks });
- }
- break;
- },
- }
- },
- else => break
- }
- }
- println!("stopping read thread");
- self.logic.on_abort().await;
- }
-}
-
-// what do stanza processes do?
-// - update ui
-// - access database
-// - disconnect proper, reconnect
-// - respond to server requests
-
-pub enum ReadControl {
- Disconnect,
- Abort(oneshot::Sender<ReadState>),
-}
-
-pub struct ReadControlHandle {
- sender: mpsc::Sender<ReadControl>,
- pub(crate) handle: JoinHandle<()>,
-}
-
-impl Deref for ReadControlHandle {
- type Target = mpsc::Sender<ReadControl>;
-
- fn deref(&self) -> &Self::Target {
- &self.sender
- }
-}
-
-impl DerefMut for ReadControlHandle {
- fn deref_mut(&mut self) -> &mut Self::Target {
- &mut self.sender
- }
-}
-
-impl ReadControlHandle {
- pub fn new<Lgc: Clone + Logic + Send + 'static>(
- stream: BoundJabberReader<Tls>,
- connected: Connected,
- logic: Lgc,
- supervisor_control: SupervisorSender,
- on_crash: oneshot::Sender<ReadState>,
- ) -> Self {
- let (control_sender, control_receiver) = mpsc::channel(20);
-
- let actor = Read::new(
- stream,
- JoinSet::new(),
- connected,
- logic,
- supervisor_control,
- control_receiver,
- on_crash,
- );
- let handle = tokio::spawn(async move { actor.run().await });
-
- Self {
- sender: control_sender,
- handle,
- }
- }
-
- pub fn reconnect<Lgc: Clone + Logic + Send + 'static>(
- stream: BoundJabberReader<Tls>,
- tasks: JoinSet<()>,
- connected: Connected,
- logic: Lgc,
- supervisor_control: SupervisorSender,
- on_crash: oneshot::Sender<ReadState>,
- ) -> Self {
- let (control_sender, control_receiver) = mpsc::channel(20);
-
- let actor = Read::new(
- stream,
- tasks,
- connected,
- logic,
- supervisor_control,
- control_receiver,
- on_crash,
- );
- let handle = tokio::spawn(async move { actor.run().await });
-
- Self {
- sender: control_sender,
- handle,
- }
- }
-}
diff --git a/luz/src/connection/write.rs b/luz/src/connection/write.rs
deleted file mode 100644
index ff78b81..0000000
--- a/luz/src/connection/write.rs
+++ /dev/null
@@ -1,258 +0,0 @@
-use std::ops::{Deref, DerefMut};
-
-use jabber::{connection::Tls, jabber_stream::bound_stream::BoundJabberWriter};
-use stanza::client::Stanza;
-use tokio::{
- sync::{mpsc, oneshot},
- task::JoinHandle,
-};
-
-use crate::error::WriteError;
-
-/// actor that receives jabber stanzas to write, and if there is an error, sends a message back to the supervisor then aborts, so the supervisor can spawn a new stream.
-pub struct Write {
- stream: BoundJabberWriter<Tls>,
-
- /// connection session write queue
- stanza_receiver: mpsc::Receiver<WriteMessage>,
-
- // control stuff
- control_receiver: mpsc::Receiver<WriteControl>,
- on_crash: oneshot::Sender<(WriteMessage, WriteState)>,
-}
-
-/// when a crash/abort occurs, this gets sent back to the supervisor, possibly with the current write that failed, so that the connection session can continue
-pub struct WriteState {
- pub stanza_recv: mpsc::Receiver<WriteMessage>,
-}
-
-#[derive(Debug)]
-pub struct WriteMessage {
- pub stanza: Stanza,
- pub respond_to: oneshot::Sender<Result<(), WriteError>>,
-}
-
-pub enum WriteControl {
- Disconnect,
- Abort(oneshot::Sender<WriteState>),
-}
-
-impl Write {
- fn new(
- stream: BoundJabberWriter<Tls>,
- stanza_receiver: mpsc::Receiver<WriteMessage>,
- control_receiver: mpsc::Receiver<WriteControl>,
- on_crash: oneshot::Sender<(WriteMessage, WriteState)>,
- ) -> Self {
- Self {
- stream,
- stanza_receiver,
- control_receiver,
- on_crash,
- }
- }
-
- async fn write(&mut self, stanza: &Stanza) -> Result<(), peanuts::Error> {
- Ok(self.stream.write(stanza).await?)
- }
-
- async fn run_reconnected(mut self, retry_msg: WriteMessage) {
- // try to retry sending the message that failed to send previously
- let result = self.stream.write(&retry_msg.stanza).await;
- match result {
- Err(e) => match &e {
- peanuts::Error::ReadError(_error) => {
- // make sure message is not lost from error, supervisor handles retry and reporting
- // TODO: upon reconnect, make sure we are not stuck in a reconnection loop
- let _ = self.on_crash.send((
- retry_msg,
- WriteState {
- stanza_recv: self.stanza_receiver,
- },
- ));
- return;
- }
- _ => {
- let _ = retry_msg.respond_to.send(Err(e.into()));
- }
- },
- _ => {
- let _ = retry_msg.respond_to.send(Ok(()));
- }
- }
- // return to normal loop
- self.run().await
- }
-
- async fn run(mut self) {
- loop {
- tokio::select! {
- Some(msg) = self.control_receiver.recv() => {
- match msg {
- WriteControl::Disconnect => {
- // close the stanza_receiver channel and drain out all of the remaining stanzas to send
- self.stanza_receiver.close();
- // TODO: put this in some kind of function to avoid code duplication
- while let Some(msg) = self.stanza_receiver.recv().await {
- let result = self.stream.write(&msg.stanza).await;
- match result {
- Err(e) => match &e {
- peanuts::Error::ReadError(_error) => {
- // if connection lost during disconnection, just send lost connection error to the write requests
- let _ = msg.respond_to.send(Err(WriteError::LostConnection));
- while let Some(msg) = self.stanza_receiver.recv().await {
- let _ = msg.respond_to.send(Err(WriteError::LostConnection));
- }
- break;
- }
- // otherwise complete sending all the stanzas currently in the queue
- _ => {
- let _ = msg.respond_to.send(Err(e.into()));
- }
- },
- _ => {
- let _ = msg.respond_to.send(Ok(()));
- }
- }
- }
- let _ = self.stream.try_close().await;
- break;
- },
- // in case of abort, stream is already fucked, just send the receiver ready for a reconnection at the same resource
- WriteControl::Abort(sender) => {
- let _ = sender.send(WriteState { stanza_recv: self.stanza_receiver });
- break;
- },
- }
- },
- Some(msg) = self.stanza_receiver.recv() => {
- let result = self.stream.write(&msg.stanza).await;
- match result {
- Err(e) => match &e {
- peanuts::Error::ReadError(_error) => {
- // make sure message is not lost from error, supervisor handles retry and reporting
- let _ = self.on_crash.send((msg, WriteState { stanza_recv: self.stanza_receiver }));
- break;
- }
- _ => {
- let _ = msg.respond_to.send(Err(e.into()));
- }
- },
- _ => {
- let _ = msg.respond_to.send(Ok(()));
- }
- }
- },
- else => break,
- }
- }
- }
-}
-
-#[derive(Clone)]
-pub struct WriteHandle {
- sender: mpsc::Sender<WriteMessage>,
-}
-
-impl WriteHandle {
- pub async fn write(&self, stanza: Stanza) -> Result<(), WriteError> {
- let (send, recv) = oneshot::channel();
- self.send(WriteMessage {
- stanza,
- respond_to: send,
- })
- .await
- .map_err(|e| WriteError::Actor(e.into()))?;
- // TODO: timeout
- recv.await.map_err(|e| WriteError::Actor(e.into()))?
- }
-}
-
-impl Deref for WriteHandle {
- type Target = mpsc::Sender<WriteMessage>;
-
- fn deref(&self) -> &Self::Target {
- &self.sender
- }
-}
-
-impl DerefMut for WriteHandle {
- fn deref_mut(&mut self) -> &mut Self::Target {
- &mut self.sender
- }
-}
-
-pub struct WriteControlHandle {
- sender: mpsc::Sender<WriteControl>,
- pub(crate) handle: JoinHandle<()>,
-}
-
-impl Deref for WriteControlHandle {
- type Target = mpsc::Sender<WriteControl>;
-
- fn deref(&self) -> &Self::Target {
- &self.sender
- }
-}
-
-impl DerefMut for WriteControlHandle {
- fn deref_mut(&mut self) -> &mut Self::Target {
- &mut self.sender
- }
-}
-
-impl WriteControlHandle {
- pub fn new(
- stream: BoundJabberWriter<Tls>,
- on_crash: oneshot::Sender<(WriteMessage, WriteState)>,
- ) -> (WriteHandle, Self) {
- let (control_sender, control_receiver) = mpsc::channel(20);
- let (stanza_sender, stanza_receiver) = mpsc::channel(20);
-
- let actor = Write::new(stream, stanza_receiver, control_receiver, on_crash);
- let handle = tokio::spawn(async move { actor.run().await });
-
- (
- WriteHandle {
- sender: stanza_sender,
- },
- Self {
- sender: control_sender,
- handle,
- },
- )
- }
-
- pub fn reconnect_retry(
- stream: BoundJabberWriter<Tls>,
- on_crash: oneshot::Sender<(WriteMessage, WriteState)>,
- stanza_receiver: mpsc::Receiver<WriteMessage>,
- retry_msg: WriteMessage,
- ) -> Self {
- let (control_sender, control_receiver) = mpsc::channel(20);
-
- let actor = Write::new(stream, stanza_receiver, control_receiver, on_crash);
- let handle = tokio::spawn(async move { actor.run_reconnected(retry_msg).await });
-
- Self {
- sender: control_sender,
- handle,
- }
- }
-
- pub fn reconnect(
- stream: BoundJabberWriter<Tls>,
- on_crash: oneshot::Sender<(WriteMessage, WriteState)>,
- stanza_receiver: mpsc::Receiver<WriteMessage>,
- ) -> Self {
- let (control_sender, control_receiver) = mpsc::channel(20);
-
- let actor = Write::new(stream, stanza_receiver, control_receiver, on_crash);
- let handle = tokio::spawn(async move { actor.run().await });
-
- Self {
- sender: control_sender,
- handle,
- }
- }
-}