diff options
author | 2025-03-26 19:13:10 +0000 | |
---|---|---|
committer | 2025-03-26 19:13:10 +0000 | |
commit | 8c239e5c7a49cff350104b09cbb74d862c2ec420 (patch) | |
tree | 4b392f1ffa6b91fadf68b4a7f67ad5f901fbeda4 | |
parent | 410fe3af16be5985c868b00908b8ddf4ed6e469d (diff) | |
download | luz-8c239e5c7a49cff350104b09cbb74d862c2ec420.tar.gz luz-8c239e5c7a49cff350104b09cbb74d862c2ec420.tar.bz2 luz-8c239e5c7a49cff350104b09cbb74d862c2ec420.zip |
feat: stream error handling
-rw-r--r-- | filamento/examples/example.rs | 2 | ||||
-rw-r--r-- | filamento/src/error.rs | 6 | ||||
-rw-r--r-- | filamento/src/lib.rs | 74 | ||||
-rw-r--r-- | filamento/src/logic/connection_error.rs | 7 | ||||
-rw-r--r-- | filamento/src/logic/mod.rs | 40 | ||||
-rw-r--r-- | filamento/src/logic/process_stanza.rs | 421 | ||||
-rw-r--r-- | lampada/src/connection/mod.rs | 52 | ||||
-rw-r--r-- | lampada/src/connection/read.rs | 60 | ||||
-rw-r--r-- | lampada/src/connection/write.rs | 14 | ||||
-rw-r--r-- | lampada/src/lib.rs | 7 | ||||
-rw-r--r-- | stanza/src/client/mod.rs | 24 | ||||
-rw-r--r-- | stanza/src/stream.rs | 4 |
12 files changed, 369 insertions, 342 deletions
diff --git a/filamento/examples/example.rs b/filamento/examples/example.rs index f2b787b..e36968d 100644 --- a/filamento/examples/example.rs +++ b/filamento/examples/example.rs @@ -7,7 +7,7 @@ use tracing::info; #[tokio::main] async fn main() { tracing_subscriber::fmt::init(); - let db = Db::create_connect_and_migrate(Path::new("./filamentoa.db")) + let db = Db::create_connect_and_migrate(Path::new("./filamento.db")) .await .unwrap(); let (client, mut recv) = diff --git a/filamento/src/error.rs b/filamento/src/error.rs index 996a503..c5fdb03 100644 --- a/filamento/src/error.rs +++ b/filamento/src/error.rs @@ -16,7 +16,7 @@ pub enum Error { // TODO: include content // UnrecognizedContent(peanuts::element::Content), #[error("iq receive error: {0}")] - Iq(IqError), + Iq(#[from] IqError), // TODO: change to Connecting(ConnectingError) #[error("connecting: {0}")] Connecting(#[from] ConnectionJobError), @@ -32,7 +32,7 @@ pub enum Error { #[error("message send error: {0}")] MessageSend(MessageSendError), #[error("message receive error: {0}")] - MessageRecv(MessageRecvError), + MessageRecv(#[from] MessageRecvError), } #[derive(Debug, Error, Clone)] @@ -86,7 +86,7 @@ pub enum RosterError { #[derive(Debug, Error, Clone)] #[error("database error: {0}")] -pub struct DatabaseError(Arc<sqlx::Error>); +pub struct DatabaseError(pub Arc<sqlx::Error>); impl From<sqlx::Error> for DatabaseError { fn from(e: sqlx::Error) -> Self { diff --git a/filamento/src/lib.rs b/filamento/src/lib.rs index 030dc43..b284c7e 100644 --- a/filamento/src/lib.rs +++ b/filamento/src/lib.rs @@ -98,6 +98,33 @@ pub enum Command { /// chatroom). if disconnected, will be cached so when client connects, message will be sent. SendMessage(JID, Body, oneshot::Sender<Result<(), WriteError>>), } + +#[derive(Debug, Clone)] +pub enum UpdateMessage { + Error(Error), + Online(Online, Vec<Contact>), + Offline(Offline), + /// received roster from jabber server (replace full app roster state with this) + /// is this needed? + FullRoster(Vec<Contact>), + /// (only update app roster state, don't replace) + RosterUpdate(Contact), + RosterDelete(JID), + /// presences should be stored with users in the ui, not contacts, as presences can be received from anyone + Presence { + from: JID, + presence: Presence, + }, + // TODO: receipts + // MessageDispatched(Uuid), + Message { + to: JID, + message: Message, + }, + SubscriptionRequest(jid::JID), + Unsupported(Stanza), +} + /// an xmpp client that is suited for a chat client use case #[derive(Debug)] pub struct Client { @@ -147,20 +174,24 @@ impl Client { let (_sup_send, sup_recv) = oneshot::channel(); let sup_recv = sup_recv.fuse(); - let logic = ClientLogic::new(db, Arc::new(Mutex::new(HashMap::new())), update_send); + let client = Self { + sender: command_sender, + // TODO: configure timeout + timeout: Duration::from_secs(10), + }; + + let logic = ClientLogic::new( + client.clone(), + db, + Arc::new(Mutex::new(HashMap::new())), + update_send, + ); let actor: CoreClient<ClientLogic> = CoreClient::new(jid, password, command_receiver, None, sup_recv, logic); tokio::spawn(async move { actor.run().await }); - ( - Self { - sender: command_sender, - // TODO: configure timeout - timeout: Duration::from_secs(10), - }, - update_recv, - ) + (client, update_recv) } pub async fn get_roster(&self) -> Result<Vec<Contact>, CommandError<RosterError>> { @@ -453,28 +484,3 @@ impl From<Command> for CoreClientCommand<Command> { CoreClientCommand::Command(value) } } - -#[derive(Debug, Clone)] -pub enum UpdateMessage { - Error(Error), - Online(Online, Vec<Contact>), - Offline(Offline), - /// received roster from jabber server (replace full app roster state with this) - /// is this needed? - FullRoster(Vec<Contact>), - /// (only update app roster state, don't replace) - RosterUpdate(Contact), - RosterDelete(JID), - /// presences should be stored with users in the ui, not contacts, as presences can be received from anyone - Presence { - from: JID, - presence: Presence, - }, - // TODO: receipts - // MessageDispatched(Uuid), - Message { - to: JID, - message: Message, - }, - SubscriptionRequest(jid::JID), -} diff --git a/filamento/src/logic/connection_error.rs b/filamento/src/logic/connection_error.rs index ac9e931..081900b 100644 --- a/filamento/src/logic/connection_error.rs +++ b/filamento/src/logic/connection_error.rs @@ -1,12 +1,7 @@ use lampada::error::ConnectionError; -use crate::UpdateMessage; - use super::ClientLogic; pub async fn handle_connection_error(logic: ClientLogic, error: ConnectionError) { - logic - .update_sender() - .send(UpdateMessage::Error(error.into())) - .await; + logic.handle_error(error.into()).await; } diff --git a/filamento/src/logic/mod.rs b/filamento/src/logic/mod.rs index 638f682..365a0df 100644 --- a/filamento/src/logic/mod.rs +++ b/filamento/src/logic/mod.rs @@ -3,8 +3,9 @@ use std::{collections::HashMap, sync::Arc}; use lampada::{Logic, error::ReadError}; use stanza::client::Stanza; use tokio::sync::{Mutex, mpsc, oneshot}; +use tracing::{error, info, warn}; -use crate::{Command, UpdateMessage, db::Db}; +use crate::{Client, Command, UpdateMessage, db::Db, error::Error}; mod abort; mod connect; @@ -16,6 +17,7 @@ mod process_stanza; #[derive(Clone)] pub struct ClientLogic { + client: Client, db: Db, pending: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>, update_sender: mpsc::Sender<UpdateMessage>, @@ -23,6 +25,7 @@ pub struct ClientLogic { impl ClientLogic { pub fn new( + client: Client, db: Db, pending: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>, update_sender: mpsc::Sender<UpdateMessage>, @@ -31,9 +34,14 @@ impl ClientLogic { db, pending, update_sender, + client, } } + pub fn client(&self) -> &Client { + &self.client + } + pub fn db(&self) -> &Db { &self.db } @@ -45,6 +53,23 @@ impl ClientLogic { pub fn update_sender(&self) -> &mpsc::Sender<UpdateMessage> { &self.update_sender } + + pub async fn handle_unsupported(&self, stanza: impl Into<Stanza>) { + let stanza: Stanza = stanza.into(); + warn!("received unsupported stanza: {:?}", stanza); + self.handle_update(UpdateMessage::Unsupported(stanza)).await; + } + + pub async fn handle_update(&self, update: UpdateMessage) { + // TODO: impl fmt + info!("{:?}", update); + self.update_sender().send(update).await; + } + + pub async fn handle_error(&self, e: Error) { + error!("{}", e); + self.handle_update(UpdateMessage::Error(e)).await; + } } impl Logic for ClientLogic { @@ -63,13 +88,8 @@ impl Logic for ClientLogic { disconnect::handle_disconnect(self, connection).await; } - async fn handle_stanza( - self, - stanza: ::stanza::client::Stanza, - connection: lampada::Connected, - supervisor: lampada::SupervisorSender, - ) { - process_stanza::handle_stanza(self, stanza, connection, supervisor).await; + async fn handle_stanza(self, stanza: ::stanza::client::Stanza, connection: lampada::Connected) { + process_stanza::handle_stanza(self, stanza, connection).await; } async fn handle_online(self, command: Self::Cmd, connection: lampada::Connected) { @@ -87,4 +107,8 @@ impl Logic for ClientLogic { async fn handle_connection_error(self, error: lampada::error::ConnectionError) { connection_error::handle_connection_error(self, error).await; } + + async fn handle_stream_error(self, stream_error: stanza::stream::Error) { + self.handle_error(Error::Stream(stream_error)).await; + } } diff --git a/filamento/src/logic/process_stanza.rs b/filamento/src/logic/process_stanza.rs index 17738df..1a68936 100644 --- a/filamento/src/logic/process_stanza.rs +++ b/filamento/src/logic/process_stanza.rs @@ -2,263 +2,238 @@ use std::str::FromStr; use chrono::Utc; use lampada::{Connected, SupervisorSender}; -use stanza::client::Stanza; +use stanza::client::{Stanza, iq::Iq}; use uuid::Uuid; use crate::{ UpdateMessage, chat::{Body, Message}, - error::{Error, IqError, MessageRecvError, PresenceError, RosterError}, + error::{DatabaseError, Error, IqError, MessageRecvError, PresenceError, RosterError}, presence::{Offline, Online, Presence, PresenceType, Show}, roster::Contact, }; use super::ClientLogic; -pub async fn handle_stanza( +pub async fn handle_stanza(logic: ClientLogic, stanza: Stanza, connection: Connected) { + let result = process_stanza(logic.clone(), stanza, connection).await; + match result { + Ok(u) => match u { + Some(UpdateMessage::Unsupported(stanza)) => logic.handle_unsupported(stanza).await, + _ => { + if let Some(u) = u { + logic.handle_update(u).await + } + } + }, + Err(e) => logic.handle_error(e).await, + } +} + +pub async fn recv_message( logic: ClientLogic, - stanza: Stanza, - connection: Connected, - supervisor: SupervisorSender, -) { - match stanza { - Stanza::Message(stanza_message) => { - if let Some(mut from) = stanza_message.from { - // TODO: don't ignore delay from. xep says SHOULD send error if incorrect. - let timestamp = stanza_message + stanza_message: stanza::client::message::Message, +) -> Result<Option<UpdateMessage>, MessageRecvError> { + if let Some(mut from) = stanza_message.from { + // TODO: don't ignore delay from. xep says SHOULD send error if incorrect. + let timestamp = stanza_message + .delay + .map(|delay| delay.stamp) + .unwrap_or_else(|| Utc::now()); + // TODO: group chat messages + let mut message = Message { + id: stanza_message + .id + // TODO: proper id storage + .map(|id| Uuid::from_str(&id).unwrap_or_else(|_| Uuid::new_v4())) + .unwrap_or_else(|| Uuid::new_v4()), + from: from.clone(), + timestamp, + body: Body { + // TODO: should this be an option? + body: stanza_message + .body + .map(|body| body.body) + .unwrap_or_default() + .unwrap_or_default(), + }, + }; + // TODO: can this be more efficient? + logic + .db() + .create_message_with_user_resource_and_chat(message.clone(), from.clone()) + .await + .map_err(|e| DatabaseError(e.into()))?; + message.from = message.from.as_bare(); + from = from.as_bare(); + Ok(Some(UpdateMessage::Message { to: from, message })) + } else { + Err(MessageRecvError::MissingFrom) + } +} + +pub async fn recv_presence( + presence: stanza::client::presence::Presence, +) -> Result<Option<UpdateMessage>, PresenceError> { + if let Some(from) = presence.from { + match presence.r#type { + Some(r#type) => match r#type { + // error processing a presence from somebody + stanza::client::presence::PresenceType::Error => { + // TODO: is there any other information that should go with the error? also MUST have an error, otherwise it's a different error. maybe it shoulnd't be an option. + // TODO: ughhhhhhhhhhhhh these stanza errors should probably just have an option, and custom display + Err(PresenceError::StanzaError( + presence + .errors + .first() + .cloned() + .expect("error MUST have error"), + )) + } + // should not happen (error to server) + stanza::client::presence::PresenceType::Probe => { + // TODO: should probably write an error and restart stream + Err(PresenceError::Unsupported) + } + stanza::client::presence::PresenceType::Subscribe => { + // may get a subscription request from somebody who is not a contact!!! therefore should be its own kind of event + Ok(Some(UpdateMessage::SubscriptionRequest(from))) + } + stanza::client::presence::PresenceType::Unavailable => { + let offline = Offline { + status: presence.status.map(|status| status.status.0), + }; + let timestamp = presence + .delay + .map(|delay| delay.stamp) + .unwrap_or_else(|| Utc::now()); + Ok(Some(UpdateMessage::Presence { + from, + presence: Presence { + timestamp, + presence: PresenceType::Offline(offline), + }, + })) + } + // for now, do nothing, as these are simply informational. will receive roster push from the server regarding the changes to do with them. + stanza::client::presence::PresenceType::Subscribed => Ok(None), + stanza::client::presence::PresenceType::Unsubscribe => Ok(None), + stanza::client::presence::PresenceType::Unsubscribed => Ok(None), + }, + None => { + let online = Online { + show: presence.show.map(|show| match show { + stanza::client::presence::Show::Away => Show::Away, + stanza::client::presence::Show::Chat => Show::Chat, + stanza::client::presence::Show::Dnd => Show::DoNotDisturb, + stanza::client::presence::Show::Xa => Show::ExtendedAway, + }), + status: presence.status.map(|status| status.status.0), + priority: presence.priority.map(|priority| priority.0), + }; + let timestamp = presence .delay .map(|delay| delay.stamp) .unwrap_or_else(|| Utc::now()); - // TODO: group chat messages - let mut message = Message { - id: stanza_message - .id - // TODO: proper id storage - .map(|id| Uuid::from_str(&id).unwrap_or_else(|_| Uuid::new_v4())) - .unwrap_or_else(|| Uuid::new_v4()), - from: from.clone(), - timestamp, - body: Body { - // TODO: should this be an option? - body: stanza_message - .body - .map(|body| body.body) - .unwrap_or_default() - .unwrap_or_default(), + Ok(Some(UpdateMessage::Presence { + from, + presence: Presence { + timestamp, + presence: PresenceType::Online(online), }, - }; - // TODO: can this be more efficient? - let result = logic - .db() - .create_message_with_user_resource_and_chat(message.clone(), from.clone()) - .await; - if let Err(e) = result { - tracing::error!("messagecreate"); - let _ = logic - .update_sender() - .send(UpdateMessage::Error(Error::MessageRecv( - MessageRecvError::MessageHistory(e.into()), - ))) - .await; - } - message.from = message.from.as_bare(); - from = from.as_bare(); - let _ = logic - .update_sender() - .send(UpdateMessage::Message { to: from, message }) - .await; - } else { - let _ = logic - .update_sender() - .send(UpdateMessage::Error(Error::MessageRecv( - MessageRecvError::MissingFrom, - ))) - .await; + })) } } - Stanza::Presence(presence) => { - if let Some(from) = presence.from { - match presence.r#type { - Some(r#type) => match r#type { - // error processing a presence from somebody - stanza::client::presence::PresenceType::Error => { - // TODO: is there any other information that should go with the error? also MUST have an error, otherwise it's a different error. maybe it shoulnd't be an option. - let _ = logic - .update_sender() - .send(UpdateMessage::Error(Error::Presence( - // TODO: ughhhhhhhhhhhhh these stanza errors should probably just have an option, and custom display - PresenceError::StanzaError( - presence - .errors - .first() - .cloned() - .expect("error MUST have error"), - ), - ))) - .await; - } - // should not happen (error to server) - stanza::client::presence::PresenceType::Probe => { - // TODO: should probably write an error and restart stream - let _ = logic - .update_sender() - .send(UpdateMessage::Error(Error::Presence( - PresenceError::Unsupported, - ))) - .await; - } - stanza::client::presence::PresenceType::Subscribe => { - // may get a subscription request from somebody who is not a contact!!! therefore should be its own kind of event - let _ = logic - .update_sender() - .send(UpdateMessage::SubscriptionRequest(from)) - .await; - } - stanza::client::presence::PresenceType::Unavailable => { - let offline = Offline { - status: presence.status.map(|status| status.status.0), - }; - let timestamp = presence - .delay - .map(|delay| delay.stamp) - .unwrap_or_else(|| Utc::now()); - let _ = logic - .update_sender() - .send(UpdateMessage::Presence { - from, - presence: Presence { - timestamp, - presence: PresenceType::Offline(offline), - }, - }) - .await; - } - // for now, do nothing, as these are simply informational. will receive roster push from the server regarding the changes to do with them. - stanza::client::presence::PresenceType::Subscribed => {} - stanza::client::presence::PresenceType::Unsubscribe => {} - stanza::client::presence::PresenceType::Unsubscribed => {} - }, - None => { - let online = Online { - show: presence.show.map(|show| match show { - stanza::client::presence::Show::Away => Show::Away, - stanza::client::presence::Show::Chat => Show::Chat, - stanza::client::presence::Show::Dnd => Show::DoNotDisturb, - stanza::client::presence::Show::Xa => Show::ExtendedAway, - }), - status: presence.status.map(|status| status.status.0), - priority: presence.priority.map(|priority| priority.0), - }; - let timestamp = presence - .delay - .map(|delay| delay.stamp) - .unwrap_or_else(|| Utc::now()); - let _ = logic - .update_sender() - .send(UpdateMessage::Presence { - from, - presence: Presence { - timestamp, - presence: PresenceType::Online(online), - }, - }) - .await; - } - } + } else { + Err(PresenceError::MissingFrom) + } +} + +pub async fn recv_iq(logic: ClientLogic, iq: Iq) -> Result<Option<UpdateMessage>, IqError> { + match iq.r#type { + stanza::client::iq::IqType::Error | stanza::client::iq::IqType::Result => { + let send; + { + send = logic.pending().lock().await.remove(&iq.id); + } + if let Some(send) = send { + send.send(Ok(Stanza::Iq(iq))); + Ok(None) } else { - let _ = logic - .update_sender() - .send(UpdateMessage::Error(Error::Presence( - PresenceError::MissingFrom, - ))) - .await; + Err(IqError::NoMatchingId(iq.id)) } } - Stanza::Iq(iq) => match iq.r#type { - stanza::client::iq::IqType::Error | stanza::client::iq::IqType::Result => { - let send; - { - send = logic.pending().lock().await.remove(&iq.id); - } - if let Some(send) = send { - send.send(Ok(Stanza::Iq(iq))); - } else { - let _ = logic - .update_sender() - .send(UpdateMessage::Error(Error::Iq(IqError::NoMatchingId( - iq.id, - )))) - .await; - } - } - // TODO: send unsupported to server - // TODO: proper errors i am so tired please - stanza::client::iq::IqType::Get => {} - stanza::client::iq::IqType::Set => { - if let Some(query) = iq.query { - match query { - stanza::client::iq::Query::Roster(mut query) => { - // TODO: there should only be one - if let Some(item) = query.items.pop() { - match item.subscription { - Some(stanza::roster::Subscription::Remove) => { - logic.db().delete_contact(item.jid.clone()).await; - logic - .update_sender() - .send(UpdateMessage::RosterDelete(item.jid)) - .await; - // TODO: send result - } - _ => { - let contact: Contact = item.into(); - if let Err(e) = - logic.db().upsert_contact(contact.clone()).await - { - let _ = logic - .update_sender() - .send(UpdateMessage::Error(Error::Roster( - RosterError::Cache(e.into()), - ))) - .await; - } + // TODO: send unsupported to server + // TODO: proper errors i am so tired please + stanza::client::iq::IqType::Get => Ok(None), + stanza::client::iq::IqType::Set => { + if let Some(query) = iq.query { + match query { + stanza::client::iq::Query::Roster(mut query) => { + // TODO: there should only be one + if let Some(item) = query.items.pop() { + match item.subscription { + Some(stanza::roster::Subscription::Remove) => { + logic.db().delete_contact(item.jid.clone()).await; + Ok(Some(UpdateMessage::RosterDelete(item.jid))) + } + _ => { + let contact: Contact = item.into(); + if let Err(e) = logic.db().upsert_contact(contact.clone()).await + { let _ = logic .update_sender() - .send(UpdateMessage::RosterUpdate(contact)) + .send(UpdateMessage::Error(Error::Roster( + RosterError::Cache(e.into()), + ))) .await; - // TODO: send result - // write_handle.write(Stanza::Iq(stanza::client::iq::Iq { - // from: , - // id: todo!(), - // to: todo!(), - // r#type: todo!(), - // lang: todo!(), - // query: todo!(), - // errors: todo!(), - // })); } + Ok(Some(UpdateMessage::RosterUpdate(contact))) + // TODO: send result + // write_handle.write(Stanza::Iq(stanza::client::iq::Iq { + // from: , + // id: todo!(), + // to: todo!(), + // r#type: todo!(), + // lang: todo!(), + // query: todo!(), + // errors: todo!(), + // })); } } + } else { + Ok(None) } - // TODO: send unsupported to server - _ => {} } - } else { - // TODO: send error (unsupported) to server + // TODO: send unsupported to server + _ => Ok(None), } + } else { + // TODO: send error (unsupported) to server + Ok(None) } - }, + } + } +} + +pub async fn process_stanza( + logic: ClientLogic, + stanza: Stanza, + connection: Connected, +) -> Result<Option<UpdateMessage>, Error> { + let update = match stanza { + Stanza::Message(stanza_message) => Ok(recv_message(logic, stanza_message).await?), + Stanza::Presence(presence) => Ok(recv_presence(presence).await?), + Stanza::Iq(iq) => Ok(recv_iq(logic, iq).await?), + // unreachable, always caught by lampada + // TODO: make cleaner Stanza::Error(error) => { - let _ = logic - .update_sender() - .send(UpdateMessage::Error(Error::Stream(error))) - .await; - // TODO: reconnect + unreachable!() } Stanza::OtherContent(content) => { - let _ = logic - .update_sender() - .send(UpdateMessage::Error(Error::UnrecognizedContent)); + Err(Error::UnrecognizedContent) // TODO: send error to write_thread } - } + }; + update } diff --git a/lampada/src/connection/mod.rs b/lampada/src/connection/mod.rs index 1e767b0..ffaa7a7 100644 --- a/lampada/src/connection/mod.rs +++ b/lampada/src/connection/mod.rs @@ -10,7 +10,7 @@ use std::{ use jid::JID; use luz::{connection::Tls, jabber_stream::bound_stream::BoundJabberStream}; use read::{ReadControl, ReadControlHandle, ReadState}; -use stanza::client::Stanza; +use stanza::{client::Stanza, stream_error::Error as StreamError}; use tokio::{ sync::{mpsc, oneshot, Mutex}, task::{JoinHandle, JoinSet}, @@ -28,7 +28,7 @@ pub(crate) mod write; pub struct Supervisor<Lgc> { command_recv: mpsc::Receiver<SupervisorCommand>, - reader_crash: oneshot::Receiver<ReadState>, + reader_crash: oneshot::Receiver<(Option<StreamError>, ReadState)>, writer_crash: oneshot::Receiver<(WriteMessage, WriteState)>, read_control_handle: ReadControlHandle, write_control_handle: WriteControlHandle, @@ -43,18 +43,13 @@ pub enum SupervisorCommand { Disconnect, // for if there was a stream error, require to reconnect // couldn't stream errors just cause a crash? lol - Reconnect(ChildState), -} - -pub enum ChildState { - Write(WriteState), - Read(ReadState), + Reconnect(ReadState), } impl<Lgc: Logic + Clone + Send + 'static> Supervisor<Lgc> { fn new( command_recv: mpsc::Receiver<SupervisorCommand>, - reader_crash: oneshot::Receiver<ReadState>, + reader_crash: oneshot::Receiver<(Option<StreamError>, ReadState)>, writer_crash: oneshot::Receiver<(WriteMessage, WriteState)>, read_control_handle: ReadControlHandle, write_control_handle: WriteControlHandle, @@ -104,33 +99,19 @@ impl<Lgc: Logic + Clone + Send + 'static> Supervisor<Lgc> { break; }, // TODO: Reconnect without aborting, gentle reconnect. + // the server sent a stream error SupervisorCommand::Reconnect(state) => { // TODO: please omfg // send abort to read stream, as already done, consider let (read_state, mut write_state); - match state { - ChildState::Write(receiver) => { - write_state = receiver; - let (send, recv) = oneshot::channel(); - let _ = self.read_control_handle.send(ReadControl::Abort(send)).await; - // TODO: need a tokio select, in case the state arrives from somewhere else - if let Ok(state) = recv.await { - read_state = state; - } else { - break - } - }, - ChildState::Read(read) => { - read_state = read; - let (send, recv) = oneshot::channel(); - let _ = self.write_control_handle.send(WriteControl::Abort(send)).await; - // TODO: need a tokio select, in case the state arrives from somewhere else - if let Ok(state) = recv.await { - write_state = state; - } else { - break - } - }, + read_state = state; + let (send, recv) = oneshot::channel(); + let _ = self.write_control_handle.send(WriteControl::Abort(None, send)).await; + // TODO: need a tokio select, in case the state arrives from somewhere else + if let Ok(state) = recv.await { + write_state = state; + } else { + break } let mut jid = self.connected.jid.clone(); @@ -175,7 +156,8 @@ impl<Lgc: Logic + Clone + Send + 'static> Supervisor<Lgc> { let _ = self.read_control_handle.send(ReadControl::Abort(send)).await; let read_state = tokio::select! { Ok(s) = recv => s, - Ok(s) = &mut self.reader_crash => s, + // TODO: is this okay + Ok(s) = &mut self.reader_crash => s.1, // in case, just break as irrecoverable else => break, }; @@ -215,9 +197,9 @@ impl<Lgc: Logic + Clone + Send + 'static> Supervisor<Lgc> { }, } }, - Ok(read_state) = &mut self.reader_crash => { + Ok((stream_error, read_state)) = &mut self.reader_crash => { let (send, recv) = oneshot::channel(); - let _ = self.write_control_handle.send(WriteControl::Abort(send)).await; + let _ = self.write_control_handle.send(WriteControl::Abort(stream_error, send)).await; let (retry_msg, mut write_state) = tokio::select! { Ok(s) = recv => (None, s), Ok(s) = &mut self.writer_crash => (Some(s.0), s.1), diff --git a/lampada/src/connection/read.rs b/lampada/src/connection/read.rs index cc69387..640ca8e 100644 --- a/lampada/src/connection/read.rs +++ b/lampada/src/connection/read.rs @@ -9,13 +9,15 @@ use std::{ use luz::{connection::Tls, jabber_stream::bound_stream::BoundJabberReader}; use stanza::client::Stanza; +use stanza::stream::Error as StreamErrorStanza; +use stanza::stream_error::Error as StreamError; use tokio::{ sync::{mpsc, oneshot, Mutex}, task::{JoinHandle, JoinSet}, }; use tracing::info; -use crate::{Connected, Logic}; +use crate::{Connected, Logic, WriteMessage}; use super::{write::WriteHandle, SupervisorCommand, SupervisorSender}; @@ -36,7 +38,7 @@ pub struct Read<Lgc> { // control stuff control_receiver: mpsc::Receiver<ReadControl>, - on_crash: oneshot::Sender<ReadState>, + on_crash: oneshot::Sender<(Option<StreamError>, ReadState)>, } /// when a crash/abort occurs, this gets sent back to the supervisor, so that the connection session can continue @@ -54,7 +56,7 @@ impl<Lgc> Read<Lgc> { logic: Lgc, supervisor_control: SupervisorSender, control_receiver: mpsc::Receiver<ReadControl>, - on_crash: oneshot::Sender<ReadState>, + on_crash: oneshot::Sender<(Option<StreamError>, ReadState)>, ) -> Self { let (_send, recv) = oneshot::channel(); Self { @@ -106,34 +108,40 @@ impl<Lgc: Clone + Logic + Send + 'static> Read<Lgc> { println!("read stanza"); match s { Ok(s) => { - self.tasks.spawn(self.logic.clone().handle_stanza(s, self.connected.clone(), self.supervisor_control.clone())); + match s { + Stanza::Error(error) => { + self.logic.clone().handle_stream_error(error).await; + self.supervisor_control.send(SupervisorCommand::Reconnect(ReadState { supervisor_control: self.supervisor_control.clone(), tasks: self.tasks })).await; + break; + }, + _ => { + self.tasks.spawn(self.logic.clone().handle_stanza(s, self.connected.clone())); + } + }; }, Err(e) => { println!("error: {:?}", e); - // TODO: NEXT write the correct error stanza depending on error, decide whether to reconnect or properly disconnect, depending on if disconnecting is true - // match e { - // peanuts::Error::ReadError(error) => todo!(), - // peanuts::Error::Utf8Error(utf8_error) => todo!(), - // peanuts::Error::ParseError(_) => todo!(), - // peanuts::Error::EntityProcessError(_) => todo!(), - // peanuts::Error::InvalidCharRef(_) => todo!(), - // peanuts::Error::DuplicateNameSpaceDeclaration(namespace_declaration) => todo!(), - // peanuts::Error::DuplicateAttribute(_) => todo!(), - // peanuts::Error::UnqualifiedNamespace(_) => todo!(), - // peanuts::Error::MismatchedEndTag(name, name1) => todo!(), - // peanuts::Error::NotInElement(_) => todo!(), - // peanuts::Error::ExtraData(_) => todo!(), - // peanuts::Error::UndeclaredNamespace(_) => todo!(), - // peanuts::Error::IncorrectName(name) => todo!(), - // peanuts::Error::DeserializeError(_) => todo!(), - // peanuts::Error::Deserialize(deserialize_error) => todo!(), - // peanuts::Error::RootElementEnded => todo!(), - // } // TODO: make sure this only happens when an end tag is received if self.disconnecting == true { break; } else { - let _ = self.on_crash.send(ReadState { supervisor_control: self.supervisor_control, tasks: self.tasks }); + let stream_error = match e { + peanuts::Error::ReadError(error) => None, + peanuts::Error::Utf8Error(utf8_error) => Some(StreamError::UnsupportedEncoding), + peanuts::Error::ParseError(_) => Some(StreamError::BadFormat), + peanuts::Error::EntityProcessError(_) => Some(StreamError::RestrictedXml), + peanuts::Error::InvalidCharRef(char_ref_error) => Some(StreamError::UnsupportedEncoding), + peanuts::Error::DuplicateNameSpaceDeclaration(namespace_declaration) => Some(StreamError::NotWellFormed), + peanuts::Error::DuplicateAttribute(_) => Some(StreamError::NotWellFormed), + peanuts::Error::MismatchedEndTag(name, name1) => Some(StreamError::NotWellFormed), + peanuts::Error::NotInElement(_) => Some(StreamError::InvalidXml), + peanuts::Error::ExtraData(_) => None, + peanuts::Error::UndeclaredNamespace(_) => Some(StreamError::InvalidNamespace), + peanuts::Error::Deserialize(deserialize_error) => Some(StreamError::InvalidXml), + peanuts::Error::RootElementEnded => Some(StreamError::InvalidXml), + }; + + let _ = self.on_crash.send((stream_error, ReadState { supervisor_control: self.supervisor_control, tasks: self.tasks })); } break; }, @@ -183,7 +191,7 @@ impl ReadControlHandle { connected: Connected, logic: Lgc, supervisor_control: SupervisorSender, - on_crash: oneshot::Sender<ReadState>, + on_crash: oneshot::Sender<(Option<StreamError>, ReadState)>, ) -> Self { let (control_sender, control_receiver) = mpsc::channel(20); @@ -210,7 +218,7 @@ impl ReadControlHandle { connected: Connected, logic: Lgc, supervisor_control: SupervisorSender, - on_crash: oneshot::Sender<ReadState>, + on_crash: oneshot::Sender<(Option<StreamError>, ReadState)>, ) -> Self { let (control_sender, control_receiver) = mpsc::channel(20); diff --git a/lampada/src/connection/write.rs b/lampada/src/connection/write.rs index 8f0c34b..1070cdf 100644 --- a/lampada/src/connection/write.rs +++ b/lampada/src/connection/write.rs @@ -1,7 +1,9 @@ use std::ops::{Deref, DerefMut}; use luz::{connection::Tls, jabber_stream::bound_stream::BoundJabberWriter}; -use stanza::client::Stanza; +use stanza::{ + client::Stanza, stream::Error as StreamErrorStanza, stream_error::Error as StreamError, +}; use tokio::{ sync::{mpsc, oneshot}, task::JoinHandle, @@ -34,7 +36,7 @@ pub struct WriteMessage { pub enum WriteControl { Disconnect, - Abort(oneshot::Sender<WriteState>), + Abort(Option<StreamError>, oneshot::Sender<WriteState>), } impl Write { @@ -119,7 +121,13 @@ impl Write { break; }, // in case of abort, stream is already fucked, just send the receiver ready for a reconnection at the same resource - WriteControl::Abort(sender) => { + WriteControl::Abort(error, sender) => { + // write stream error message for server if there is one + if let Some(error) = error { + // TODO: timeouts for writing to stream + let _ = self.stream.write(&Stanza::Error(StreamErrorStanza { error, text: None })).await; + // don't care about result, if it sends it sends, otherwise stream is restarting anyway + } let _ = sender.send(WriteState { stanza_recv: self.stanza_receiver }); break; }, diff --git a/lampada/src/lib.rs b/lampada/src/lib.rs index c61c596..a01ba06 100644 --- a/lampada/src/lib.rs +++ b/lampada/src/lib.rs @@ -15,6 +15,7 @@ use stanza::client::{ iq::{self, Iq, IqType}, Stanza, }; +use stanza::stream::Error as StreamError; use tokio::{ sync::{mpsc, oneshot, Mutex}, task::JoinSet, @@ -59,12 +60,16 @@ pub trait Logic { connection: Connected, ) -> impl std::future::Future<Output = ()> + Send; + fn handle_stream_error( + self, + stream_error: StreamError, + ) -> impl std::future::Future<Output = ()> + Send; + /// run to handle an incoming xmpp stanza fn handle_stanza( self, stanza: Stanza, connection: Connected, - supervisor: SupervisorSender, ) -> impl std::future::Future<Output = ()> + std::marker::Send; /// run to handle a command message when a connection is currently established diff --git a/stanza/src/client/mod.rs b/stanza/src/client/mod.rs index 3e690a7..a1b2de5 100644 --- a/stanza/src/client/mod.rs +++ b/stanza/src/client/mod.rs @@ -61,3 +61,27 @@ impl IntoContent for Stanza { } } } + +impl From<Message> for Stanza { + fn from(value: Message) -> Self { + Stanza::Message(value) + } +} + +impl From<Presence> for Stanza { + fn from(value: Presence) -> Self { + Stanza::Presence(value) + } +} + +impl From<Iq> for Stanza { + fn from(value: Iq) -> Self { + Stanza::Iq(value) + } +} + +impl From<StreamError> for Stanza { + fn from(value: StreamError) -> Self { + Stanza::Error(value) + } +} diff --git a/stanza/src/stream.rs b/stanza/src/stream.rs index 8e1982f..732a826 100644 --- a/stanza/src/stream.rs +++ b/stanza/src/stream.rs @@ -181,8 +181,8 @@ impl FromElement for Feature { #[derive(Error, Debug, Clone)] pub struct Error { - error: StreamError, - text: Option<Text>, + pub error: StreamError, + pub text: Option<Text>, } impl Display for Error { |