aboutsummaryrefslogtreecommitdiffstats
path: root/filamento/src/logic/process_stanza.rs
diff options
context:
space:
mode:
Diffstat (limited to 'filamento/src/logic/process_stanza.rs')
-rw-r--r--filamento/src/logic/process_stanza.rs191
1 files changed, 162 insertions, 29 deletions
diff --git a/filamento/src/logic/process_stanza.rs b/filamento/src/logic/process_stanza.rs
index 1a68936..3dfe9fb 100644
--- a/filamento/src/logic/process_stanza.rs
+++ b/filamento/src/logic/process_stanza.rs
@@ -2,7 +2,15 @@ use std::str::FromStr;
use chrono::Utc;
use lampada::{Connected, SupervisorSender};
-use stanza::client::{Stanza, iq::Iq};
+use stanza::{
+ client::{
+ Stanza,
+ iq::{self, Iq, IqType},
+ },
+ stanza_error::Error as StanzaError,
+ xep_0030,
+};
+use tracing::{debug, error, info, warn};
use uuid::Uuid;
use crate::{
@@ -19,7 +27,6 @@ pub async fn handle_stanza(logic: ClientLogic, stanza: Stanza, connection: Conne
let result = process_stanza(logic.clone(), stanza, connection).await;
match result {
Ok(u) => match u {
- Some(UpdateMessage::Unsupported(stanza)) => logic.handle_unsupported(stanza).await,
_ => {
if let Some(u) = u {
logic.handle_update(u).await
@@ -149,67 +156,192 @@ pub async fn recv_presence(
}
}
-pub async fn recv_iq(logic: ClientLogic, iq: Iq) -> Result<Option<UpdateMessage>, IqError> {
+pub async fn recv_iq(
+ logic: ClientLogic,
+ connection: Connected,
+ iq: Iq,
+) -> Result<Option<UpdateMessage>, IqError> {
+ if let Some(to) = &iq.to {
+ if *to == *connection.jid() {
+ } else {
+ return Err(IqError::IncorrectAddressee(to.clone()));
+ }
+ }
match iq.r#type {
stanza::client::iq::IqType::Error | stanza::client::iq::IqType::Result => {
let send;
{
send = logic.pending().lock().await.remove(&iq.id);
}
+ let from = iq
+ .from
+ .clone()
+ .unwrap_or_else(|| connection.server().clone());
if let Some(send) = send {
- send.send(Ok(Stanza::Iq(iq)));
+ debug!("received iq result from {}", from);
+ let _ = send.send(Ok(Stanza::Iq(iq)));
Ok(None)
} else {
Err(IqError::NoMatchingId(iq.id))
}
}
- // TODO: send unsupported to server
- // TODO: proper errors i am so tired please
- stanza::client::iq::IqType::Get => Ok(None),
+ stanza::client::iq::IqType::Get => {
+ let from = iq
+ .from
+ .clone()
+ .unwrap_or_else(|| connection.server().clone());
+ if let Some(query) = iq.query {
+ match query {
+ stanza::client::iq::Query::DiscoInfo(_query) => {
+ // TODO: should this only be replied to server?
+ info!("received disco#info request from {}", from);
+ let disco = xep_0030::info::Query {
+ node: None,
+ features: vec![xep_0030::info::Feature {
+ var: "http://jabber.org/protocol/disco#info".to_string(),
+ }],
+ identities: vec![xep_0030::info::Identity {
+ category: "client".to_string(),
+ name: Some("filamento".to_string()),
+ r#type: "pc".to_string(),
+ }],
+ };
+ let iq = Iq {
+ from: Some(connection.jid().clone()),
+ id: iq.id,
+ to: iq.from,
+ r#type: IqType::Result,
+ lang: None,
+ query: Some(iq::Query::DiscoInfo(disco)),
+ errors: vec![],
+ };
+ connection.write_handle().write(Stanza::Iq(iq)).await?;
+ info!("replied to disco#info request from {}", from);
+ Ok(None)
+ }
+ _ => {
+ warn!("received unsupported iq get from {}", from);
+ let iq = Iq {
+ from: Some(connection.jid().clone()),
+ id: iq.id,
+ to: iq.from,
+ r#type: IqType::Error,
+ lang: None,
+ query: None,
+ errors: vec![StanzaError::ServiceUnavailable.into()],
+ };
+ connection.write_handle().write(Stanza::Iq(iq)).await?;
+ warn!("replied to unsupported iq get from {}", from);
+ Ok(None)
+ } // stanza::client::iq::Query::Bind(bind) => todo!(),
+ // stanza::client::iq::Query::DiscoItems(query) => todo!(),
+ // stanza::client::iq::Query::Ping(ping) => todo!(),
+ // stanza::client::iq::Query::Roster(query) => todo!(),
+ // stanza::client::iq::Query::Unsupported => todo!(),
+ }
+ } else {
+ info!("received malformed iq query from {}", from);
+ let iq = Iq {
+ from: Some(connection.jid().clone()),
+ id: iq.id,
+ to: iq.from,
+ r#type: IqType::Error,
+ lang: None,
+ query: None,
+ errors: vec![StanzaError::BadRequest.into()],
+ };
+ connection.write_handle().write(Stanza::Iq(iq)).await?;
+ info!("replied to malformed iq query from {}", from);
+ Ok(None)
+ }
+ }
stanza::client::iq::IqType::Set => {
+ let from = iq
+ .from
+ .clone()
+ .unwrap_or_else(|| connection.server().clone());
if let Some(query) = iq.query {
match query {
stanza::client::iq::Query::Roster(mut query) => {
- // TODO: there should only be one
+ // TODO: should only have one, otherwise send error
+ // if let Some(item) = query.items.pop() && query.items.len() == 1 {
if let Some(item) = query.items.pop() {
match item.subscription {
Some(stanza::roster::Subscription::Remove) => {
- logic.db().delete_contact(item.jid.clone()).await;
+ if let Err(e) =
+ logic.db().delete_contact(item.jid.clone()).await
+ {
+ error!("{}", RosterError::Cache(e.into()));
+ }
Ok(Some(UpdateMessage::RosterDelete(item.jid)))
}
_ => {
let contact: Contact = item.into();
if let Err(e) = logic.db().upsert_contact(contact.clone()).await
{
- let _ = logic
- .update_sender()
- .send(UpdateMessage::Error(Error::Roster(
- RosterError::Cache(e.into()),
- )))
- .await;
+ error!("{}", RosterError::Cache(e.into()));
+ }
+ let iq = Iq {
+ from: Some(connection.jid().clone()),
+ id: iq.id,
+ to: iq.from,
+ r#type: IqType::Result,
+ lang: None,
+ query: None,
+ errors: vec![],
+ };
+ if let Err(e) =
+ connection.write_handle().write(Stanza::Iq(iq)).await
+ {
+ error!("could not reply to roster set: {}", e);
}
Ok(Some(UpdateMessage::RosterUpdate(contact)))
- // TODO: send result
- // write_handle.write(Stanza::Iq(stanza::client::iq::Iq {
- // from: ,
- // id: todo!(),
- // to: todo!(),
- // r#type: todo!(),
- // lang: todo!(),
- // query: todo!(),
- // errors: todo!(),
- // }));
}
}
} else {
+ warn!("received malformed roster push");
+ let iq = Iq {
+ from: Some(connection.jid().clone()),
+ id: iq.id,
+ to: iq.from,
+ r#type: IqType::Error,
+ lang: None,
+ query: None,
+ errors: vec![StanzaError::NotAcceptable.into()],
+ };
+ connection.write_handle().write(Stanza::Iq(iq)).await?;
Ok(None)
}
}
// TODO: send unsupported to server
- _ => Ok(None),
+ _ => {
+ warn!("received unsupported iq set from {}", from);
+ let iq = Iq {
+ from: Some(connection.jid().clone()),
+ id: iq.id,
+ to: iq.from,
+ r#type: IqType::Error,
+ lang: None,
+ query: None,
+ errors: vec![StanzaError::ServiceUnavailable.into()],
+ };
+ connection.write_handle().write(Stanza::Iq(iq)).await?;
+ warn!("replied to unsupported iq set from {}", from);
+ Ok(None)
+ }
}
} else {
- // TODO: send error (unsupported) to server
+ warn!("received malformed iq set from {}", from);
+ let iq = Iq {
+ from: Some(connection.jid().clone()),
+ id: iq.id,
+ to: iq.from,
+ r#type: IqType::Error,
+ lang: None,
+ query: None,
+ errors: vec![StanzaError::NotAcceptable.into()],
+ };
+ connection.write_handle().write(Stanza::Iq(iq)).await?;
Ok(None)
}
}
@@ -224,12 +356,13 @@ pub async fn process_stanza(
let update = match stanza {
Stanza::Message(stanza_message) => Ok(recv_message(logic, stanza_message).await?),
Stanza::Presence(presence) => Ok(recv_presence(presence).await?),
- Stanza::Iq(iq) => Ok(recv_iq(logic, iq).await?),
+ Stanza::Iq(iq) => Ok(recv_iq(logic, connection.clone(), iq).await?),
// unreachable, always caught by lampada
- // TODO: make cleaner
+ // TODO: make cleaner than this in some way
Stanza::Error(error) => {
unreachable!()
}
+ // should this cause a stream restart?
Stanza::OtherContent(content) => {
Err(Error::UnrecognizedContent)
// TODO: send error to write_thread