aboutsummaryrefslogtreecommitdiffstats
path: root/filamento/src/logic/online.rs
diff options
context:
space:
mode:
authorLibravatar cel 🌸 <cel@bunny.garden>2025-04-03 03:41:38 +0100
committerLibravatar cel 🌸 <cel@bunny.garden>2025-04-03 03:41:38 +0100
commit91f1994af940085d5d475a97820900ebbf0eb553 (patch)
tree6aab872f71d17a785d3d9286742fef38983d274c /filamento/src/logic/online.rs
parent9ce3827a7d25714d17f266f0f50bb29f41090175 (diff)
downloadluz-91f1994af940085d5d475a97820900ebbf0eb553.tar.gz
luz-91f1994af940085d5d475a97820900ebbf0eb553.tar.bz2
luz-91f1994af940085d5d475a97820900ebbf0eb553.zip
feat: better message handling, pep publish, xep_0172: nick
Diffstat (limited to 'filamento/src/logic/online.rs')
-rw-r--r--filamento/src/logic/online.rs355
1 files changed, 219 insertions, 136 deletions
diff --git a/filamento/src/logic/online.rs b/filamento/src/logic/online.rs
index 63a4aa3..d32f527 100644
--- a/filamento/src/logic/online.rs
+++ b/filamento/src/logic/online.rs
@@ -3,10 +3,11 @@ use jid::JID;
use lampada::{Connected, WriteMessage, error::WriteError};
use stanza::{
client::{
- Stanza,
- iq::{self, Iq, IqType, Query},
+ iq::{self, Iq, IqType, Query}, Stanza
},
xep_0030::{info, items},
+ xep_0060::pubsub::{self, Pubsub},
+ xep_0172::{self, Nick},
xep_0203::Delay,
};
use tokio::sync::oneshot;
@@ -14,12 +15,9 @@ use tracing::{debug, error, info};
use uuid::Uuid;
use crate::{
- Command, UpdateMessage,
- chat::{Body, Message},
- disco::{Info, Items},
- error::{DatabaseError, DiscoError, Error, MessageSendError, RosterError, StatusError},
- presence::{Online, Presence, PresenceType},
- roster::{Contact, ContactUpdate},
+ chat::{Body, Chat, Delivery, Message}, disco::{Info, Items}, error::{
+ DatabaseError, DiscoError, Error, IqRequestError, MessageSendError, NickError, PublishError, RosterError, StatusError, SubscribeError
+ }, pep, presence::{Online, Presence, PresenceType}, roster::{Contact, ContactUpdate}, Command, UpdateMessage
};
use super::{
@@ -156,105 +154,82 @@ pub async fn handle_add_contact(
}
}
-pub async fn handle_buddy_request(connection: Connected, jid: JID) -> Result<(), WriteError> {
+pub async fn handle_buddy_request(
+ logic: &ClientLogic,
+ connection: Connected,
+ jid: JID,
+) -> Result<(), SubscribeError> {
+ let client_user = logic.db.read_user(logic.bare_jid.clone()).await?;
+ let nick = client_user.nick.map(|nick| Nick(nick));
let presence = Stanza::Presence(stanza::client::presence::Presence {
- from: None,
- id: None,
to: Some(jid.clone()),
r#type: Some(stanza::client::presence::PresenceType::Subscribe),
- lang: None,
- show: None,
- status: None,
- priority: None,
- errors: Vec::new(),
- delay: None,
+ nick,
+ ..Default::default()
});
connection.write_handle().write(presence).await?;
let presence = Stanza::Presence(stanza::client::presence::Presence {
- from: None,
- id: None,
to: Some(jid),
r#type: Some(stanza::client::presence::PresenceType::Subscribed),
- lang: None,
- show: None,
- status: None,
- priority: None,
- errors: Vec::new(),
- delay: None,
+ ..Default::default()
});
connection.write_handle().write(presence).await?;
Ok(())
}
pub async fn handle_subscription_request(
+ logic: &ClientLogic,
connection: Connected,
jid: JID,
-) -> Result<(), WriteError> {
+) -> Result<(), SubscribeError> {
// TODO: i should probably have builders
+ let client_user = logic.db.read_user(logic.bare_jid.clone()).await?;
+ let nick = client_user.nick.map(|nick| Nick(nick));
let presence = Stanza::Presence(stanza::client::presence::Presence {
- from: None,
- id: None,
to: Some(jid),
r#type: Some(stanza::client::presence::PresenceType::Subscribe),
- lang: None,
- show: None,
- status: None,
- priority: None,
- errors: Vec::new(),
- delay: None,
+ nick,
+ ..Default::default()
});
connection.write_handle().write(presence).await?;
Ok(())
}
pub async fn handle_accept_buddy_request(
+ logic: &ClientLogic,
connection: Connected,
jid: JID,
-) -> Result<(), WriteError> {
+) -> Result<(), SubscribeError> {
let presence = Stanza::Presence(stanza::client::presence::Presence {
- from: None,
- id: None,
to: Some(jid.clone()),
r#type: Some(stanza::client::presence::PresenceType::Subscribed),
- lang: None,
- show: None,
- status: None,
- priority: None,
- errors: Vec::new(),
- delay: None,
+ ..Default::default()
});
connection.write_handle().write(presence).await?;
+ let client_user = logic.db.read_user(logic.bare_jid.clone()).await?;
+ let nick = client_user.nick.map(|nick| Nick(nick));
let presence = Stanza::Presence(stanza::client::presence::Presence {
- from: None,
- id: None,
to: Some(jid),
r#type: Some(stanza::client::presence::PresenceType::Subscribe),
- lang: None,
- show: None,
- status: None,
- priority: None,
- errors: Vec::new(),
- delay: None,
+ nick,
+ ..Default::default()
});
connection.write_handle().write(presence).await?;
Ok(())
}
pub async fn handle_accept_subscription_request(
+ logic: &ClientLogic,
connection: Connected,
jid: JID,
-) -> Result<(), WriteError> {
+) -> Result<(), SubscribeError> {
+ let client_user = logic.db.read_user(logic.bare_jid.clone()).await?;
+ let nick = client_user.nick.map(|nick| Nick(nick));
let presence = Stanza::Presence(stanza::client::presence::Presence {
- from: None,
- id: None,
to: Some(jid),
r#type: Some(stanza::client::presence::PresenceType::Subscribe),
- lang: None,
- show: None,
- status: None,
- priority: None,
- errors: Vec::new(),
- delay: None,
+ nick,
+ ..Default::default()
});
connection.write_handle().write(presence).await?;
Ok(())
@@ -265,16 +240,9 @@ pub async fn handle_unsubscribe_from_contact(
jid: JID,
) -> Result<(), WriteError> {
let presence = Stanza::Presence(stanza::client::presence::Presence {
- from: None,
- id: None,
to: Some(jid),
r#type: Some(stanza::client::presence::PresenceType::Unsubscribe),
- lang: None,
- show: None,
- status: None,
- priority: None,
- errors: Vec::new(),
- delay: None,
+ ..Default::default()
});
connection.write_handle().write(presence).await?;
Ok(())
@@ -282,16 +250,9 @@ pub async fn handle_unsubscribe_from_contact(
pub async fn handle_unsubscribe_contact(connection: Connected, jid: JID) -> Result<(), WriteError> {
let presence = Stanza::Presence(stanza::client::presence::Presence {
- from: None,
- id: None,
to: Some(jid),
r#type: Some(stanza::client::presence::PresenceType::Unsubscribed),
- lang: None,
- show: None,
- status: None,
- priority: None,
- errors: Vec::new(),
- delay: None,
+ ..Default::default()
});
connection.write_handle().write(presence).await?;
Ok(())
@@ -299,29 +260,15 @@ pub async fn handle_unsubscribe_contact(connection: Connected, jid: JID) -> Resu
pub async fn handle_unfriend_contact(connection: Connected, jid: JID) -> Result<(), WriteError> {
let presence = Stanza::Presence(stanza::client::presence::Presence {
- from: None,
- id: None,
to: Some(jid.clone()),
r#type: Some(stanza::client::presence::PresenceType::Unsubscribe),
- lang: None,
- show: None,
- status: None,
- priority: None,
- errors: Vec::new(),
- delay: None,
+ ..Default::default()
});
connection.write_handle().write(presence).await?;
let presence = Stanza::Presence(stanza::client::presence::Presence {
- from: None,
- id: None,
to: Some(jid),
r#type: Some(stanza::client::presence::PresenceType::Unsubscribed),
- lang: None,
- show: None,
- status: None,
- priority: None,
- errors: Vec::new(),
- delay: None,
+ ..Default::default()
});
connection.write_handle().write(presence).await?;
Ok(())
@@ -464,60 +411,119 @@ pub async fn handle_set_status(
Ok(())
}
-pub async fn handle_send_message(
- logic: &ClientLogic,
- connection: Connected,
- jid: JID,
- body: Body,
-) -> Result<(), WriteError> {
+pub async fn handle_send_message(logic: &ClientLogic, connection: Connected, jid: JID, body: Body) {
+ // upsert the chat and user the message will be delivered to. if there is a conflict, it will return whatever was there, otherwise it will return false by default.
+ let have_chatted = logic.db().upsert_chat_and_user(&jid).await.unwrap_or(false);
+
+ let nick;
+ let mark_chat_as_chatted;
+ if have_chatted == false {
+ match logic.db.read_user(logic.bare_jid.clone()).await {
+ Ok(u) => {
+ nick = u.nick.map(|nick| Nick(nick));
+ mark_chat_as_chatted = true;
+ }
+ Err(e) => {
+ logic
+ .handle_error(MessageSendError::GetUserDetails(e.into()).into())
+ .await;
+ nick = None;
+ mark_chat_as_chatted = false;
+ }
+ }
+ } else {
+ nick = None;
+ mark_chat_as_chatted = false;
+ }
+
+ // generate message struct
let id = Uuid::new_v4();
let timestamp = Utc::now();
- let message = Stanza::Message(stanza::client::message::Message {
+ let message = Message {
+ id,
+ from: connection.jid().as_bare(),
+ body: body.clone(),
+ timestamp,
+ delivery: Some(Delivery::Sending),
+ };
+
+ // try to store in message history that there is a new message that is sending. if client is quit mid-send then can mark as failed and re-send
+ // TODO: mark these as potentially failed upon client launch
+ if let Err(e) = logic
+ .db()
+ .create_message_with_self_resource(message.clone(), jid.clone(), connection.jid().clone())
+ .await
+ {
+ // TODO: should these really be handle_error or just the error macro?
+ logic
+ .handle_error(MessageSendError::MessageHistory(e.into()).into())
+ .await;
+ }
+
+ // tell the client a message is being sent
+ logic
+ .update_sender()
+ .send(UpdateMessage::Message {
+ to: jid.as_bare(),
+ message,
+ })
+ .await;
+
+ // prepare the message stanza
+ let message_stanza = Stanza::Message(stanza::client::message::Message {
from: Some(connection.jid().clone()),
id: Some(id.to_string()),
to: Some(jid.clone()),
// TODO: specify message type
r#type: stanza::client::message::MessageType::Chat,
// TODO: lang ?
- lang: None,
- subject: None,
body: Some(stanza::client::message::Body {
lang: None,
body: Some(body.body.clone()),
}),
- thread: None,
// include delay to have a consistent timestamp between server and client
delay: Some(Delay {
from: None,
stamp: timestamp,
}),
+ nick,
+ ..Default::default()
});
- connection.write_handle().write(message).await?;
- let mut message = Message {
- id,
- from: connection.jid().clone(),
- body,
- timestamp,
- };
- info!("sent message: {:?}", message);
- if let Err(e) = logic
- .db()
- .create_message_with_self_resource_and_chat(message.clone(), jid.clone())
- .await
- {
- // TODO: should these really be handle_error or just the error macro?
- logic
- .handle_error(MessageSendError::MessageHistory(e.into()).into())
- .await;
- }
- // TODO: don't do this, have separate from from details
- message.from = message.from.as_bare();
- let _ = logic
- .update_sender()
- .send(UpdateMessage::Message { to: jid, message })
+
+ // send the message
+ let result = connection
+ .write_handle()
+ .write(message_stanza.clone())
.await;
- Ok(())
- // TODO: refactor this to send a sending updatemessage, then update or something like that
+ match result {
+ Ok(_) => {
+ info!("sent message: {:?}", message_stanza);
+ logic
+ .update_sender()
+ .send(UpdateMessage::MessageDelivery {
+ id,
+ delivery: Delivery::Written,
+ })
+ .await;
+ if mark_chat_as_chatted {
+ if let Err(e) = logic.db.mark_chat_as_chatted(jid).await {
+ logic
+ .handle_error(MessageSendError::MarkChatAsChatted(e.into()).into())
+ .await;
+ }
+ }
+ }
+ Err(e) => {
+ logic
+ .update_sender()
+ .send(UpdateMessage::MessageDelivery {
+ id,
+ delivery: Delivery::Failed,
+ })
+ .await;
+ logic.handle_error(MessageSendError::Write(e).into()).await;
+ }
+ }
}
pub async fn handle_send_presence(
@@ -673,6 +679,78 @@ pub async fn handle_disco_items(
}
}
+pub async fn handle_publish(
+ logic: &ClientLogic,
+ connection: Connected,
+ item: pep::Item,
+ node: String,
+) -> Result<(), PublishError> {
+ let id = Uuid::new_v4().to_string();
+ let publish = match item {
+ pep::Item::Nick(n) => pubsub::Publish {
+ node,
+ items: vec![pubsub::Item {
+ item: Some(pubsub::Content::Nick(Nick(n))),
+ ..Default::default()
+ }],
+ },
+ };
+ let request = Iq {
+ from: Some(connection.jid().clone()),
+ id: id.clone(),
+ to: None,
+ r#type: IqType::Set,
+ lang: None,
+ query: Some(Query::Pubsub(Pubsub::Publish(publish, None))),
+ errors: Vec::new(),
+ };
+ match logic
+ .pending()
+ .request(&connection, Stanza::Iq(request), id)
+ .await? {
+
+ Stanza::Iq(Iq {
+ from,
+ r#type,
+ query,
+ errors,
+ ..
+ // TODO: maybe abstract a bunch of these different errors related to iqs into an iq error thing? as in like call iq.result(), get the query from inside, error otherwise.
+ }) if r#type == IqType::Result || r#type == IqType::Error => {
+ if from == None ||
+ from == Some(connection.jid().as_bare())
+ {
+ match r#type {
+ IqType::Result => {
+ if let Some(query) = query {
+ match query {
+ Query::Pubsub(_) => Ok(()),
+ q => Err(PublishError::MismatchedQuery(q)),
+ }
+ } else {
+ Err(PublishError::MissingQuery)
+ }
+ }
+ IqType::Error => {
+ Err(PublishError::StanzaErrors(errors))
+ }
+ _ => unreachable!(),
+ }
+ } else {
+ Err(PublishError::IncorrectEntity(
+ from.unwrap_or_else(|| connection.jid().as_bare()),
+ ))
+ }
+ }
+ s => Err(PublishError::UnexpectedStanza(s)),
+ }
+}
+
+pub async fn handle_change_nick(logic: &ClientLogic, nick: String) -> Result<(), NickError> {
+ logic.client().publish(pep::Item::Nick(nick), xep_0172::XMLNS.to_string()).await?;
+ Ok(())
+}
+
// TODO: could probably macro-ise?
pub async fn handle_online_result(
logic: &ClientLogic,
@@ -716,25 +794,24 @@ pub async fn handle_online_result(
let user = handle_get_user(logic, jid).await;
let _ = sender.send(user);
}
- // TODO: offline queue to modify roster
Command::AddContact(jid, sender) => {
let result = handle_add_contact(logic, connection, jid).await;
let _ = sender.send(result);
}
Command::BuddyRequest(jid, sender) => {
- let result = handle_buddy_request(connection, jid).await;
+ let result = handle_buddy_request(logic, connection, jid).await;
let _ = sender.send(result);
}
Command::SubscriptionRequest(jid, sender) => {
- let result = handle_subscription_request(connection, jid).await;
+ let result = handle_subscription_request(logic, connection, jid).await;
let _ = sender.send(result);
}
Command::AcceptBuddyRequest(jid, sender) => {
- let result = handle_accept_buddy_request(connection, jid).await;
+ let result = handle_accept_buddy_request(logic, connection, jid).await;
let _ = sender.send(result);
}
Command::AcceptSubscriptionRequest(jid, sender) => {
- let result = handle_accept_subscription_request(connection, jid).await;
+ let result = handle_accept_subscription_request(logic, connection, jid).await;
let _ = sender.send(result);
}
Command::UnsubscribeFromContact(jid, sender) => {
@@ -761,10 +838,8 @@ pub async fn handle_online_result(
let result = handle_set_status(logic, connection, online).await;
let _ = sender.send(result);
}
- // TODO: offline message queue
- Command::SendMessage(jid, body, sender) => {
- let result = handle_send_message(logic, connection, jid, body).await;
- let _ = sender.send(result);
+ Command::SendMessage(jid, body) => {
+ handle_send_message(logic, connection, jid, body).await;
}
Command::SendPresence(jid, presence, sender) => {
let result = handle_send_presence(connection, jid, presence).await;
@@ -778,6 +853,14 @@ pub async fn handle_online_result(
let result = handle_disco_items(logic, connection, jid, node).await;
let _ = sender.send(result);
}
+ Command::Publish { item, node, sender } => {
+ let result = handle_publish(logic, connection, item, node).await;
+ let _ = sender.send(result);
+ }
+ Command::ChangeNick(nick, sender) => {
+ let result = handle_change_nick(logic, nick).await;
+ let _ = sender.send(result);
+ }
}
Ok(())
}