aboutsummaryrefslogtreecommitdiffstats
path: root/filamento/src/logic/process_stanza.rs
diff options
context:
space:
mode:
Diffstat (limited to 'filamento/src/logic/process_stanza.rs')
-rw-r--r--filamento/src/logic/process_stanza.rs421
1 files changed, 198 insertions, 223 deletions
diff --git a/filamento/src/logic/process_stanza.rs b/filamento/src/logic/process_stanza.rs
index 17738df..1a68936 100644
--- a/filamento/src/logic/process_stanza.rs
+++ b/filamento/src/logic/process_stanza.rs
@@ -2,263 +2,238 @@ use std::str::FromStr;
use chrono::Utc;
use lampada::{Connected, SupervisorSender};
-use stanza::client::Stanza;
+use stanza::client::{Stanza, iq::Iq};
use uuid::Uuid;
use crate::{
UpdateMessage,
chat::{Body, Message},
- error::{Error, IqError, MessageRecvError, PresenceError, RosterError},
+ error::{DatabaseError, Error, IqError, MessageRecvError, PresenceError, RosterError},
presence::{Offline, Online, Presence, PresenceType, Show},
roster::Contact,
};
use super::ClientLogic;
-pub async fn handle_stanza(
+pub async fn handle_stanza(logic: ClientLogic, stanza: Stanza, connection: Connected) {
+ let result = process_stanza(logic.clone(), stanza, connection).await;
+ match result {
+ Ok(u) => match u {
+ Some(UpdateMessage::Unsupported(stanza)) => logic.handle_unsupported(stanza).await,
+ _ => {
+ if let Some(u) = u {
+ logic.handle_update(u).await
+ }
+ }
+ },
+ Err(e) => logic.handle_error(e).await,
+ }
+}
+
+pub async fn recv_message(
logic: ClientLogic,
- stanza: Stanza,
- connection: Connected,
- supervisor: SupervisorSender,
-) {
- match stanza {
- Stanza::Message(stanza_message) => {
- if let Some(mut from) = stanza_message.from {
- // TODO: don't ignore delay from. xep says SHOULD send error if incorrect.
- let timestamp = stanza_message
+ stanza_message: stanza::client::message::Message,
+) -> Result<Option<UpdateMessage>, MessageRecvError> {
+ if let Some(mut from) = stanza_message.from {
+ // TODO: don't ignore delay from. xep says SHOULD send error if incorrect.
+ let timestamp = stanza_message
+ .delay
+ .map(|delay| delay.stamp)
+ .unwrap_or_else(|| Utc::now());
+ // TODO: group chat messages
+ let mut message = Message {
+ id: stanza_message
+ .id
+ // TODO: proper id storage
+ .map(|id| Uuid::from_str(&id).unwrap_or_else(|_| Uuid::new_v4()))
+ .unwrap_or_else(|| Uuid::new_v4()),
+ from: from.clone(),
+ timestamp,
+ body: Body {
+ // TODO: should this be an option?
+ body: stanza_message
+ .body
+ .map(|body| body.body)
+ .unwrap_or_default()
+ .unwrap_or_default(),
+ },
+ };
+ // TODO: can this be more efficient?
+ logic
+ .db()
+ .create_message_with_user_resource_and_chat(message.clone(), from.clone())
+ .await
+ .map_err(|e| DatabaseError(e.into()))?;
+ message.from = message.from.as_bare();
+ from = from.as_bare();
+ Ok(Some(UpdateMessage::Message { to: from, message }))
+ } else {
+ Err(MessageRecvError::MissingFrom)
+ }
+}
+
+pub async fn recv_presence(
+ presence: stanza::client::presence::Presence,
+) -> Result<Option<UpdateMessage>, PresenceError> {
+ if let Some(from) = presence.from {
+ match presence.r#type {
+ Some(r#type) => match r#type {
+ // error processing a presence from somebody
+ stanza::client::presence::PresenceType::Error => {
+ // TODO: is there any other information that should go with the error? also MUST have an error, otherwise it's a different error. maybe it shoulnd't be an option.
+ // TODO: ughhhhhhhhhhhhh these stanza errors should probably just have an option, and custom display
+ Err(PresenceError::StanzaError(
+ presence
+ .errors
+ .first()
+ .cloned()
+ .expect("error MUST have error"),
+ ))
+ }
+ // should not happen (error to server)
+ stanza::client::presence::PresenceType::Probe => {
+ // TODO: should probably write an error and restart stream
+ Err(PresenceError::Unsupported)
+ }
+ stanza::client::presence::PresenceType::Subscribe => {
+ // may get a subscription request from somebody who is not a contact!!! therefore should be its own kind of event
+ Ok(Some(UpdateMessage::SubscriptionRequest(from)))
+ }
+ stanza::client::presence::PresenceType::Unavailable => {
+ let offline = Offline {
+ status: presence.status.map(|status| status.status.0),
+ };
+ let timestamp = presence
+ .delay
+ .map(|delay| delay.stamp)
+ .unwrap_or_else(|| Utc::now());
+ Ok(Some(UpdateMessage::Presence {
+ from,
+ presence: Presence {
+ timestamp,
+ presence: PresenceType::Offline(offline),
+ },
+ }))
+ }
+ // for now, do nothing, as these are simply informational. will receive roster push from the server regarding the changes to do with them.
+ stanza::client::presence::PresenceType::Subscribed => Ok(None),
+ stanza::client::presence::PresenceType::Unsubscribe => Ok(None),
+ stanza::client::presence::PresenceType::Unsubscribed => Ok(None),
+ },
+ None => {
+ let online = Online {
+ show: presence.show.map(|show| match show {
+ stanza::client::presence::Show::Away => Show::Away,
+ stanza::client::presence::Show::Chat => Show::Chat,
+ stanza::client::presence::Show::Dnd => Show::DoNotDisturb,
+ stanza::client::presence::Show::Xa => Show::ExtendedAway,
+ }),
+ status: presence.status.map(|status| status.status.0),
+ priority: presence.priority.map(|priority| priority.0),
+ };
+ let timestamp = presence
.delay
.map(|delay| delay.stamp)
.unwrap_or_else(|| Utc::now());
- // TODO: group chat messages
- let mut message = Message {
- id: stanza_message
- .id
- // TODO: proper id storage
- .map(|id| Uuid::from_str(&id).unwrap_or_else(|_| Uuid::new_v4()))
- .unwrap_or_else(|| Uuid::new_v4()),
- from: from.clone(),
- timestamp,
- body: Body {
- // TODO: should this be an option?
- body: stanza_message
- .body
- .map(|body| body.body)
- .unwrap_or_default()
- .unwrap_or_default(),
+ Ok(Some(UpdateMessage::Presence {
+ from,
+ presence: Presence {
+ timestamp,
+ presence: PresenceType::Online(online),
},
- };
- // TODO: can this be more efficient?
- let result = logic
- .db()
- .create_message_with_user_resource_and_chat(message.clone(), from.clone())
- .await;
- if let Err(e) = result {
- tracing::error!("messagecreate");
- let _ = logic
- .update_sender()
- .send(UpdateMessage::Error(Error::MessageRecv(
- MessageRecvError::MessageHistory(e.into()),
- )))
- .await;
- }
- message.from = message.from.as_bare();
- from = from.as_bare();
- let _ = logic
- .update_sender()
- .send(UpdateMessage::Message { to: from, message })
- .await;
- } else {
- let _ = logic
- .update_sender()
- .send(UpdateMessage::Error(Error::MessageRecv(
- MessageRecvError::MissingFrom,
- )))
- .await;
+ }))
}
}
- Stanza::Presence(presence) => {
- if let Some(from) = presence.from {
- match presence.r#type {
- Some(r#type) => match r#type {
- // error processing a presence from somebody
- stanza::client::presence::PresenceType::Error => {
- // TODO: is there any other information that should go with the error? also MUST have an error, otherwise it's a different error. maybe it shoulnd't be an option.
- let _ = logic
- .update_sender()
- .send(UpdateMessage::Error(Error::Presence(
- // TODO: ughhhhhhhhhhhhh these stanza errors should probably just have an option, and custom display
- PresenceError::StanzaError(
- presence
- .errors
- .first()
- .cloned()
- .expect("error MUST have error"),
- ),
- )))
- .await;
- }
- // should not happen (error to server)
- stanza::client::presence::PresenceType::Probe => {
- // TODO: should probably write an error and restart stream
- let _ = logic
- .update_sender()
- .send(UpdateMessage::Error(Error::Presence(
- PresenceError::Unsupported,
- )))
- .await;
- }
- stanza::client::presence::PresenceType::Subscribe => {
- // may get a subscription request from somebody who is not a contact!!! therefore should be its own kind of event
- let _ = logic
- .update_sender()
- .send(UpdateMessage::SubscriptionRequest(from))
- .await;
- }
- stanza::client::presence::PresenceType::Unavailable => {
- let offline = Offline {
- status: presence.status.map(|status| status.status.0),
- };
- let timestamp = presence
- .delay
- .map(|delay| delay.stamp)
- .unwrap_or_else(|| Utc::now());
- let _ = logic
- .update_sender()
- .send(UpdateMessage::Presence {
- from,
- presence: Presence {
- timestamp,
- presence: PresenceType::Offline(offline),
- },
- })
- .await;
- }
- // for now, do nothing, as these are simply informational. will receive roster push from the server regarding the changes to do with them.
- stanza::client::presence::PresenceType::Subscribed => {}
- stanza::client::presence::PresenceType::Unsubscribe => {}
- stanza::client::presence::PresenceType::Unsubscribed => {}
- },
- None => {
- let online = Online {
- show: presence.show.map(|show| match show {
- stanza::client::presence::Show::Away => Show::Away,
- stanza::client::presence::Show::Chat => Show::Chat,
- stanza::client::presence::Show::Dnd => Show::DoNotDisturb,
- stanza::client::presence::Show::Xa => Show::ExtendedAway,
- }),
- status: presence.status.map(|status| status.status.0),
- priority: presence.priority.map(|priority| priority.0),
- };
- let timestamp = presence
- .delay
- .map(|delay| delay.stamp)
- .unwrap_or_else(|| Utc::now());
- let _ = logic
- .update_sender()
- .send(UpdateMessage::Presence {
- from,
- presence: Presence {
- timestamp,
- presence: PresenceType::Online(online),
- },
- })
- .await;
- }
- }
+ } else {
+ Err(PresenceError::MissingFrom)
+ }
+}
+
+pub async fn recv_iq(logic: ClientLogic, iq: Iq) -> Result<Option<UpdateMessage>, IqError> {
+ match iq.r#type {
+ stanza::client::iq::IqType::Error | stanza::client::iq::IqType::Result => {
+ let send;
+ {
+ send = logic.pending().lock().await.remove(&iq.id);
+ }
+ if let Some(send) = send {
+ send.send(Ok(Stanza::Iq(iq)));
+ Ok(None)
} else {
- let _ = logic
- .update_sender()
- .send(UpdateMessage::Error(Error::Presence(
- PresenceError::MissingFrom,
- )))
- .await;
+ Err(IqError::NoMatchingId(iq.id))
}
}
- Stanza::Iq(iq) => match iq.r#type {
- stanza::client::iq::IqType::Error | stanza::client::iq::IqType::Result => {
- let send;
- {
- send = logic.pending().lock().await.remove(&iq.id);
- }
- if let Some(send) = send {
- send.send(Ok(Stanza::Iq(iq)));
- } else {
- let _ = logic
- .update_sender()
- .send(UpdateMessage::Error(Error::Iq(IqError::NoMatchingId(
- iq.id,
- ))))
- .await;
- }
- }
- // TODO: send unsupported to server
- // TODO: proper errors i am so tired please
- stanza::client::iq::IqType::Get => {}
- stanza::client::iq::IqType::Set => {
- if let Some(query) = iq.query {
- match query {
- stanza::client::iq::Query::Roster(mut query) => {
- // TODO: there should only be one
- if let Some(item) = query.items.pop() {
- match item.subscription {
- Some(stanza::roster::Subscription::Remove) => {
- logic.db().delete_contact(item.jid.clone()).await;
- logic
- .update_sender()
- .send(UpdateMessage::RosterDelete(item.jid))
- .await;
- // TODO: send result
- }
- _ => {
- let contact: Contact = item.into();
- if let Err(e) =
- logic.db().upsert_contact(contact.clone()).await
- {
- let _ = logic
- .update_sender()
- .send(UpdateMessage::Error(Error::Roster(
- RosterError::Cache(e.into()),
- )))
- .await;
- }
+ // TODO: send unsupported to server
+ // TODO: proper errors i am so tired please
+ stanza::client::iq::IqType::Get => Ok(None),
+ stanza::client::iq::IqType::Set => {
+ if let Some(query) = iq.query {
+ match query {
+ stanza::client::iq::Query::Roster(mut query) => {
+ // TODO: there should only be one
+ if let Some(item) = query.items.pop() {
+ match item.subscription {
+ Some(stanza::roster::Subscription::Remove) => {
+ logic.db().delete_contact(item.jid.clone()).await;
+ Ok(Some(UpdateMessage::RosterDelete(item.jid)))
+ }
+ _ => {
+ let contact: Contact = item.into();
+ if let Err(e) = logic.db().upsert_contact(contact.clone()).await
+ {
let _ = logic
.update_sender()
- .send(UpdateMessage::RosterUpdate(contact))
+ .send(UpdateMessage::Error(Error::Roster(
+ RosterError::Cache(e.into()),
+ )))
.await;
- // TODO: send result
- // write_handle.write(Stanza::Iq(stanza::client::iq::Iq {
- // from: ,
- // id: todo!(),
- // to: todo!(),
- // r#type: todo!(),
- // lang: todo!(),
- // query: todo!(),
- // errors: todo!(),
- // }));
}
+ Ok(Some(UpdateMessage::RosterUpdate(contact)))
+ // TODO: send result
+ // write_handle.write(Stanza::Iq(stanza::client::iq::Iq {
+ // from: ,
+ // id: todo!(),
+ // to: todo!(),
+ // r#type: todo!(),
+ // lang: todo!(),
+ // query: todo!(),
+ // errors: todo!(),
+ // }));
}
}
+ } else {
+ Ok(None)
}
- // TODO: send unsupported to server
- _ => {}
}
- } else {
- // TODO: send error (unsupported) to server
+ // TODO: send unsupported to server
+ _ => Ok(None),
}
+ } else {
+ // TODO: send error (unsupported) to server
+ Ok(None)
}
- },
+ }
+ }
+}
+
+pub async fn process_stanza(
+ logic: ClientLogic,
+ stanza: Stanza,
+ connection: Connected,
+) -> Result<Option<UpdateMessage>, Error> {
+ let update = match stanza {
+ Stanza::Message(stanza_message) => Ok(recv_message(logic, stanza_message).await?),
+ Stanza::Presence(presence) => Ok(recv_presence(presence).await?),
+ Stanza::Iq(iq) => Ok(recv_iq(logic, iq).await?),
+ // unreachable, always caught by lampada
+ // TODO: make cleaner
Stanza::Error(error) => {
- let _ = logic
- .update_sender()
- .send(UpdateMessage::Error(Error::Stream(error)))
- .await;
- // TODO: reconnect
+ unreachable!()
}
Stanza::OtherContent(content) => {
- let _ = logic
- .update_sender()
- .send(UpdateMessage::Error(Error::UnrecognizedContent));
+ Err(Error::UnrecognizedContent)
// TODO: send error to write_thread
}
- }
+ };
+ update
}