diff options
author | 2025-03-27 14:26:27 +0000 | |
---|---|---|
committer | 2025-03-27 14:26:27 +0000 | |
commit | e703284539384b461d204c73e7e14daead3f06d9 (patch) | |
tree | 3a49a5ed8b41e78a45dce91ad7723f4d6ebab26d | |
parent | 8c239e5c7a49cff350104b09cbb74d862c2ec420 (diff) | |
download | luz-e703284539384b461d204c73e7e14daead3f06d9.tar.gz luz-e703284539384b461d204c73e7e14daead3f06d9.tar.bz2 luz-e703284539384b461d204c73e7e14daead3f06d9.zip |
feat: disco info from server
-rw-r--r-- | filamento/Cargo.toml | 2 | ||||
-rw-r--r-- | filamento/src/error.rs | 4 | ||||
-rw-r--r-- | filamento/src/logic/mod.rs | 1 | ||||
-rw-r--r-- | filamento/src/logic/process_stanza.rs | 191 | ||||
-rw-r--r-- | lampada/src/connection/mod.rs | 2 | ||||
-rw-r--r-- | lampada/src/error.rs | 2 | ||||
-rw-r--r-- | lampada/src/lib.rs | 20 | ||||
-rw-r--r-- | stanza/src/client/error.rs | 38 | ||||
-rw-r--r-- | stanza/src/xep_0030/info.rs | 6 |
9 files changed, 232 insertions, 34 deletions
diff --git a/filamento/Cargo.toml b/filamento/Cargo.toml index aed261d..ef11192 100644 --- a/filamento/Cargo.toml +++ b/filamento/Cargo.toml @@ -8,7 +8,7 @@ futures = "0.3.31" lampada = { version = "0.1.0", path = "../lampada" } tokio = "1.42.0" thiserror = "2.0.11" -stanza = { version = "0.1.0", path = "../stanza", features = ["rfc_6121", "xep_0203"] } +stanza = { version = "0.1.0", path = "../stanza", features = ["rfc_6121", "xep_0203", "xep_0030"] } sqlx = { version = "0.8.3", features = ["sqlite", "runtime-tokio", "uuid", "chrono"] } # TODO: re-export jid? jid = { version = "0.1.0", path = "../jid", features = ["sqlx"] } diff --git a/filamento/src/error.rs b/filamento/src/error.rs index c5fdb03..206d6be 100644 --- a/filamento/src/error.rs +++ b/filamento/src/error.rs @@ -103,8 +103,12 @@ impl From<sqlx::Error> for DatabaseOpenError { #[derive(Debug, Error, Clone)] // TODO: should probably have all iq query related errors here, including read, write, stanza error, etc. pub enum IqError { + #[error("writing response: {0}")] + WriteError(#[from] WriteError), #[error("no iq with id matching `{0}`")] NoMatchingId(String), + #[error("incorrect addressee: {0}")] + IncorrectAddressee(jid::JID), } #[derive(Debug, Error, Clone)] diff --git a/filamento/src/logic/mod.rs b/filamento/src/logic/mod.rs index 365a0df..dc262a9 100644 --- a/filamento/src/logic/mod.rs +++ b/filamento/src/logic/mod.rs @@ -68,7 +68,6 @@ impl ClientLogic { pub async fn handle_error(&self, e: Error) { error!("{}", e); - self.handle_update(UpdateMessage::Error(e)).await; } } diff --git a/filamento/src/logic/process_stanza.rs b/filamento/src/logic/process_stanza.rs index 1a68936..3dfe9fb 100644 --- a/filamento/src/logic/process_stanza.rs +++ b/filamento/src/logic/process_stanza.rs @@ -2,7 +2,15 @@ use std::str::FromStr; use chrono::Utc; use lampada::{Connected, SupervisorSender}; -use stanza::client::{Stanza, iq::Iq}; +use stanza::{ + client::{ + Stanza, + iq::{self, Iq, IqType}, + }, + stanza_error::Error as StanzaError, + xep_0030, +}; +use tracing::{debug, error, info, warn}; use uuid::Uuid; use crate::{ @@ -19,7 +27,6 @@ pub async fn handle_stanza(logic: ClientLogic, stanza: Stanza, connection: Conne let result = process_stanza(logic.clone(), stanza, connection).await; match result { Ok(u) => match u { - Some(UpdateMessage::Unsupported(stanza)) => logic.handle_unsupported(stanza).await, _ => { if let Some(u) = u { logic.handle_update(u).await @@ -149,67 +156,192 @@ pub async fn recv_presence( } } -pub async fn recv_iq(logic: ClientLogic, iq: Iq) -> Result<Option<UpdateMessage>, IqError> { +pub async fn recv_iq( + logic: ClientLogic, + connection: Connected, + iq: Iq, +) -> Result<Option<UpdateMessage>, IqError> { + if let Some(to) = &iq.to { + if *to == *connection.jid() { + } else { + return Err(IqError::IncorrectAddressee(to.clone())); + } + } match iq.r#type { stanza::client::iq::IqType::Error | stanza::client::iq::IqType::Result => { let send; { send = logic.pending().lock().await.remove(&iq.id); } + let from = iq + .from + .clone() + .unwrap_or_else(|| connection.server().clone()); if let Some(send) = send { - send.send(Ok(Stanza::Iq(iq))); + debug!("received iq result from {}", from); + let _ = send.send(Ok(Stanza::Iq(iq))); Ok(None) } else { Err(IqError::NoMatchingId(iq.id)) } } - // TODO: send unsupported to server - // TODO: proper errors i am so tired please - stanza::client::iq::IqType::Get => Ok(None), + stanza::client::iq::IqType::Get => { + let from = iq + .from + .clone() + .unwrap_or_else(|| connection.server().clone()); + if let Some(query) = iq.query { + match query { + stanza::client::iq::Query::DiscoInfo(_query) => { + // TODO: should this only be replied to server? + info!("received disco#info request from {}", from); + let disco = xep_0030::info::Query { + node: None, + features: vec![xep_0030::info::Feature { + var: "http://jabber.org/protocol/disco#info".to_string(), + }], + identities: vec![xep_0030::info::Identity { + category: "client".to_string(), + name: Some("filamento".to_string()), + r#type: "pc".to_string(), + }], + }; + let iq = Iq { + from: Some(connection.jid().clone()), + id: iq.id, + to: iq.from, + r#type: IqType::Result, + lang: None, + query: Some(iq::Query::DiscoInfo(disco)), + errors: vec![], + }; + connection.write_handle().write(Stanza::Iq(iq)).await?; + info!("replied to disco#info request from {}", from); + Ok(None) + } + _ => { + warn!("received unsupported iq get from {}", from); + let iq = Iq { + from: Some(connection.jid().clone()), + id: iq.id, + to: iq.from, + r#type: IqType::Error, + lang: None, + query: None, + errors: vec![StanzaError::ServiceUnavailable.into()], + }; + connection.write_handle().write(Stanza::Iq(iq)).await?; + warn!("replied to unsupported iq get from {}", from); + Ok(None) + } // stanza::client::iq::Query::Bind(bind) => todo!(), + // stanza::client::iq::Query::DiscoItems(query) => todo!(), + // stanza::client::iq::Query::Ping(ping) => todo!(), + // stanza::client::iq::Query::Roster(query) => todo!(), + // stanza::client::iq::Query::Unsupported => todo!(), + } + } else { + info!("received malformed iq query from {}", from); + let iq = Iq { + from: Some(connection.jid().clone()), + id: iq.id, + to: iq.from, + r#type: IqType::Error, + lang: None, + query: None, + errors: vec![StanzaError::BadRequest.into()], + }; + connection.write_handle().write(Stanza::Iq(iq)).await?; + info!("replied to malformed iq query from {}", from); + Ok(None) + } + } stanza::client::iq::IqType::Set => { + let from = iq + .from + .clone() + .unwrap_or_else(|| connection.server().clone()); if let Some(query) = iq.query { match query { stanza::client::iq::Query::Roster(mut query) => { - // TODO: there should only be one + // TODO: should only have one, otherwise send error + // if let Some(item) = query.items.pop() && query.items.len() == 1 { if let Some(item) = query.items.pop() { match item.subscription { Some(stanza::roster::Subscription::Remove) => { - logic.db().delete_contact(item.jid.clone()).await; + if let Err(e) = + logic.db().delete_contact(item.jid.clone()).await + { + error!("{}", RosterError::Cache(e.into())); + } Ok(Some(UpdateMessage::RosterDelete(item.jid))) } _ => { let contact: Contact = item.into(); if let Err(e) = logic.db().upsert_contact(contact.clone()).await { - let _ = logic - .update_sender() - .send(UpdateMessage::Error(Error::Roster( - RosterError::Cache(e.into()), - ))) - .await; + error!("{}", RosterError::Cache(e.into())); + } + let iq = Iq { + from: Some(connection.jid().clone()), + id: iq.id, + to: iq.from, + r#type: IqType::Result, + lang: None, + query: None, + errors: vec![], + }; + if let Err(e) = + connection.write_handle().write(Stanza::Iq(iq)).await + { + error!("could not reply to roster set: {}", e); } Ok(Some(UpdateMessage::RosterUpdate(contact))) - // TODO: send result - // write_handle.write(Stanza::Iq(stanza::client::iq::Iq { - // from: , - // id: todo!(), - // to: todo!(), - // r#type: todo!(), - // lang: todo!(), - // query: todo!(), - // errors: todo!(), - // })); } } } else { + warn!("received malformed roster push"); + let iq = Iq { + from: Some(connection.jid().clone()), + id: iq.id, + to: iq.from, + r#type: IqType::Error, + lang: None, + query: None, + errors: vec![StanzaError::NotAcceptable.into()], + }; + connection.write_handle().write(Stanza::Iq(iq)).await?; Ok(None) } } // TODO: send unsupported to server - _ => Ok(None), + _ => { + warn!("received unsupported iq set from {}", from); + let iq = Iq { + from: Some(connection.jid().clone()), + id: iq.id, + to: iq.from, + r#type: IqType::Error, + lang: None, + query: None, + errors: vec![StanzaError::ServiceUnavailable.into()], + }; + connection.write_handle().write(Stanza::Iq(iq)).await?; + warn!("replied to unsupported iq set from {}", from); + Ok(None) + } } } else { - // TODO: send error (unsupported) to server + warn!("received malformed iq set from {}", from); + let iq = Iq { + from: Some(connection.jid().clone()), + id: iq.id, + to: iq.from, + r#type: IqType::Error, + lang: None, + query: None, + errors: vec![StanzaError::NotAcceptable.into()], + }; + connection.write_handle().write(Stanza::Iq(iq)).await?; Ok(None) } } @@ -224,12 +356,13 @@ pub async fn process_stanza( let update = match stanza { Stanza::Message(stanza_message) => Ok(recv_message(logic, stanza_message).await?), Stanza::Presence(presence) => Ok(recv_presence(presence).await?), - Stanza::Iq(iq) => Ok(recv_iq(logic, iq).await?), + Stanza::Iq(iq) => Ok(recv_iq(logic, connection.clone(), iq).await?), // unreachable, always caught by lampada - // TODO: make cleaner + // TODO: make cleaner than this in some way Stanza::Error(error) => { unreachable!() } + // should this cause a stream restart? Stanza::OtherContent(content) => { Err(Error::UnrecognizedContent) // TODO: send error to write_thread diff --git a/lampada/src/connection/mod.rs b/lampada/src/connection/mod.rs index ffaa7a7..2d570ae 100644 --- a/lampada/src/connection/mod.rs +++ b/lampada/src/connection/mod.rs @@ -298,6 +298,7 @@ impl SupervisorHandle { streams: BoundJabberStream<Tls>, on_crash: oneshot::Sender<()>, jid: JID, + server: JID, password: Arc<String>, logic: Lgc, ) -> (WriteHandle, Self) { @@ -313,6 +314,7 @@ impl SupervisorHandle { let connected = Connected { jid, write_handle: write_handle.clone(), + server, }; let supervisor_sender = SupervisorSender { diff --git a/lampada/src/error.rs b/lampada/src/error.rs index cdfb4db..384d1ee 100644 --- a/lampada/src/error.rs +++ b/lampada/src/error.rs @@ -20,6 +20,8 @@ pub enum ConnectionError { // TODO: Display for Content #[error("disconnected")] Disconnected, + #[error("invalid server jid: {0}")] + InvalidServerJID(#[from] jid::ParseError), } #[derive(Debug, Error, Clone)] diff --git a/lampada/src/lib.rs b/lampada/src/lib.rs index a01ba06..7346c42 100644 --- a/lampada/src/lib.rs +++ b/lampada/src/lib.rs @@ -34,6 +34,8 @@ pub struct Connected { // full jid will stay stable across reconnections jid: JID, write_handle: WriteHandle, + // the server jid + server: JID, } impl Connected { @@ -44,6 +46,10 @@ impl Connected { pub fn write_handle(&self) -> &WriteHandle { &self.write_handle } + + pub fn server(&self) -> &JID { + &self.server + } } /// everything that a particular xmpp client must implement @@ -166,6 +172,18 @@ impl<Lgc: Logic + Clone + Send + 'static> CoreClient<Lgc> { let streams_result = luz::connect_and_login(&mut jid, &*self.password, &mut domain) .await; + let server: JID = match domain.parse() { + Ok(j) => j, + Err(e) => { + self.logic + .clone() + .handle_connection_error(ConnectionError::InvalidServerJID( + e, + )) + .await; + continue; + } + }; match streams_result { Ok(s) => { debug!("ok stream result"); @@ -174,6 +192,7 @@ impl<Lgc: Logic + Clone + Send + 'static> CoreClient<Lgc> { s, shutdown_send, jid.clone(), + server.clone(), self.password.clone(), self.logic.clone(), ); @@ -184,6 +203,7 @@ impl<Lgc: Logic + Clone + Send + 'static> CoreClient<Lgc> { let connected = Connected { jid, write_handle: writer, + server, }; self.logic.clone().handle_connect(connected.clone()).await; diff --git a/stanza/src/client/error.rs b/stanza/src/client/error.rs index aa142bf..33bc85e 100644 --- a/stanza/src/client/error.rs +++ b/stanza/src/client/error.rs @@ -99,3 +99,41 @@ impl FromStr for ErrorType { } } } + +impl From<StanzaError> for Error { + fn from(value: StanzaError) -> Self { + let error_type = match value { + StanzaError::BadRequest => ErrorType::Modify, + StanzaError::Conflict => ErrorType::Cancel, + // cancel or modify + StanzaError::FeatureNotImplemented => ErrorType::Cancel, + StanzaError::Forbidden => ErrorType::Auth, + StanzaError::Gone(_) => ErrorType::Cancel, + StanzaError::InternalServerError => ErrorType::Cancel, + StanzaError::ItemNotFound => ErrorType::Cancel, + StanzaError::JIDMalformed => ErrorType::Modify, + StanzaError::NotAcceptable => ErrorType::Modify, + StanzaError::NotAllowed => ErrorType::Cancel, + StanzaError::NotAuthorized => ErrorType::Auth, + // modify or wait + StanzaError::PolicyViolation => ErrorType::Modify, + StanzaError::RecipientUnavailable => ErrorType::Wait, + StanzaError::Redirect(_) => ErrorType::Modify, + StanzaError::RegistrationRequired => ErrorType::Auth, + StanzaError::RemoteServerNotFound => ErrorType::Cancel, + StanzaError::RemoteServerTimeout => ErrorType::Wait, + StanzaError::ResourceConstraint => ErrorType::Wait, + StanzaError::ServiceUnavailable => ErrorType::Cancel, + StanzaError::SubscriptionRequired => ErrorType::Auth, + StanzaError::UndefinedCondition => ErrorType::Cancel, + // wait or modify + StanzaError::UnexpectedRequest => ErrorType::Modify, + }; + Self { + by: None, + r#type: error_type, + error: value, + text: None, + } + } +} diff --git a/stanza/src/xep_0030/info.rs b/stanza/src/xep_0030/info.rs index cec2dcb..589fd08 100644 --- a/stanza/src/xep_0030/info.rs +++ b/stanza/src/xep_0030/info.rs @@ -7,9 +7,9 @@ pub const XMLNS: &str = "http://jabber.org/protocol/disco#info"; #[derive(Debug, Clone)] pub struct Query { - node: Option<String>, - features: Vec<Feature>, - identities: Vec<Identity>, + pub node: Option<String>, + pub features: Vec<Feature>, + pub identities: Vec<Identity>, } impl FromElement for Query { |