aboutsummaryrefslogtreecommitdiffstats
path: root/filamento/src/logic
diff options
context:
space:
mode:
Diffstat (limited to 'filamento/src/logic')
-rw-r--r--filamento/src/logic/connect.rs2
-rw-r--r--filamento/src/logic/local_only.rs16
-rw-r--r--filamento/src/logic/offline.rs37
-rw-r--r--filamento/src/logic/online.rs101
-rw-r--r--filamento/src/logic/process_stanza.rs83
5 files changed, 217 insertions, 22 deletions
diff --git a/filamento/src/logic/connect.rs b/filamento/src/logic/connect.rs
index 37cdad5..9d61ca4 100644
--- a/filamento/src/logic/connect.rs
+++ b/filamento/src/logic/connect.rs
@@ -19,7 +19,7 @@ pub async fn handle_connect<Fs: FileStore + Clone + Send + Sync>(
debug!("getting roster");
logic
.clone()
- .handle_online(Command::GetRoster(send), connection.clone())
+ .handle_online(Command::GetRosterWithUsers(send), connection.clone())
.await;
debug!("sent roster req");
let roster = recv.await;
diff --git a/filamento/src/logic/local_only.rs b/filamento/src/logic/local_only.rs
index cabbef4..dc94d2c 100644
--- a/filamento/src/logic/local_only.rs
+++ b/filamento/src/logic/local_only.rs
@@ -28,6 +28,15 @@ pub async fn handle_get_chats_ordered_with_latest_messages<Fs: FileStore + Clone
Ok(logic.db().read_chats_ordered_with_latest_messages().await?)
}
+pub async fn handle_get_chats_ordered_with_latest_messages_and_users<Fs: FileStore + Clone>(
+ logic: &ClientLogic<Fs>,
+) -> Result<Vec<((Chat, User), (Message, User))>, DatabaseError> {
+ Ok(logic
+ .db()
+ .read_chats_ordered_with_latest_messages_and_users()
+ .await?)
+}
+
pub async fn handle_get_chat<Fs: FileStore + Clone>(
logic: &ClientLogic<Fs>,
jid: JID,
@@ -42,6 +51,13 @@ pub async fn handle_get_messages<Fs: FileStore + Clone>(
Ok(logic.db().read_message_history(jid).await?)
}
+pub async fn handle_get_messages_with_users<Fs: FileStore + Clone>(
+ logic: &ClientLogic<Fs>,
+ jid: JID,
+) -> Result<Vec<(Message, User)>, DatabaseError> {
+ Ok(logic.db().read_message_history_with_users(jid).await?)
+}
+
pub async fn handle_delete_chat<Fs: FileStore + Clone>(
logic: &ClientLogic<Fs>,
jid: JID,
diff --git a/filamento/src/logic/offline.rs b/filamento/src/logic/offline.rs
index 566972c..b87484c 100644
--- a/filamento/src/logic/offline.rs
+++ b/filamento/src/logic/offline.rs
@@ -2,6 +2,7 @@ use std::process::id;
use chrono::Utc;
use lampada::error::WriteError;
+use tracing::error;
use uuid::Uuid;
use crate::{
@@ -14,6 +15,7 @@ use crate::{
files::FileStore,
presence::Online,
roster::Contact,
+ user::User,
};
use super::{
@@ -21,7 +23,8 @@ use super::{
local_only::{
handle_delete_chat, handle_delete_messaage, handle_get_chat, handle_get_chats,
handle_get_chats_ordered, handle_get_chats_ordered_with_latest_messages,
- handle_get_messages, handle_get_user,
+ handle_get_chats_ordered_with_latest_messages_and_users, handle_get_messages,
+ handle_get_messages_with_users, handle_get_user,
},
};
@@ -47,6 +50,12 @@ pub async fn handle_get_roster<Fs: FileStore + Clone>(
Ok(logic.db().read_cached_roster().await?)
}
+pub async fn handle_get_roster_with_users<Fs: FileStore + Clone>(
+ logic: &ClientLogic<Fs>,
+) -> Result<Vec<(Contact, User)>, RosterError> {
+ Ok(logic.db().read_cached_roster_with_users().await?)
+}
+
pub async fn handle_offline_result<Fs: FileStore + Clone>(
logic: &ClientLogic<Fs>,
command: Command<Fs>,
@@ -56,6 +65,10 @@ pub async fn handle_offline_result<Fs: FileStore + Clone>(
let roster = handle_get_roster(logic).await;
sender.send(roster);
}
+ Command::GetRosterWithUsers(sender) => {
+ let roster = handle_get_roster_with_users(logic).await;
+ sender.send(roster);
+ }
Command::GetChats(sender) => {
let chats = handle_get_chats(logic).await;
sender.send(chats);
@@ -68,6 +81,10 @@ pub async fn handle_offline_result<Fs: FileStore + Clone>(
let chats = handle_get_chats_ordered_with_latest_messages(logic).await;
sender.send(chats);
}
+ Command::GetChatsOrderedWithLatestMessagesAndUsers(sender) => {
+ let chats = handle_get_chats_ordered_with_latest_messages_and_users(logic).await;
+ sender.send(chats);
+ }
Command::GetChat(jid, sender) => {
let chats = handle_get_chat(logic, jid).await;
sender.send(chats);
@@ -76,6 +93,10 @@ pub async fn handle_offline_result<Fs: FileStore + Clone>(
let messages = handle_get_messages(logic, jid).await;
sender.send(messages);
}
+ Command::GetMessagesWithUsers(jid, sender) => {
+ let messages = handle_get_messages_with_users(logic, jid).await;
+ sender.send(messages);
+ }
Command::DeleteChat(jid, sender) => {
let result = handle_delete_chat(logic, jid).await;
sender.send(result);
@@ -151,11 +172,25 @@ pub async fn handle_offline_result<Fs: FileStore + Clone>(
.handle_error(MessageSendError::MessageHistory(e.into()).into())
.await;
}
+
+ let from = match logic.db().read_user(logic.bare_jid.clone()).await {
+ Ok(u) => u,
+ Err(e) => {
+ error!("{}", e);
+ User {
+ jid: logic.bare_jid.clone(),
+ nick: None,
+ avatar: None,
+ }
+ }
+ };
+
logic
.update_sender()
.send(crate::UpdateMessage::Message {
to: jid.as_bare(),
message,
+ from,
})
.await;
}
diff --git a/filamento/src/logic/online.rs b/filamento/src/logic/online.rs
index d9441d7..767f923 100644
--- a/filamento/src/logic/online.rs
+++ b/filamento/src/logic/online.rs
@@ -18,16 +18,13 @@ use uuid::Uuid;
use crate::{
avatar, chat::{Body, Chat, Delivery, Message}, disco::{Info, Items}, error::{
AvatarPublishError, DatabaseError, DiscoError, Error, IqRequestError, MessageSendError, NickError, PEPError, RosterError, StatusError, SubscribeError
- }, files::FileStore, pep, presence::{Online, Presence, PresenceType}, roster::{Contact, ContactUpdate}, Command, UpdateMessage
+ }, files::FileStore, pep, presence::{Online, Presence, PresenceType}, roster::{Contact, ContactUpdate}, user::User, Command, UpdateMessage
};
use super::{
- ClientLogic,
local_only::{
- handle_delete_chat, handle_delete_messaage, handle_get_chat, handle_get_chats,
- handle_get_chats_ordered, handle_get_chats_ordered_with_latest_messages,
- handle_get_messages, handle_get_user,
- },
+ handle_delete_chat, handle_delete_messaage, handle_get_chat, handle_get_chats, handle_get_chats_ordered, handle_get_chats_ordered_with_latest_messages, handle_get_chats_ordered_with_latest_messages_and_users, handle_get_messages, handle_get_messages_with_users, handle_get_user
+ }, ClientLogic
};
pub async fn handle_online<Fs: FileStore + Clone>(logic: ClientLogic<Fs>, command: Command<Fs>, connection: Connected) {
@@ -97,6 +94,71 @@ pub async fn handle_get_roster<Fs: FileStore + Clone>(
}
}
+// this can't query the client... otherwise there is a hold-up and the connection can't complete
+pub async fn handle_get_roster_with_users<Fs: FileStore + Clone>(
+ logic: &ClientLogic<Fs>,
+ connection: Connected,
+) -> Result<Vec<(Contact, User)>, RosterError> {
+ let iq_id = Uuid::new_v4().to_string();
+ let stanza = Stanza::Iq(Iq {
+ from: Some(connection.jid().clone()),
+ id: iq_id.to_string(),
+ to: None,
+ r#type: IqType::Get,
+ lang: None,
+ query: Some(iq::Query::Roster(stanza::roster::Query {
+ ver: None,
+ items: Vec::new(),
+ })),
+ errors: Vec::new(),
+ });
+ let response = logic
+ .pending()
+ .request(&connection, stanza, iq_id.clone())
+ .await?;
+ // TODO: timeout
+ match response {
+ Stanza::Iq(Iq {
+ from: _,
+ id,
+ to: _,
+ r#type,
+ lang: _,
+ query: Some(iq::Query::Roster(stanza::roster::Query { ver: _, items })),
+ errors: _,
+ }) if id == iq_id && r#type == IqType::Result => {
+ let contacts: Vec<Contact> = items.into_iter().map(|item| item.into()).collect();
+ if let Err(e) = logic.db().replace_cached_roster(contacts.clone()).await {
+ logic
+ .handle_error(Error::Roster(RosterError::Cache(e.into())))
+ .await;
+ };
+ let mut users = Vec::new();
+ for contact in &contacts {
+ let user = logic.db().read_user(contact.user_jid.clone()).await?;
+ users.push(user);
+ }
+ Ok(contacts.into_iter().zip(users).collect())
+ }
+ ref s @ Stanza::Iq(Iq {
+ from: _,
+ ref id,
+ to: _,
+ r#type,
+ lang: _,
+ query: _,
+ ref errors,
+ }) if *id == iq_id && r#type == IqType::Error => {
+ if let Some(error) = errors.first() {
+ Err(RosterError::StanzaError(error.clone()))
+ } else {
+ Err(RosterError::UnexpectedStanza(s.clone()))
+ }
+ }
+ s => Err(RosterError::UnexpectedStanza(s)),
+ }
+}
+
pub async fn handle_add_contact<Fs: FileStore + Clone>(
logic: &ClientLogic<Fs>,
connection: Connected,
@@ -470,12 +532,25 @@ pub async fn handle_send_message<Fs: FileStore + Clone>(logic: &ClientLogic<Fs>,
.await;
}
+ let from = match logic.db().read_user(logic.bare_jid.clone()).await {
+ Ok(u) => u,
+ Err(e) => {
+ error!("{}", e);
+ User {
+ jid: logic.bare_jid.clone(),
+ nick: None,
+ avatar: None,
+ }
+ },
+ };
+
// tell the client a message is being sent
logic
.update_sender()
.send(UpdateMessage::Message {
to: jid.as_bare(),
message,
+ from,
})
.await;
@@ -513,6 +588,7 @@ pub async fn handle_send_message<Fs: FileStore + Clone>(logic: &ClientLogic<Fs>,
.send(UpdateMessage::MessageDelivery {
id,
delivery: Delivery::Written,
+ chat: jid.clone(),
})
.await;
if mark_chat_as_chatted {
@@ -530,6 +606,7 @@ pub async fn handle_send_message<Fs: FileStore + Clone>(logic: &ClientLogic<Fs>,
.send(UpdateMessage::MessageDelivery {
id,
delivery: Delivery::Failed,
+ chat: jid,
})
.await;
logic.handle_error(MessageSendError::Write(e).into()).await;
@@ -1006,6 +1083,10 @@ pub async fn handle_online_result<Fs: FileStore + Clone>(
let roster = handle_get_roster(logic, connection).await;
let _ = result_sender.send(roster);
}
+ Command::GetRosterWithUsers(result_sender) => {
+ let roster = handle_get_roster_with_users(logic, connection).await;
+ let _ = result_sender.send(roster);
+ }
Command::GetChats(sender) => {
let chats = handle_get_chats(logic).await;
let _ = sender.send(chats);
@@ -1018,6 +1099,10 @@ pub async fn handle_online_result<Fs: FileStore + Clone>(
let chats = handle_get_chats_ordered_with_latest_messages(logic).await;
let _ = sender.send(chats);
}
+ Command::GetChatsOrderedWithLatestMessagesAndUsers(sender) => {
+ let chats = handle_get_chats_ordered_with_latest_messages_and_users(logic).await;
+ sender.send(chats);
+ }
Command::GetChat(jid, sender) => {
let chat = handle_get_chat(logic, jid).await;
let _ = sender.send(chat);
@@ -1026,6 +1111,10 @@ pub async fn handle_online_result<Fs: FileStore + Clone>(
let messages = handle_get_messages(logic, jid).await;
let _ = sender.send(messages);
}
+ Command::GetMessagesWithUsers(jid, sender) => {
+ let messages = handle_get_messages_with_users(logic, jid).await;
+ sender.send(messages);
+ }
Command::DeleteChat(jid, sender) => {
let result = handle_delete_chat(logic, jid).await;
let _ = sender.send(result);
diff --git a/filamento/src/logic/process_stanza.rs b/filamento/src/logic/process_stanza.rs
index 9c49b04..cdaff97 100644
--- a/filamento/src/logic/process_stanza.rs
+++ b/filamento/src/logic/process_stanza.rs
@@ -20,12 +20,13 @@ use crate::{
UpdateMessage, caps,
chat::{Body, Message},
error::{
- AvatarUpdateError, DatabaseError, Error, IqError, MessageRecvError, PresenceError,
- RosterError,
+ AvatarUpdateError, DatabaseError, Error, IqError, IqProcessError, MessageRecvError,
+ PresenceError, RosterError,
},
files::FileStore,
presence::{Offline, Online, Presence, PresenceType, Show},
roster::Contact,
+ user::User,
};
use super::ClientLogic;
@@ -103,11 +104,24 @@ pub async fn recv_message<Fs: FileStore + Clone>(
}
};
+ let from_user = match logic.db().read_user(from.as_bare()).await {
+ Ok(u) => u,
+ Err(e) => {
+ error!("{}", e);
+ User {
+ jid: from.as_bare(),
+ nick: None,
+ avatar: None,
+ }
+ }
+ };
+
// update the client with the new message
logic
.update_sender()
.send(UpdateMessage::Message {
to: from.as_bare(),
+ from: from_user,
message,
})
.await;
@@ -541,11 +555,11 @@ pub async fn recv_iq<Fs: FileStore + Clone>(
logic: ClientLogic<Fs>,
connection: Connected,
iq: Iq,
-) -> Result<Option<UpdateMessage>, IqError> {
+) -> Result<Option<UpdateMessage>, IqProcessError> {
if let Some(to) = &iq.to {
if *to == *connection.jid() {
} else {
- return Err(IqError::IncorrectAddressee(to.clone()));
+ return Err(IqProcessError::Iq(IqError::IncorrectAddressee(to.clone())));
}
}
match iq.r#type {
@@ -556,7 +570,11 @@ pub async fn recv_iq<Fs: FileStore + Clone>(
.unwrap_or_else(|| connection.server().clone());
let id = iq.id.clone();
debug!("received iq result with id `{}` from {}", id, from);
- logic.pending().respond(Stanza::Iq(iq), id).await?;
+ logic
+ .pending()
+ .respond(Stanza::Iq(iq), id)
+ .await
+ .map_err(|e| Into::<IqError>::into(e))?;
Ok(None)
}
stanza::client::iq::IqType::Get => {
@@ -596,7 +614,11 @@ pub async fn recv_iq<Fs: FileStore + Clone>(
errors: vec![StanzaError::ItemNotFound.into()],
};
// TODO: log error
- connection.write_handle().write(Stanza::Iq(iq)).await?;
+ connection
+ .write_handle()
+ .write(Stanza::Iq(iq))
+ .await
+ .map_err(|e| Into::<IqError>::into(e))?;
info!("replied to disco#info request from {}", from);
return Ok(None);
}
@@ -612,7 +634,11 @@ pub async fn recv_iq<Fs: FileStore + Clone>(
errors: vec![StanzaError::ItemNotFound.into()],
};
// TODO: log error
- connection.write_handle().write(Stanza::Iq(iq)).await?;
+ connection
+ .write_handle()
+ .write(Stanza::Iq(iq))
+ .await
+ .map_err(|e| Into::<IqError>::into(e))?;
info!("replied to disco#info request from {}", from);
return Ok(None);
}
@@ -627,7 +653,11 @@ pub async fn recv_iq<Fs: FileStore + Clone>(
query: Some(iq::Query::DiscoInfo(disco)),
errors: vec![],
};
- connection.write_handle().write(Stanza::Iq(iq)).await?;
+ connection
+ .write_handle()
+ .write(Stanza::Iq(iq))
+ .await
+ .map_err(|e| Into::<IqError>::into(e))?;
info!("replied to disco#info request from {}", from);
Ok(None)
}
@@ -642,7 +672,11 @@ pub async fn recv_iq<Fs: FileStore + Clone>(
query: None,
errors: vec![StanzaError::ServiceUnavailable.into()],
};
- connection.write_handle().write(Stanza::Iq(iq)).await?;
+ connection
+ .write_handle()
+ .write(Stanza::Iq(iq))
+ .await
+ .map_err(|e| Into::<IqError>::into(e))?;
warn!("replied to unsupported iq get from {}", from);
Ok(None)
} // stanza::client::iq::Query::Bind(bind) => todo!(),
@@ -662,7 +696,11 @@ pub async fn recv_iq<Fs: FileStore + Clone>(
query: None,
errors: vec![StanzaError::BadRequest.into()],
};
- connection.write_handle().write(Stanza::Iq(iq)).await?;
+ connection
+ .write_handle()
+ .write(Stanza::Iq(iq))
+ .await
+ .map_err(|e| Into::<IqError>::into(e))?;
info!("replied to malformed iq query from {}", from);
Ok(None)
}
@@ -713,7 +751,12 @@ pub async fn recv_iq<Fs: FileStore + Clone>(
.handle_error(RosterError::PushReply(e.into()).into())
.await;
}
- Ok(Some(UpdateMessage::RosterUpdate(contact)))
+ let user = logic
+ .db()
+ .read_user(contact.user_jid.clone())
+ .await
+ .map_err(|e| Into::<RosterError>::into(e))?;
+ Ok(Some(UpdateMessage::RosterUpdate(contact, user)))
}
}
} else {
@@ -727,7 +770,11 @@ pub async fn recv_iq<Fs: FileStore + Clone>(
query: None,
errors: vec![StanzaError::NotAcceptable.into()],
};
- connection.write_handle().write(Stanza::Iq(iq)).await?;
+ connection
+ .write_handle()
+ .write(Stanza::Iq(iq))
+ .await
+ .map_err(|e| Into::<IqError>::into(e))?;
Ok(None)
}
}
@@ -743,7 +790,11 @@ pub async fn recv_iq<Fs: FileStore + Clone>(
query: None,
errors: vec![StanzaError::ServiceUnavailable.into()],
};
- connection.write_handle().write(Stanza::Iq(iq)).await?;
+ connection
+ .write_handle()
+ .write(Stanza::Iq(iq))
+ .await
+ .map_err(|e| Into::<IqError>::into(e))?;
warn!("replied to unsupported iq set from {}", from);
Ok(None)
}
@@ -759,7 +810,11 @@ pub async fn recv_iq<Fs: FileStore + Clone>(
query: None,
errors: vec![StanzaError::NotAcceptable.into()],
};
- connection.write_handle().write(Stanza::Iq(iq)).await?;
+ connection
+ .write_handle()
+ .write(Stanza::Iq(iq))
+ .await
+ .map_err(|e| Into::<IqError>::into(e))?;
Ok(None)
}
}