diff options
| author | 2025-03-26 19:13:10 +0000 | |
|---|---|---|
| committer | 2025-03-26 19:13:10 +0000 | |
| commit | 8c239e5c7a49cff350104b09cbb74d862c2ec420 (patch) | |
| tree | 4b392f1ffa6b91fadf68b4a7f67ad5f901fbeda4 | |
| parent | 410fe3af16be5985c868b00908b8ddf4ed6e469d (diff) | |
| download | luz-8c239e5c7a49cff350104b09cbb74d862c2ec420.tar.gz luz-8c239e5c7a49cff350104b09cbb74d862c2ec420.tar.bz2 luz-8c239e5c7a49cff350104b09cbb74d862c2ec420.zip | |
feat: stream error handling
| -rw-r--r-- | filamento/examples/example.rs | 2 | ||||
| -rw-r--r-- | filamento/src/error.rs | 6 | ||||
| -rw-r--r-- | filamento/src/lib.rs | 74 | ||||
| -rw-r--r-- | filamento/src/logic/connection_error.rs | 7 | ||||
| -rw-r--r-- | filamento/src/logic/mod.rs | 40 | ||||
| -rw-r--r-- | filamento/src/logic/process_stanza.rs | 421 | ||||
| -rw-r--r-- | lampada/src/connection/mod.rs | 52 | ||||
| -rw-r--r-- | lampada/src/connection/read.rs | 60 | ||||
| -rw-r--r-- | lampada/src/connection/write.rs | 14 | ||||
| -rw-r--r-- | lampada/src/lib.rs | 7 | ||||
| -rw-r--r-- | stanza/src/client/mod.rs | 24 | ||||
| -rw-r--r-- | stanza/src/stream.rs | 4 | 
12 files changed, 369 insertions, 342 deletions
| diff --git a/filamento/examples/example.rs b/filamento/examples/example.rs index f2b787b..e36968d 100644 --- a/filamento/examples/example.rs +++ b/filamento/examples/example.rs @@ -7,7 +7,7 @@ use tracing::info;  #[tokio::main]  async fn main() {      tracing_subscriber::fmt::init(); -    let db = Db::create_connect_and_migrate(Path::new("./filamentoa.db")) +    let db = Db::create_connect_and_migrate(Path::new("./filamento.db"))          .await          .unwrap();      let (client, mut recv) = diff --git a/filamento/src/error.rs b/filamento/src/error.rs index 996a503..c5fdb03 100644 --- a/filamento/src/error.rs +++ b/filamento/src/error.rs @@ -16,7 +16,7 @@ pub enum Error {      // TODO: include content      // UnrecognizedContent(peanuts::element::Content),      #[error("iq receive error: {0}")] -    Iq(IqError), +    Iq(#[from] IqError),      // TODO: change to Connecting(ConnectingError)      #[error("connecting: {0}")]      Connecting(#[from] ConnectionJobError), @@ -32,7 +32,7 @@ pub enum Error {      #[error("message send error: {0}")]      MessageSend(MessageSendError),      #[error("message receive error: {0}")] -    MessageRecv(MessageRecvError), +    MessageRecv(#[from] MessageRecvError),  }  #[derive(Debug, Error, Clone)] @@ -86,7 +86,7 @@ pub enum RosterError {  #[derive(Debug, Error, Clone)]  #[error("database error: {0}")] -pub struct DatabaseError(Arc<sqlx::Error>); +pub struct DatabaseError(pub Arc<sqlx::Error>);  impl From<sqlx::Error> for DatabaseError {      fn from(e: sqlx::Error) -> Self { diff --git a/filamento/src/lib.rs b/filamento/src/lib.rs index 030dc43..b284c7e 100644 --- a/filamento/src/lib.rs +++ b/filamento/src/lib.rs @@ -98,6 +98,33 @@ pub enum Command {      /// chatroom). if disconnected, will be cached so when client connects, message will be sent.      SendMessage(JID, Body, oneshot::Sender<Result<(), WriteError>>),  } + +#[derive(Debug, Clone)] +pub enum UpdateMessage { +    Error(Error), +    Online(Online, Vec<Contact>), +    Offline(Offline), +    /// received roster from jabber server (replace full app roster state with this) +    /// is this needed? +    FullRoster(Vec<Contact>), +    /// (only update app roster state, don't replace) +    RosterUpdate(Contact), +    RosterDelete(JID), +    /// presences should be stored with users in the ui, not contacts, as presences can be received from anyone +    Presence { +        from: JID, +        presence: Presence, +    }, +    // TODO: receipts +    // MessageDispatched(Uuid), +    Message { +        to: JID, +        message: Message, +    }, +    SubscriptionRequest(jid::JID), +    Unsupported(Stanza), +} +  /// an xmpp client that is suited for a chat client use case  #[derive(Debug)]  pub struct Client { @@ -147,20 +174,24 @@ impl Client {          let (_sup_send, sup_recv) = oneshot::channel();          let sup_recv = sup_recv.fuse(); -        let logic = ClientLogic::new(db, Arc::new(Mutex::new(HashMap::new())), update_send); +        let client = Self { +            sender: command_sender, +            // TODO: configure timeout +            timeout: Duration::from_secs(10), +        }; + +        let logic = ClientLogic::new( +            client.clone(), +            db, +            Arc::new(Mutex::new(HashMap::new())), +            update_send, +        );          let actor: CoreClient<ClientLogic> =              CoreClient::new(jid, password, command_receiver, None, sup_recv, logic);          tokio::spawn(async move { actor.run().await }); -        ( -            Self { -                sender: command_sender, -                // TODO: configure timeout -                timeout: Duration::from_secs(10), -            }, -            update_recv, -        ) +        (client, update_recv)      }      pub async fn get_roster(&self) -> Result<Vec<Contact>, CommandError<RosterError>> { @@ -453,28 +484,3 @@ impl From<Command> for CoreClientCommand<Command> {          CoreClientCommand::Command(value)      }  } - -#[derive(Debug, Clone)] -pub enum UpdateMessage { -    Error(Error), -    Online(Online, Vec<Contact>), -    Offline(Offline), -    /// received roster from jabber server (replace full app roster state with this) -    /// is this needed? -    FullRoster(Vec<Contact>), -    /// (only update app roster state, don't replace) -    RosterUpdate(Contact), -    RosterDelete(JID), -    /// presences should be stored with users in the ui, not contacts, as presences can be received from anyone -    Presence { -        from: JID, -        presence: Presence, -    }, -    // TODO: receipts -    // MessageDispatched(Uuid), -    Message { -        to: JID, -        message: Message, -    }, -    SubscriptionRequest(jid::JID), -} diff --git a/filamento/src/logic/connection_error.rs b/filamento/src/logic/connection_error.rs index ac9e931..081900b 100644 --- a/filamento/src/logic/connection_error.rs +++ b/filamento/src/logic/connection_error.rs @@ -1,12 +1,7 @@  use lampada::error::ConnectionError; -use crate::UpdateMessage; -  use super::ClientLogic;  pub async fn handle_connection_error(logic: ClientLogic, error: ConnectionError) { -    logic -        .update_sender() -        .send(UpdateMessage::Error(error.into())) -        .await; +    logic.handle_error(error.into()).await;  } diff --git a/filamento/src/logic/mod.rs b/filamento/src/logic/mod.rs index 638f682..365a0df 100644 --- a/filamento/src/logic/mod.rs +++ b/filamento/src/logic/mod.rs @@ -3,8 +3,9 @@ use std::{collections::HashMap, sync::Arc};  use lampada::{Logic, error::ReadError};  use stanza::client::Stanza;  use tokio::sync::{Mutex, mpsc, oneshot}; +use tracing::{error, info, warn}; -use crate::{Command, UpdateMessage, db::Db}; +use crate::{Client, Command, UpdateMessage, db::Db, error::Error};  mod abort;  mod connect; @@ -16,6 +17,7 @@ mod process_stanza;  #[derive(Clone)]  pub struct ClientLogic { +    client: Client,      db: Db,      pending: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>,      update_sender: mpsc::Sender<UpdateMessage>, @@ -23,6 +25,7 @@ pub struct ClientLogic {  impl ClientLogic {      pub fn new( +        client: Client,          db: Db,          pending: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>,          update_sender: mpsc::Sender<UpdateMessage>, @@ -31,9 +34,14 @@ impl ClientLogic {              db,              pending,              update_sender, +            client,          }      } +    pub fn client(&self) -> &Client { +        &self.client +    } +      pub fn db(&self) -> &Db {          &self.db      } @@ -45,6 +53,23 @@ impl ClientLogic {      pub fn update_sender(&self) -> &mpsc::Sender<UpdateMessage> {          &self.update_sender      } + +    pub async fn handle_unsupported(&self, stanza: impl Into<Stanza>) { +        let stanza: Stanza = stanza.into(); +        warn!("received unsupported stanza: {:?}", stanza); +        self.handle_update(UpdateMessage::Unsupported(stanza)).await; +    } + +    pub async fn handle_update(&self, update: UpdateMessage) { +        // TODO: impl fmt +        info!("{:?}", update); +        self.update_sender().send(update).await; +    } + +    pub async fn handle_error(&self, e: Error) { +        error!("{}", e); +        self.handle_update(UpdateMessage::Error(e)).await; +    }  }  impl Logic for ClientLogic { @@ -63,13 +88,8 @@ impl Logic for ClientLogic {          disconnect::handle_disconnect(self, connection).await;      } -    async fn handle_stanza( -        self, -        stanza: ::stanza::client::Stanza, -        connection: lampada::Connected, -        supervisor: lampada::SupervisorSender, -    ) { -        process_stanza::handle_stanza(self, stanza, connection, supervisor).await; +    async fn handle_stanza(self, stanza: ::stanza::client::Stanza, connection: lampada::Connected) { +        process_stanza::handle_stanza(self, stanza, connection).await;      }      async fn handle_online(self, command: Self::Cmd, connection: lampada::Connected) { @@ -87,4 +107,8 @@ impl Logic for ClientLogic {      async fn handle_connection_error(self, error: lampada::error::ConnectionError) {          connection_error::handle_connection_error(self, error).await;      } + +    async fn handle_stream_error(self, stream_error: stanza::stream::Error) { +        self.handle_error(Error::Stream(stream_error)).await; +    }  } diff --git a/filamento/src/logic/process_stanza.rs b/filamento/src/logic/process_stanza.rs index 17738df..1a68936 100644 --- a/filamento/src/logic/process_stanza.rs +++ b/filamento/src/logic/process_stanza.rs @@ -2,263 +2,238 @@ use std::str::FromStr;  use chrono::Utc;  use lampada::{Connected, SupervisorSender}; -use stanza::client::Stanza; +use stanza::client::{Stanza, iq::Iq};  use uuid::Uuid;  use crate::{      UpdateMessage,      chat::{Body, Message}, -    error::{Error, IqError, MessageRecvError, PresenceError, RosterError}, +    error::{DatabaseError, Error, IqError, MessageRecvError, PresenceError, RosterError},      presence::{Offline, Online, Presence, PresenceType, Show},      roster::Contact,  };  use super::ClientLogic; -pub async fn handle_stanza( +pub async fn handle_stanza(logic: ClientLogic, stanza: Stanza, connection: Connected) { +    let result = process_stanza(logic.clone(), stanza, connection).await; +    match result { +        Ok(u) => match u { +            Some(UpdateMessage::Unsupported(stanza)) => logic.handle_unsupported(stanza).await, +            _ => { +                if let Some(u) = u { +                    logic.handle_update(u).await +                } +            } +        }, +        Err(e) => logic.handle_error(e).await, +    } +} + +pub async fn recv_message(      logic: ClientLogic, -    stanza: Stanza, -    connection: Connected, -    supervisor: SupervisorSender, -) { -    match stanza { -        Stanza::Message(stanza_message) => { -            if let Some(mut from) = stanza_message.from { -                // TODO: don't ignore delay from. xep says SHOULD send error if incorrect. -                let timestamp = stanza_message +    stanza_message: stanza::client::message::Message, +) -> Result<Option<UpdateMessage>, MessageRecvError> { +    if let Some(mut from) = stanza_message.from { +        // TODO: don't ignore delay from. xep says SHOULD send error if incorrect. +        let timestamp = stanza_message +            .delay +            .map(|delay| delay.stamp) +            .unwrap_or_else(|| Utc::now()); +        // TODO: group chat messages +        let mut message = Message { +            id: stanza_message +                .id +                // TODO: proper id storage +                .map(|id| Uuid::from_str(&id).unwrap_or_else(|_| Uuid::new_v4())) +                .unwrap_or_else(|| Uuid::new_v4()), +            from: from.clone(), +            timestamp, +            body: Body { +                // TODO: should this be an option? +                body: stanza_message +                    .body +                    .map(|body| body.body) +                    .unwrap_or_default() +                    .unwrap_or_default(), +            }, +        }; +        // TODO: can this be more efficient? +        logic +            .db() +            .create_message_with_user_resource_and_chat(message.clone(), from.clone()) +            .await +            .map_err(|e| DatabaseError(e.into()))?; +        message.from = message.from.as_bare(); +        from = from.as_bare(); +        Ok(Some(UpdateMessage::Message { to: from, message })) +    } else { +        Err(MessageRecvError::MissingFrom) +    } +} + +pub async fn recv_presence( +    presence: stanza::client::presence::Presence, +) -> Result<Option<UpdateMessage>, PresenceError> { +    if let Some(from) = presence.from { +        match presence.r#type { +            Some(r#type) => match r#type { +                // error processing a presence from somebody +                stanza::client::presence::PresenceType::Error => { +                    // TODO: is there any other information that should go with the error? also MUST have an error, otherwise it's a different error. maybe it shoulnd't be an option. +                    // TODO: ughhhhhhhhhhhhh these stanza errors should probably just have an option, and custom display +                    Err(PresenceError::StanzaError( +                        presence +                            .errors +                            .first() +                            .cloned() +                            .expect("error MUST have error"), +                    )) +                } +                // should not happen (error to server) +                stanza::client::presence::PresenceType::Probe => { +                    // TODO: should probably write an error and restart stream +                    Err(PresenceError::Unsupported) +                } +                stanza::client::presence::PresenceType::Subscribe => { +                    // may get a subscription request from somebody who is not a contact!!! therefore should be its own kind of event +                    Ok(Some(UpdateMessage::SubscriptionRequest(from))) +                } +                stanza::client::presence::PresenceType::Unavailable => { +                    let offline = Offline { +                        status: presence.status.map(|status| status.status.0), +                    }; +                    let timestamp = presence +                        .delay +                        .map(|delay| delay.stamp) +                        .unwrap_or_else(|| Utc::now()); +                    Ok(Some(UpdateMessage::Presence { +                        from, +                        presence: Presence { +                            timestamp, +                            presence: PresenceType::Offline(offline), +                        }, +                    })) +                } +                // for now, do nothing, as these are simply informational. will receive roster push from the server regarding the changes to do with them. +                stanza::client::presence::PresenceType::Subscribed => Ok(None), +                stanza::client::presence::PresenceType::Unsubscribe => Ok(None), +                stanza::client::presence::PresenceType::Unsubscribed => Ok(None), +            }, +            None => { +                let online = Online { +                    show: presence.show.map(|show| match show { +                        stanza::client::presence::Show::Away => Show::Away, +                        stanza::client::presence::Show::Chat => Show::Chat, +                        stanza::client::presence::Show::Dnd => Show::DoNotDisturb, +                        stanza::client::presence::Show::Xa => Show::ExtendedAway, +                    }), +                    status: presence.status.map(|status| status.status.0), +                    priority: presence.priority.map(|priority| priority.0), +                }; +                let timestamp = presence                      .delay                      .map(|delay| delay.stamp)                      .unwrap_or_else(|| Utc::now()); -                // TODO: group chat messages -                let mut message = Message { -                    id: stanza_message -                        .id -                        // TODO: proper id storage -                        .map(|id| Uuid::from_str(&id).unwrap_or_else(|_| Uuid::new_v4())) -                        .unwrap_or_else(|| Uuid::new_v4()), -                    from: from.clone(), -                    timestamp, -                    body: Body { -                        // TODO: should this be an option? -                        body: stanza_message -                            .body -                            .map(|body| body.body) -                            .unwrap_or_default() -                            .unwrap_or_default(), +                Ok(Some(UpdateMessage::Presence { +                    from, +                    presence: Presence { +                        timestamp, +                        presence: PresenceType::Online(online),                      }, -                }; -                // TODO: can this be more efficient? -                let result = logic -                    .db() -                    .create_message_with_user_resource_and_chat(message.clone(), from.clone()) -                    .await; -                if let Err(e) = result { -                    tracing::error!("messagecreate"); -                    let _ = logic -                        .update_sender() -                        .send(UpdateMessage::Error(Error::MessageRecv( -                            MessageRecvError::MessageHistory(e.into()), -                        ))) -                        .await; -                } -                message.from = message.from.as_bare(); -                from = from.as_bare(); -                let _ = logic -                    .update_sender() -                    .send(UpdateMessage::Message { to: from, message }) -                    .await; -            } else { -                let _ = logic -                    .update_sender() -                    .send(UpdateMessage::Error(Error::MessageRecv( -                        MessageRecvError::MissingFrom, -                    ))) -                    .await; +                }))              }          } -        Stanza::Presence(presence) => { -            if let Some(from) = presence.from { -                match presence.r#type { -                    Some(r#type) => match r#type { -                        // error processing a presence from somebody -                        stanza::client::presence::PresenceType::Error => { -                            // TODO: is there any other information that should go with the error? also MUST have an error, otherwise it's a different error. maybe it shoulnd't be an option. -                            let _ = logic -                                .update_sender() -                                .send(UpdateMessage::Error(Error::Presence( -                                    // TODO: ughhhhhhhhhhhhh these stanza errors should probably just have an option, and custom display -                                    PresenceError::StanzaError( -                                        presence -                                            .errors -                                            .first() -                                            .cloned() -                                            .expect("error MUST have error"), -                                    ), -                                ))) -                                .await; -                        } -                        // should not happen (error to server) -                        stanza::client::presence::PresenceType::Probe => { -                            // TODO: should probably write an error and restart stream -                            let _ = logic -                                .update_sender() -                                .send(UpdateMessage::Error(Error::Presence( -                                    PresenceError::Unsupported, -                                ))) -                                .await; -                        } -                        stanza::client::presence::PresenceType::Subscribe => { -                            // may get a subscription request from somebody who is not a contact!!! therefore should be its own kind of event -                            let _ = logic -                                .update_sender() -                                .send(UpdateMessage::SubscriptionRequest(from)) -                                .await; -                        } -                        stanza::client::presence::PresenceType::Unavailable => { -                            let offline = Offline { -                                status: presence.status.map(|status| status.status.0), -                            }; -                            let timestamp = presence -                                .delay -                                .map(|delay| delay.stamp) -                                .unwrap_or_else(|| Utc::now()); -                            let _ = logic -                                .update_sender() -                                .send(UpdateMessage::Presence { -                                    from, -                                    presence: Presence { -                                        timestamp, -                                        presence: PresenceType::Offline(offline), -                                    }, -                                }) -                                .await; -                        } -                        // for now, do nothing, as these are simply informational. will receive roster push from the server regarding the changes to do with them. -                        stanza::client::presence::PresenceType::Subscribed => {} -                        stanza::client::presence::PresenceType::Unsubscribe => {} -                        stanza::client::presence::PresenceType::Unsubscribed => {} -                    }, -                    None => { -                        let online = Online { -                            show: presence.show.map(|show| match show { -                                stanza::client::presence::Show::Away => Show::Away, -                                stanza::client::presence::Show::Chat => Show::Chat, -                                stanza::client::presence::Show::Dnd => Show::DoNotDisturb, -                                stanza::client::presence::Show::Xa => Show::ExtendedAway, -                            }), -                            status: presence.status.map(|status| status.status.0), -                            priority: presence.priority.map(|priority| priority.0), -                        }; -                        let timestamp = presence -                            .delay -                            .map(|delay| delay.stamp) -                            .unwrap_or_else(|| Utc::now()); -                        let _ = logic -                            .update_sender() -                            .send(UpdateMessage::Presence { -                                from, -                                presence: Presence { -                                    timestamp, -                                    presence: PresenceType::Online(online), -                                }, -                            }) -                            .await; -                    } -                } +    } else { +        Err(PresenceError::MissingFrom) +    } +} + +pub async fn recv_iq(logic: ClientLogic, iq: Iq) -> Result<Option<UpdateMessage>, IqError> { +    match iq.r#type { +        stanza::client::iq::IqType::Error | stanza::client::iq::IqType::Result => { +            let send; +            { +                send = logic.pending().lock().await.remove(&iq.id); +            } +            if let Some(send) = send { +                send.send(Ok(Stanza::Iq(iq))); +                Ok(None)              } else { -                let _ = logic -                    .update_sender() -                    .send(UpdateMessage::Error(Error::Presence( -                        PresenceError::MissingFrom, -                    ))) -                    .await; +                Err(IqError::NoMatchingId(iq.id))              }          } -        Stanza::Iq(iq) => match iq.r#type { -            stanza::client::iq::IqType::Error | stanza::client::iq::IqType::Result => { -                let send; -                { -                    send = logic.pending().lock().await.remove(&iq.id); -                } -                if let Some(send) = send { -                    send.send(Ok(Stanza::Iq(iq))); -                } else { -                    let _ = logic -                        .update_sender() -                        .send(UpdateMessage::Error(Error::Iq(IqError::NoMatchingId( -                            iq.id, -                        )))) -                        .await; -                } -            } -            // TODO: send unsupported to server -            // TODO: proper errors i am so tired please -            stanza::client::iq::IqType::Get => {} -            stanza::client::iq::IqType::Set => { -                if let Some(query) = iq.query { -                    match query { -                        stanza::client::iq::Query::Roster(mut query) => { -                            // TODO: there should only be one -                            if let Some(item) = query.items.pop() { -                                match item.subscription { -                                    Some(stanza::roster::Subscription::Remove) => { -                                        logic.db().delete_contact(item.jid.clone()).await; -                                        logic -                                            .update_sender() -                                            .send(UpdateMessage::RosterDelete(item.jid)) -                                            .await; -                                        // TODO: send result -                                    } -                                    _ => { -                                        let contact: Contact = item.into(); -                                        if let Err(e) = -                                            logic.db().upsert_contact(contact.clone()).await -                                        { -                                            let _ = logic -                                                .update_sender() -                                                .send(UpdateMessage::Error(Error::Roster( -                                                    RosterError::Cache(e.into()), -                                                ))) -                                                .await; -                                        } +        // TODO: send unsupported to server +        // TODO: proper errors i am so tired please +        stanza::client::iq::IqType::Get => Ok(None), +        stanza::client::iq::IqType::Set => { +            if let Some(query) = iq.query { +                match query { +                    stanza::client::iq::Query::Roster(mut query) => { +                        // TODO: there should only be one +                        if let Some(item) = query.items.pop() { +                            match item.subscription { +                                Some(stanza::roster::Subscription::Remove) => { +                                    logic.db().delete_contact(item.jid.clone()).await; +                                    Ok(Some(UpdateMessage::RosterDelete(item.jid))) +                                } +                                _ => { +                                    let contact: Contact = item.into(); +                                    if let Err(e) = logic.db().upsert_contact(contact.clone()).await +                                    {                                          let _ = logic                                              .update_sender() -                                            .send(UpdateMessage::RosterUpdate(contact)) +                                            .send(UpdateMessage::Error(Error::Roster( +                                                RosterError::Cache(e.into()), +                                            )))                                              .await; -                                        // TODO: send result -                                        // write_handle.write(Stanza::Iq(stanza::client::iq::Iq { -                                        //     from: , -                                        //     id: todo!(), -                                        //     to: todo!(), -                                        //     r#type: todo!(), -                                        //     lang: todo!(), -                                        //     query: todo!(), -                                        //     errors: todo!(), -                                        // }));                                      } +                                    Ok(Some(UpdateMessage::RosterUpdate(contact))) +                                    // TODO: send result +                                    // write_handle.write(Stanza::Iq(stanza::client::iq::Iq { +                                    //     from: , +                                    //     id: todo!(), +                                    //     to: todo!(), +                                    //     r#type: todo!(), +                                    //     lang: todo!(), +                                    //     query: todo!(), +                                    //     errors: todo!(), +                                    // }));                                  }                              } +                        } else { +                            Ok(None)                          } -                        // TODO: send unsupported to server -                        _ => {}                      } -                } else { -                    // TODO: send error (unsupported) to server +                    // TODO: send unsupported to server +                    _ => Ok(None),                  } +            } else { +                // TODO: send error (unsupported) to server +                Ok(None)              } -        }, +        } +    } +} + +pub async fn process_stanza( +    logic: ClientLogic, +    stanza: Stanza, +    connection: Connected, +) -> Result<Option<UpdateMessage>, Error> { +    let update = match stanza { +        Stanza::Message(stanza_message) => Ok(recv_message(logic, stanza_message).await?), +        Stanza::Presence(presence) => Ok(recv_presence(presence).await?), +        Stanza::Iq(iq) => Ok(recv_iq(logic, iq).await?), +        // unreachable, always caught by lampada +        // TODO: make cleaner          Stanza::Error(error) => { -            let _ = logic -                .update_sender() -                .send(UpdateMessage::Error(Error::Stream(error))) -                .await; -            // TODO: reconnect +            unreachable!()          }          Stanza::OtherContent(content) => { -            let _ = logic -                .update_sender() -                .send(UpdateMessage::Error(Error::UnrecognizedContent)); +            Err(Error::UnrecognizedContent)              // TODO: send error to write_thread          } -    } +    }; +    update  } diff --git a/lampada/src/connection/mod.rs b/lampada/src/connection/mod.rs index 1e767b0..ffaa7a7 100644 --- a/lampada/src/connection/mod.rs +++ b/lampada/src/connection/mod.rs @@ -10,7 +10,7 @@ use std::{  use jid::JID;  use luz::{connection::Tls, jabber_stream::bound_stream::BoundJabberStream};  use read::{ReadControl, ReadControlHandle, ReadState}; -use stanza::client::Stanza; +use stanza::{client::Stanza, stream_error::Error as StreamError};  use tokio::{      sync::{mpsc, oneshot, Mutex},      task::{JoinHandle, JoinSet}, @@ -28,7 +28,7 @@ pub(crate) mod write;  pub struct Supervisor<Lgc> {      command_recv: mpsc::Receiver<SupervisorCommand>, -    reader_crash: oneshot::Receiver<ReadState>, +    reader_crash: oneshot::Receiver<(Option<StreamError>, ReadState)>,      writer_crash: oneshot::Receiver<(WriteMessage, WriteState)>,      read_control_handle: ReadControlHandle,      write_control_handle: WriteControlHandle, @@ -43,18 +43,13 @@ pub enum SupervisorCommand {      Disconnect,      // for if there was a stream error, require to reconnect      // couldn't stream errors just cause a crash? lol -    Reconnect(ChildState), -} - -pub enum ChildState { -    Write(WriteState), -    Read(ReadState), +    Reconnect(ReadState),  }  impl<Lgc: Logic + Clone + Send + 'static> Supervisor<Lgc> {      fn new(          command_recv: mpsc::Receiver<SupervisorCommand>, -        reader_crash: oneshot::Receiver<ReadState>, +        reader_crash: oneshot::Receiver<(Option<StreamError>, ReadState)>,          writer_crash: oneshot::Receiver<(WriteMessage, WriteState)>,          read_control_handle: ReadControlHandle,          write_control_handle: WriteControlHandle, @@ -104,33 +99,19 @@ impl<Lgc: Logic + Clone + Send + 'static> Supervisor<Lgc> {                              break;                          },                          // TODO: Reconnect without aborting, gentle reconnect. +                        // the server sent a stream error                          SupervisorCommand::Reconnect(state) => {                              // TODO: please omfg                              // send abort to read stream, as already done, consider                              let (read_state, mut write_state); -                            match state { -                                ChildState::Write(receiver) => { -                                    write_state = receiver; -                                    let (send, recv) = oneshot::channel(); -                                    let _ = self.read_control_handle.send(ReadControl::Abort(send)).await; -                                    // TODO: need a tokio select, in case the state arrives from somewhere else -                                    if let Ok(state) = recv.await { -                                        read_state = state; -                                    } else { -                                        break -                                    } -                                }, -                                ChildState::Read(read) => { -                                    read_state = read; -                                    let (send, recv) = oneshot::channel(); -                                    let _ = self.write_control_handle.send(WriteControl::Abort(send)).await; -                                    // TODO: need a tokio select, in case the state arrives from somewhere else -                                    if let Ok(state) = recv.await { -                                        write_state = state; -                                    } else { -                                        break -                                    } -                                }, +                            read_state = state; +                            let (send, recv) = oneshot::channel(); +                            let _ = self.write_control_handle.send(WriteControl::Abort(None, send)).await; +                            // TODO: need a tokio select, in case the state arrives from somewhere else +                            if let Ok(state) = recv.await { +                                write_state = state; +                            } else { +                                break                              }                              let mut jid = self.connected.jid.clone(); @@ -175,7 +156,8 @@ impl<Lgc: Logic + Clone + Send + 'static> Supervisor<Lgc> {                      let _ = self.read_control_handle.send(ReadControl::Abort(send)).await;                      let read_state = tokio::select! {                          Ok(s) = recv => s, -                        Ok(s) = &mut self.reader_crash => s, +                        // TODO: is this okay +                        Ok(s) = &mut self.reader_crash => s.1,                          // in case, just break as irrecoverable                          else => break,                      }; @@ -215,9 +197,9 @@ impl<Lgc: Logic + Clone + Send + 'static> Supervisor<Lgc> {                          },                      }                  }, -                Ok(read_state) = &mut self.reader_crash => { +                Ok((stream_error, read_state)) = &mut self.reader_crash => {                      let (send, recv) = oneshot::channel(); -                    let _ = self.write_control_handle.send(WriteControl::Abort(send)).await; +                    let _ = self.write_control_handle.send(WriteControl::Abort(stream_error, send)).await;                      let (retry_msg, mut write_state) = tokio::select! {                          Ok(s) = recv => (None, s),                          Ok(s) = &mut self.writer_crash => (Some(s.0), s.1), diff --git a/lampada/src/connection/read.rs b/lampada/src/connection/read.rs index cc69387..640ca8e 100644 --- a/lampada/src/connection/read.rs +++ b/lampada/src/connection/read.rs @@ -9,13 +9,15 @@ use std::{  use luz::{connection::Tls, jabber_stream::bound_stream::BoundJabberReader};  use stanza::client::Stanza; +use stanza::stream::Error as StreamErrorStanza; +use stanza::stream_error::Error as StreamError;  use tokio::{      sync::{mpsc, oneshot, Mutex},      task::{JoinHandle, JoinSet},  };  use tracing::info; -use crate::{Connected, Logic}; +use crate::{Connected, Logic, WriteMessage};  use super::{write::WriteHandle, SupervisorCommand, SupervisorSender}; @@ -36,7 +38,7 @@ pub struct Read<Lgc> {      // control stuff      control_receiver: mpsc::Receiver<ReadControl>, -    on_crash: oneshot::Sender<ReadState>, +    on_crash: oneshot::Sender<(Option<StreamError>, ReadState)>,  }  /// when a crash/abort occurs, this gets sent back to the supervisor, so that the connection session can continue @@ -54,7 +56,7 @@ impl<Lgc> Read<Lgc> {          logic: Lgc,          supervisor_control: SupervisorSender,          control_receiver: mpsc::Receiver<ReadControl>, -        on_crash: oneshot::Sender<ReadState>, +        on_crash: oneshot::Sender<(Option<StreamError>, ReadState)>,      ) -> Self {          let (_send, recv) = oneshot::channel();          Self { @@ -106,34 +108,40 @@ impl<Lgc: Clone + Logic + Send + 'static> Read<Lgc> {                      println!("read stanza");                      match s {                          Ok(s) => { -                            self.tasks.spawn(self.logic.clone().handle_stanza(s, self.connected.clone(), self.supervisor_control.clone())); +                            match s { +                                Stanza::Error(error) => { +                                    self.logic.clone().handle_stream_error(error).await; +                                    self.supervisor_control.send(SupervisorCommand::Reconnect(ReadState { supervisor_control: self.supervisor_control.clone(), tasks: self.tasks })).await; +                                    break; +                                }, +                                _ => { +                                    self.tasks.spawn(self.logic.clone().handle_stanza(s, self.connected.clone())); +                                } +                            };                          },                          Err(e) => {                              println!("error: {:?}", e); -                            // TODO: NEXT write the correct error stanza depending on error, decide whether to reconnect or properly disconnect, depending on if disconnecting is true -                            // match e { -                            //     peanuts::Error::ReadError(error) => todo!(), -                            //     peanuts::Error::Utf8Error(utf8_error) => todo!(), -                            //     peanuts::Error::ParseError(_) => todo!(), -                            //     peanuts::Error::EntityProcessError(_) => todo!(), -                            //     peanuts::Error::InvalidCharRef(_) => todo!(), -                            //     peanuts::Error::DuplicateNameSpaceDeclaration(namespace_declaration) => todo!(), -                            //     peanuts::Error::DuplicateAttribute(_) => todo!(), -                            //     peanuts::Error::UnqualifiedNamespace(_) => todo!(), -                            //     peanuts::Error::MismatchedEndTag(name, name1) => todo!(), -                            //     peanuts::Error::NotInElement(_) => todo!(), -                            //     peanuts::Error::ExtraData(_) => todo!(), -                            //     peanuts::Error::UndeclaredNamespace(_) => todo!(), -                            //     peanuts::Error::IncorrectName(name) => todo!(), -                            //     peanuts::Error::DeserializeError(_) => todo!(), -                            //     peanuts::Error::Deserialize(deserialize_error) => todo!(), -                            //     peanuts::Error::RootElementEnded => todo!(), -                            // }                              // TODO: make sure this only happens when an end tag is received                              if self.disconnecting == true {                                  break;                              } else { -                                let _ = self.on_crash.send(ReadState { supervisor_control: self.supervisor_control, tasks: self.tasks }); +                                let stream_error = match e { +                                    peanuts::Error::ReadError(error) => None, +                                    peanuts::Error::Utf8Error(utf8_error) => Some(StreamError::UnsupportedEncoding), +                                    peanuts::Error::ParseError(_) => Some(StreamError::BadFormat), +                                    peanuts::Error::EntityProcessError(_) => Some(StreamError::RestrictedXml), +                                    peanuts::Error::InvalidCharRef(char_ref_error) => Some(StreamError::UnsupportedEncoding), +                                    peanuts::Error::DuplicateNameSpaceDeclaration(namespace_declaration) => Some(StreamError::NotWellFormed), +                                    peanuts::Error::DuplicateAttribute(_) => Some(StreamError::NotWellFormed), +                                    peanuts::Error::MismatchedEndTag(name, name1) => Some(StreamError::NotWellFormed), +                                    peanuts::Error::NotInElement(_) => Some(StreamError::InvalidXml), +                                    peanuts::Error::ExtraData(_) => None, +                                    peanuts::Error::UndeclaredNamespace(_) => Some(StreamError::InvalidNamespace), +                                    peanuts::Error::Deserialize(deserialize_error) => Some(StreamError::InvalidXml), +                                    peanuts::Error::RootElementEnded => Some(StreamError::InvalidXml), +                                }; + +                                let _ = self.on_crash.send((stream_error, ReadState { supervisor_control: self.supervisor_control, tasks: self.tasks }));                              }                              break;                          }, @@ -183,7 +191,7 @@ impl ReadControlHandle {          connected: Connected,          logic: Lgc,          supervisor_control: SupervisorSender, -        on_crash: oneshot::Sender<ReadState>, +        on_crash: oneshot::Sender<(Option<StreamError>, ReadState)>,      ) -> Self {          let (control_sender, control_receiver) = mpsc::channel(20); @@ -210,7 +218,7 @@ impl ReadControlHandle {          connected: Connected,          logic: Lgc,          supervisor_control: SupervisorSender, -        on_crash: oneshot::Sender<ReadState>, +        on_crash: oneshot::Sender<(Option<StreamError>, ReadState)>,      ) -> Self {          let (control_sender, control_receiver) = mpsc::channel(20); diff --git a/lampada/src/connection/write.rs b/lampada/src/connection/write.rs index 8f0c34b..1070cdf 100644 --- a/lampada/src/connection/write.rs +++ b/lampada/src/connection/write.rs @@ -1,7 +1,9 @@  use std::ops::{Deref, DerefMut};  use luz::{connection::Tls, jabber_stream::bound_stream::BoundJabberWriter}; -use stanza::client::Stanza; +use stanza::{ +    client::Stanza, stream::Error as StreamErrorStanza, stream_error::Error as StreamError, +};  use tokio::{      sync::{mpsc, oneshot},      task::JoinHandle, @@ -34,7 +36,7 @@ pub struct WriteMessage {  pub enum WriteControl {      Disconnect, -    Abort(oneshot::Sender<WriteState>), +    Abort(Option<StreamError>, oneshot::Sender<WriteState>),  }  impl Write { @@ -119,7 +121,13 @@ impl Write {                              break;                          },                          // in case of abort, stream is already fucked, just send the receiver ready for a reconnection at the same resource -                        WriteControl::Abort(sender) => { +                        WriteControl::Abort(error, sender) => { +                            // write stream error message for server if there is one +                            if let Some(error) = error { +                                // TODO: timeouts for writing to stream +                                let _ = self.stream.write(&Stanza::Error(StreamErrorStanza { error, text: None })).await; +                                // don't care about result, if it sends it sends, otherwise stream is restarting anyway +                            }                              let _ = sender.send(WriteState { stanza_recv: self.stanza_receiver });                              break;                          }, diff --git a/lampada/src/lib.rs b/lampada/src/lib.rs index c61c596..a01ba06 100644 --- a/lampada/src/lib.rs +++ b/lampada/src/lib.rs @@ -15,6 +15,7 @@ use stanza::client::{      iq::{self, Iq, IqType},      Stanza,  }; +use stanza::stream::Error as StreamError;  use tokio::{      sync::{mpsc, oneshot, Mutex},      task::JoinSet, @@ -59,12 +60,16 @@ pub trait Logic {          connection: Connected,      ) -> impl std::future::Future<Output = ()> + Send; +    fn handle_stream_error( +        self, +        stream_error: StreamError, +    ) -> impl std::future::Future<Output = ()> + Send; +      /// run to handle an incoming xmpp stanza      fn handle_stanza(          self,          stanza: Stanza,          connection: Connected, -        supervisor: SupervisorSender,      ) -> impl std::future::Future<Output = ()> + std::marker::Send;      /// run to handle a command message when a connection is currently established diff --git a/stanza/src/client/mod.rs b/stanza/src/client/mod.rs index 3e690a7..a1b2de5 100644 --- a/stanza/src/client/mod.rs +++ b/stanza/src/client/mod.rs @@ -61,3 +61,27 @@ impl IntoContent for Stanza {          }      }  } + +impl From<Message> for Stanza { +    fn from(value: Message) -> Self { +        Stanza::Message(value) +    } +} + +impl From<Presence> for Stanza { +    fn from(value: Presence) -> Self { +        Stanza::Presence(value) +    } +} + +impl From<Iq> for Stanza { +    fn from(value: Iq) -> Self { +        Stanza::Iq(value) +    } +} + +impl From<StreamError> for Stanza { +    fn from(value: StreamError) -> Self { +        Stanza::Error(value) +    } +} diff --git a/stanza/src/stream.rs b/stanza/src/stream.rs index 8e1982f..732a826 100644 --- a/stanza/src/stream.rs +++ b/stanza/src/stream.rs @@ -181,8 +181,8 @@ impl FromElement for Feature {  #[derive(Error, Debug, Clone)]  pub struct Error { -    error: StreamError, -    text: Option<Text>, +    pub error: StreamError, +    pub text: Option<Text>,  }  impl Display for Error { | 
