diff options
| author | 2025-03-26 19:13:10 +0000 | |
|---|---|---|
| committer | 2025-03-26 19:13:10 +0000 | |
| commit | 8c239e5c7a49cff350104b09cbb74d862c2ec420 (patch) | |
| tree | 4b392f1ffa6b91fadf68b4a7f67ad5f901fbeda4 /filamento/src/logic | |
| parent | 410fe3af16be5985c868b00908b8ddf4ed6e469d (diff) | |
| download | luz-8c239e5c7a49cff350104b09cbb74d862c2ec420.tar.gz luz-8c239e5c7a49cff350104b09cbb74d862c2ec420.tar.bz2 luz-8c239e5c7a49cff350104b09cbb74d862c2ec420.zip  | |
feat: stream error handling
Diffstat (limited to '')
| -rw-r--r-- | filamento/src/logic/connection_error.rs | 7 | ||||
| -rw-r--r-- | filamento/src/logic/mod.rs | 40 | ||||
| -rw-r--r-- | filamento/src/logic/process_stanza.rs | 421 | 
3 files changed, 231 insertions, 237 deletions
diff --git a/filamento/src/logic/connection_error.rs b/filamento/src/logic/connection_error.rs index ac9e931..081900b 100644 --- a/filamento/src/logic/connection_error.rs +++ b/filamento/src/logic/connection_error.rs @@ -1,12 +1,7 @@  use lampada::error::ConnectionError; -use crate::UpdateMessage; -  use super::ClientLogic;  pub async fn handle_connection_error(logic: ClientLogic, error: ConnectionError) { -    logic -        .update_sender() -        .send(UpdateMessage::Error(error.into())) -        .await; +    logic.handle_error(error.into()).await;  } diff --git a/filamento/src/logic/mod.rs b/filamento/src/logic/mod.rs index 638f682..365a0df 100644 --- a/filamento/src/logic/mod.rs +++ b/filamento/src/logic/mod.rs @@ -3,8 +3,9 @@ use std::{collections::HashMap, sync::Arc};  use lampada::{Logic, error::ReadError};  use stanza::client::Stanza;  use tokio::sync::{Mutex, mpsc, oneshot}; +use tracing::{error, info, warn}; -use crate::{Command, UpdateMessage, db::Db}; +use crate::{Client, Command, UpdateMessage, db::Db, error::Error};  mod abort;  mod connect; @@ -16,6 +17,7 @@ mod process_stanza;  #[derive(Clone)]  pub struct ClientLogic { +    client: Client,      db: Db,      pending: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>,      update_sender: mpsc::Sender<UpdateMessage>, @@ -23,6 +25,7 @@ pub struct ClientLogic {  impl ClientLogic {      pub fn new( +        client: Client,          db: Db,          pending: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>,          update_sender: mpsc::Sender<UpdateMessage>, @@ -31,9 +34,14 @@ impl ClientLogic {              db,              pending,              update_sender, +            client,          }      } +    pub fn client(&self) -> &Client { +        &self.client +    } +      pub fn db(&self) -> &Db {          &self.db      } @@ -45,6 +53,23 @@ impl ClientLogic {      pub fn update_sender(&self) -> &mpsc::Sender<UpdateMessage> {          &self.update_sender      } + +    pub async fn handle_unsupported(&self, stanza: impl Into<Stanza>) { +        let stanza: Stanza = stanza.into(); +        warn!("received unsupported stanza: {:?}", stanza); +        self.handle_update(UpdateMessage::Unsupported(stanza)).await; +    } + +    pub async fn handle_update(&self, update: UpdateMessage) { +        // TODO: impl fmt +        info!("{:?}", update); +        self.update_sender().send(update).await; +    } + +    pub async fn handle_error(&self, e: Error) { +        error!("{}", e); +        self.handle_update(UpdateMessage::Error(e)).await; +    }  }  impl Logic for ClientLogic { @@ -63,13 +88,8 @@ impl Logic for ClientLogic {          disconnect::handle_disconnect(self, connection).await;      } -    async fn handle_stanza( -        self, -        stanza: ::stanza::client::Stanza, -        connection: lampada::Connected, -        supervisor: lampada::SupervisorSender, -    ) { -        process_stanza::handle_stanza(self, stanza, connection, supervisor).await; +    async fn handle_stanza(self, stanza: ::stanza::client::Stanza, connection: lampada::Connected) { +        process_stanza::handle_stanza(self, stanza, connection).await;      }      async fn handle_online(self, command: Self::Cmd, connection: lampada::Connected) { @@ -87,4 +107,8 @@ impl Logic for ClientLogic {      async fn handle_connection_error(self, error: lampada::error::ConnectionError) {          connection_error::handle_connection_error(self, error).await;      } + +    async fn handle_stream_error(self, stream_error: stanza::stream::Error) { +        self.handle_error(Error::Stream(stream_error)).await; +    }  } diff --git a/filamento/src/logic/process_stanza.rs b/filamento/src/logic/process_stanza.rs index 17738df..1a68936 100644 --- a/filamento/src/logic/process_stanza.rs +++ b/filamento/src/logic/process_stanza.rs @@ -2,263 +2,238 @@ use std::str::FromStr;  use chrono::Utc;  use lampada::{Connected, SupervisorSender}; -use stanza::client::Stanza; +use stanza::client::{Stanza, iq::Iq};  use uuid::Uuid;  use crate::{      UpdateMessage,      chat::{Body, Message}, -    error::{Error, IqError, MessageRecvError, PresenceError, RosterError}, +    error::{DatabaseError, Error, IqError, MessageRecvError, PresenceError, RosterError},      presence::{Offline, Online, Presence, PresenceType, Show},      roster::Contact,  };  use super::ClientLogic; -pub async fn handle_stanza( +pub async fn handle_stanza(logic: ClientLogic, stanza: Stanza, connection: Connected) { +    let result = process_stanza(logic.clone(), stanza, connection).await; +    match result { +        Ok(u) => match u { +            Some(UpdateMessage::Unsupported(stanza)) => logic.handle_unsupported(stanza).await, +            _ => { +                if let Some(u) = u { +                    logic.handle_update(u).await +                } +            } +        }, +        Err(e) => logic.handle_error(e).await, +    } +} + +pub async fn recv_message(      logic: ClientLogic, -    stanza: Stanza, -    connection: Connected, -    supervisor: SupervisorSender, -) { -    match stanza { -        Stanza::Message(stanza_message) => { -            if let Some(mut from) = stanza_message.from { -                // TODO: don't ignore delay from. xep says SHOULD send error if incorrect. -                let timestamp = stanza_message +    stanza_message: stanza::client::message::Message, +) -> Result<Option<UpdateMessage>, MessageRecvError> { +    if let Some(mut from) = stanza_message.from { +        // TODO: don't ignore delay from. xep says SHOULD send error if incorrect. +        let timestamp = stanza_message +            .delay +            .map(|delay| delay.stamp) +            .unwrap_or_else(|| Utc::now()); +        // TODO: group chat messages +        let mut message = Message { +            id: stanza_message +                .id +                // TODO: proper id storage +                .map(|id| Uuid::from_str(&id).unwrap_or_else(|_| Uuid::new_v4())) +                .unwrap_or_else(|| Uuid::new_v4()), +            from: from.clone(), +            timestamp, +            body: Body { +                // TODO: should this be an option? +                body: stanza_message +                    .body +                    .map(|body| body.body) +                    .unwrap_or_default() +                    .unwrap_or_default(), +            }, +        }; +        // TODO: can this be more efficient? +        logic +            .db() +            .create_message_with_user_resource_and_chat(message.clone(), from.clone()) +            .await +            .map_err(|e| DatabaseError(e.into()))?; +        message.from = message.from.as_bare(); +        from = from.as_bare(); +        Ok(Some(UpdateMessage::Message { to: from, message })) +    } else { +        Err(MessageRecvError::MissingFrom) +    } +} + +pub async fn recv_presence( +    presence: stanza::client::presence::Presence, +) -> Result<Option<UpdateMessage>, PresenceError> { +    if let Some(from) = presence.from { +        match presence.r#type { +            Some(r#type) => match r#type { +                // error processing a presence from somebody +                stanza::client::presence::PresenceType::Error => { +                    // TODO: is there any other information that should go with the error? also MUST have an error, otherwise it's a different error. maybe it shoulnd't be an option. +                    // TODO: ughhhhhhhhhhhhh these stanza errors should probably just have an option, and custom display +                    Err(PresenceError::StanzaError( +                        presence +                            .errors +                            .first() +                            .cloned() +                            .expect("error MUST have error"), +                    )) +                } +                // should not happen (error to server) +                stanza::client::presence::PresenceType::Probe => { +                    // TODO: should probably write an error and restart stream +                    Err(PresenceError::Unsupported) +                } +                stanza::client::presence::PresenceType::Subscribe => { +                    // may get a subscription request from somebody who is not a contact!!! therefore should be its own kind of event +                    Ok(Some(UpdateMessage::SubscriptionRequest(from))) +                } +                stanza::client::presence::PresenceType::Unavailable => { +                    let offline = Offline { +                        status: presence.status.map(|status| status.status.0), +                    }; +                    let timestamp = presence +                        .delay +                        .map(|delay| delay.stamp) +                        .unwrap_or_else(|| Utc::now()); +                    Ok(Some(UpdateMessage::Presence { +                        from, +                        presence: Presence { +                            timestamp, +                            presence: PresenceType::Offline(offline), +                        }, +                    })) +                } +                // for now, do nothing, as these are simply informational. will receive roster push from the server regarding the changes to do with them. +                stanza::client::presence::PresenceType::Subscribed => Ok(None), +                stanza::client::presence::PresenceType::Unsubscribe => Ok(None), +                stanza::client::presence::PresenceType::Unsubscribed => Ok(None), +            }, +            None => { +                let online = Online { +                    show: presence.show.map(|show| match show { +                        stanza::client::presence::Show::Away => Show::Away, +                        stanza::client::presence::Show::Chat => Show::Chat, +                        stanza::client::presence::Show::Dnd => Show::DoNotDisturb, +                        stanza::client::presence::Show::Xa => Show::ExtendedAway, +                    }), +                    status: presence.status.map(|status| status.status.0), +                    priority: presence.priority.map(|priority| priority.0), +                }; +                let timestamp = presence                      .delay                      .map(|delay| delay.stamp)                      .unwrap_or_else(|| Utc::now()); -                // TODO: group chat messages -                let mut message = Message { -                    id: stanza_message -                        .id -                        // TODO: proper id storage -                        .map(|id| Uuid::from_str(&id).unwrap_or_else(|_| Uuid::new_v4())) -                        .unwrap_or_else(|| Uuid::new_v4()), -                    from: from.clone(), -                    timestamp, -                    body: Body { -                        // TODO: should this be an option? -                        body: stanza_message -                            .body -                            .map(|body| body.body) -                            .unwrap_or_default() -                            .unwrap_or_default(), +                Ok(Some(UpdateMessage::Presence { +                    from, +                    presence: Presence { +                        timestamp, +                        presence: PresenceType::Online(online),                      }, -                }; -                // TODO: can this be more efficient? -                let result = logic -                    .db() -                    .create_message_with_user_resource_and_chat(message.clone(), from.clone()) -                    .await; -                if let Err(e) = result { -                    tracing::error!("messagecreate"); -                    let _ = logic -                        .update_sender() -                        .send(UpdateMessage::Error(Error::MessageRecv( -                            MessageRecvError::MessageHistory(e.into()), -                        ))) -                        .await; -                } -                message.from = message.from.as_bare(); -                from = from.as_bare(); -                let _ = logic -                    .update_sender() -                    .send(UpdateMessage::Message { to: from, message }) -                    .await; -            } else { -                let _ = logic -                    .update_sender() -                    .send(UpdateMessage::Error(Error::MessageRecv( -                        MessageRecvError::MissingFrom, -                    ))) -                    .await; +                }))              }          } -        Stanza::Presence(presence) => { -            if let Some(from) = presence.from { -                match presence.r#type { -                    Some(r#type) => match r#type { -                        // error processing a presence from somebody -                        stanza::client::presence::PresenceType::Error => { -                            // TODO: is there any other information that should go with the error? also MUST have an error, otherwise it's a different error. maybe it shoulnd't be an option. -                            let _ = logic -                                .update_sender() -                                .send(UpdateMessage::Error(Error::Presence( -                                    // TODO: ughhhhhhhhhhhhh these stanza errors should probably just have an option, and custom display -                                    PresenceError::StanzaError( -                                        presence -                                            .errors -                                            .first() -                                            .cloned() -                                            .expect("error MUST have error"), -                                    ), -                                ))) -                                .await; -                        } -                        // should not happen (error to server) -                        stanza::client::presence::PresenceType::Probe => { -                            // TODO: should probably write an error and restart stream -                            let _ = logic -                                .update_sender() -                                .send(UpdateMessage::Error(Error::Presence( -                                    PresenceError::Unsupported, -                                ))) -                                .await; -                        } -                        stanza::client::presence::PresenceType::Subscribe => { -                            // may get a subscription request from somebody who is not a contact!!! therefore should be its own kind of event -                            let _ = logic -                                .update_sender() -                                .send(UpdateMessage::SubscriptionRequest(from)) -                                .await; -                        } -                        stanza::client::presence::PresenceType::Unavailable => { -                            let offline = Offline { -                                status: presence.status.map(|status| status.status.0), -                            }; -                            let timestamp = presence -                                .delay -                                .map(|delay| delay.stamp) -                                .unwrap_or_else(|| Utc::now()); -                            let _ = logic -                                .update_sender() -                                .send(UpdateMessage::Presence { -                                    from, -                                    presence: Presence { -                                        timestamp, -                                        presence: PresenceType::Offline(offline), -                                    }, -                                }) -                                .await; -                        } -                        // for now, do nothing, as these are simply informational. will receive roster push from the server regarding the changes to do with them. -                        stanza::client::presence::PresenceType::Subscribed => {} -                        stanza::client::presence::PresenceType::Unsubscribe => {} -                        stanza::client::presence::PresenceType::Unsubscribed => {} -                    }, -                    None => { -                        let online = Online { -                            show: presence.show.map(|show| match show { -                                stanza::client::presence::Show::Away => Show::Away, -                                stanza::client::presence::Show::Chat => Show::Chat, -                                stanza::client::presence::Show::Dnd => Show::DoNotDisturb, -                                stanza::client::presence::Show::Xa => Show::ExtendedAway, -                            }), -                            status: presence.status.map(|status| status.status.0), -                            priority: presence.priority.map(|priority| priority.0), -                        }; -                        let timestamp = presence -                            .delay -                            .map(|delay| delay.stamp) -                            .unwrap_or_else(|| Utc::now()); -                        let _ = logic -                            .update_sender() -                            .send(UpdateMessage::Presence { -                                from, -                                presence: Presence { -                                    timestamp, -                                    presence: PresenceType::Online(online), -                                }, -                            }) -                            .await; -                    } -                } +    } else { +        Err(PresenceError::MissingFrom) +    } +} + +pub async fn recv_iq(logic: ClientLogic, iq: Iq) -> Result<Option<UpdateMessage>, IqError> { +    match iq.r#type { +        stanza::client::iq::IqType::Error | stanza::client::iq::IqType::Result => { +            let send; +            { +                send = logic.pending().lock().await.remove(&iq.id); +            } +            if let Some(send) = send { +                send.send(Ok(Stanza::Iq(iq))); +                Ok(None)              } else { -                let _ = logic -                    .update_sender() -                    .send(UpdateMessage::Error(Error::Presence( -                        PresenceError::MissingFrom, -                    ))) -                    .await; +                Err(IqError::NoMatchingId(iq.id))              }          } -        Stanza::Iq(iq) => match iq.r#type { -            stanza::client::iq::IqType::Error | stanza::client::iq::IqType::Result => { -                let send; -                { -                    send = logic.pending().lock().await.remove(&iq.id); -                } -                if let Some(send) = send { -                    send.send(Ok(Stanza::Iq(iq))); -                } else { -                    let _ = logic -                        .update_sender() -                        .send(UpdateMessage::Error(Error::Iq(IqError::NoMatchingId( -                            iq.id, -                        )))) -                        .await; -                } -            } -            // TODO: send unsupported to server -            // TODO: proper errors i am so tired please -            stanza::client::iq::IqType::Get => {} -            stanza::client::iq::IqType::Set => { -                if let Some(query) = iq.query { -                    match query { -                        stanza::client::iq::Query::Roster(mut query) => { -                            // TODO: there should only be one -                            if let Some(item) = query.items.pop() { -                                match item.subscription { -                                    Some(stanza::roster::Subscription::Remove) => { -                                        logic.db().delete_contact(item.jid.clone()).await; -                                        logic -                                            .update_sender() -                                            .send(UpdateMessage::RosterDelete(item.jid)) -                                            .await; -                                        // TODO: send result -                                    } -                                    _ => { -                                        let contact: Contact = item.into(); -                                        if let Err(e) = -                                            logic.db().upsert_contact(contact.clone()).await -                                        { -                                            let _ = logic -                                                .update_sender() -                                                .send(UpdateMessage::Error(Error::Roster( -                                                    RosterError::Cache(e.into()), -                                                ))) -                                                .await; -                                        } +        // TODO: send unsupported to server +        // TODO: proper errors i am so tired please +        stanza::client::iq::IqType::Get => Ok(None), +        stanza::client::iq::IqType::Set => { +            if let Some(query) = iq.query { +                match query { +                    stanza::client::iq::Query::Roster(mut query) => { +                        // TODO: there should only be one +                        if let Some(item) = query.items.pop() { +                            match item.subscription { +                                Some(stanza::roster::Subscription::Remove) => { +                                    logic.db().delete_contact(item.jid.clone()).await; +                                    Ok(Some(UpdateMessage::RosterDelete(item.jid))) +                                } +                                _ => { +                                    let contact: Contact = item.into(); +                                    if let Err(e) = logic.db().upsert_contact(contact.clone()).await +                                    {                                          let _ = logic                                              .update_sender() -                                            .send(UpdateMessage::RosterUpdate(contact)) +                                            .send(UpdateMessage::Error(Error::Roster( +                                                RosterError::Cache(e.into()), +                                            )))                                              .await; -                                        // TODO: send result -                                        // write_handle.write(Stanza::Iq(stanza::client::iq::Iq { -                                        //     from: , -                                        //     id: todo!(), -                                        //     to: todo!(), -                                        //     r#type: todo!(), -                                        //     lang: todo!(), -                                        //     query: todo!(), -                                        //     errors: todo!(), -                                        // }));                                      } +                                    Ok(Some(UpdateMessage::RosterUpdate(contact))) +                                    // TODO: send result +                                    // write_handle.write(Stanza::Iq(stanza::client::iq::Iq { +                                    //     from: , +                                    //     id: todo!(), +                                    //     to: todo!(), +                                    //     r#type: todo!(), +                                    //     lang: todo!(), +                                    //     query: todo!(), +                                    //     errors: todo!(), +                                    // }));                                  }                              } +                        } else { +                            Ok(None)                          } -                        // TODO: send unsupported to server -                        _ => {}                      } -                } else { -                    // TODO: send error (unsupported) to server +                    // TODO: send unsupported to server +                    _ => Ok(None),                  } +            } else { +                // TODO: send error (unsupported) to server +                Ok(None)              } -        }, +        } +    } +} + +pub async fn process_stanza( +    logic: ClientLogic, +    stanza: Stanza, +    connection: Connected, +) -> Result<Option<UpdateMessage>, Error> { +    let update = match stanza { +        Stanza::Message(stanza_message) => Ok(recv_message(logic, stanza_message).await?), +        Stanza::Presence(presence) => Ok(recv_presence(presence).await?), +        Stanza::Iq(iq) => Ok(recv_iq(logic, iq).await?), +        // unreachable, always caught by lampada +        // TODO: make cleaner          Stanza::Error(error) => { -            let _ = logic -                .update_sender() -                .send(UpdateMessage::Error(Error::Stream(error))) -                .await; -            // TODO: reconnect +            unreachable!()          }          Stanza::OtherContent(content) => { -            let _ = logic -                .update_sender() -                .send(UpdateMessage::Error(Error::UnrecognizedContent)); +            Err(Error::UnrecognizedContent)              // TODO: send error to write_thread          } -    } +    }; +    update  }  | 
