aboutsummaryrefslogtreecommitdiffstats
path: root/filamento
diff options
context:
space:
mode:
Diffstat (limited to 'filamento')
-rw-r--r--filamento/src/db.rs262
-rw-r--r--filamento/src/error.rs23
-rw-r--r--filamento/src/files.rs4
-rw-r--r--filamento/src/lib.rs70
-rw-r--r--filamento/src/logic/connect.rs2
-rw-r--r--filamento/src/logic/local_only.rs16
-rw-r--r--filamento/src/logic/offline.rs37
-rw-r--r--filamento/src/logic/online.rs101
-rw-r--r--filamento/src/logic/process_stanza.rs83
-rw-r--r--filamento/src/user.rs2
10 files changed, 553 insertions, 47 deletions
diff --git a/filamento/src/db.rs b/filamento/src/db.rs
index 1d3d36c..d9206cc 100644
--- a/filamento/src/db.rs
+++ b/filamento/src/db.rs
@@ -1,12 +1,12 @@
use std::{collections::HashSet, path::Path};
-use chrono::Utc;
+use chrono::{DateTime, Utc};
use jid::JID;
use sqlx::{SqlitePool, migrate};
use uuid::Uuid;
use crate::{
- chat::{Chat, Message},
+ chat::{Body, Chat, Delivery, Message},
error::{DatabaseError as Error, DatabaseOpenError},
presence::Online,
roster::Contact,
@@ -51,10 +51,9 @@ impl Db {
pub(crate) async fn create_user(&self, user: User) -> Result<(), Error> {
sqlx::query!(
- "insert into users ( jid, nick, cached_status_message ) values ( ?, ?, ? )",
+ "insert into users ( jid, nick ) values ( ?, ? )",
user.jid,
user.nick,
- user.cached_status_message
)
.execute(&self.db)
.await?;
@@ -184,8 +183,7 @@ impl Db {
pub(crate) async fn update_user(&self, user: User) -> Result<(), Error> {
sqlx::query!(
- "update users set cached_status_message = ?, nick = ? where jid = ?",
- user.cached_status_message,
+ "update users set nick = ? where jid = ?",
user.nick,
user.jid
)
@@ -361,10 +359,9 @@ impl Db {
}
pub(crate) async fn read_cached_roster(&self) -> Result<Vec<Contact>, Error> {
- let mut roster: Vec<Contact> =
- sqlx::query_as("select * from roster join users on jid = user_jid")
- .fetch_all(&self.db)
- .await?;
+ let mut roster: Vec<Contact> = sqlx::query_as("select * from roster")
+ .fetch_all(&self.db)
+ .await?;
for contact in &mut roster {
#[derive(sqlx::FromRow)]
struct Row {
@@ -380,6 +377,39 @@ impl Db {
Ok(roster)
}
+ pub(crate) async fn read_cached_roster_with_users(
+ &self,
+ ) -> Result<Vec<(Contact, User)>, Error> {
+ #[derive(sqlx::FromRow)]
+ struct Row {
+ #[sqlx(flatten)]
+ contact: Contact,
+ #[sqlx(flatten)]
+ user: User,
+ }
+ let mut roster: Vec<Row> =
+ sqlx::query_as("select * from roster join users on jid = user_jid")
+ .fetch_all(&self.db)
+ .await?;
+ for row in &mut roster {
+ #[derive(sqlx::FromRow)]
+ struct Row {
+ group_name: String,
+ }
+ let groups: Vec<Row> =
+ sqlx::query_as("select group_name from groups_roster where contact_jid = ?")
+ .bind(&row.contact.user_jid)
+ .fetch_all(&self.db)
+ .await?;
+ row.contact.groups = HashSet::from_iter(groups.into_iter().map(|row| row.group_name));
+ }
+ let roster = roster
+ .into_iter()
+ .map(|row| (row.contact, row.user))
+ .collect();
+ Ok(roster)
+ }
+
pub(crate) async fn create_chat(&self, chat: Chat) -> Result<(), Error> {
let id = Uuid::new_v4();
let jid = chat.correspondent();
@@ -467,23 +497,197 @@ impl Db {
&self,
) -> Result<Vec<(Chat, Message)>, Error> {
#[derive(sqlx::FromRow)]
- pub struct ChatWithMessage {
+ pub struct RowChat {
+ chat_correspondent: JID,
+ chat_have_chatted: bool,
+ }
+ impl From<RowChat> for Chat {
+ fn from(value: RowChat) -> Self {
+ Self {
+ correspondent: value.chat_correspondent,
+ have_chatted: value.chat_have_chatted,
+ }
+ }
+ }
+ #[derive(sqlx::FromRow)]
+ pub struct RowMessage {
+ message_id: Uuid,
+ message_body: String,
+ message_delivery: Option<Delivery>,
+ message_timestamp: DateTime<Utc>,
+ message_from_jid: JID,
+ }
+ impl From<RowMessage> for Message {
+ fn from(value: RowMessage) -> Self {
+ Self {
+ id: value.message_id,
+ from: value.message_from_jid,
+ delivery: value.message_delivery,
+ timestamp: value.message_timestamp,
+ body: Body {
+ body: value.message_body,
+ },
+ }
+ }
+ }
+
+ #[derive(sqlx::FromRow)]
+ pub struct ChatWithMessageRow {
#[sqlx(flatten)]
- pub chat: Chat,
+ pub chat: RowChat,
#[sqlx(flatten)]
- pub message: Message,
+ pub message: RowMessage,
+ }
+
+ pub struct ChatWithMessage {
+ chat: Chat,
+ message: Message,
+ }
+
+ impl From<ChatWithMessageRow> for ChatWithMessage {
+ fn from(value: ChatWithMessageRow) -> Self {
+ Self {
+ chat: value.chat.into(),
+ message: value.message.into(),
+ }
+ }
}
// TODO: sometimes chats have no messages.
- // TODO: i don't know if this will assign the right uuid to the latest message or the chat's id. should probably check but i don't think it matters as nothing ever gets called with the id of the latest message in the chats list
- // TODO: it does matter in fact, as message updates and delivery receipts need to go to the right latest_message
- let chats: Vec<ChatWithMessage> = sqlx::query_as("select c.*, m.* from chats c join (select chat_id, max(timestamp) max_timestamp from messages group by chat_id) max_timestamps on c.id = max_timestamps.chat_id join messages m on max_timestamps.chat_id = m.chat_id and max_timestamps.max_timestamp = m.timestamp order by m.timestamp desc")
+ let chats: Vec<ChatWithMessageRow> = sqlx::query_as("select c.*, m.* from chats c join (select chat_id, max(timestamp) max_timestamp from messages group by chat_id) max_timestamps on c.id = max_timestamps.chat_id join messages m on max_timestamps.chat_id = m.chat_id and max_timestamps.max_timestamp = m.timestamp order by m.timestamp desc")
+ .fetch_all(&self.db)
+ .await?;
+
+ let chats = chats
+ .into_iter()
+ .map(|chat_with_message_row| {
+ let chat_with_message: ChatWithMessage = chat_with_message_row.into();
+ (chat_with_message.chat, chat_with_message.message)
+ })
+ .collect();
+
+ Ok(chats)
+ }
+
+ /// chats ordered by date of last message
+ // greatest-n-per-group
+ pub(crate) async fn read_chats_ordered_with_latest_messages_and_users(
+ &self,
+ ) -> Result<Vec<((Chat, User), (Message, User))>, Error> {
+ #[derive(sqlx::FromRow)]
+ pub struct RowChat {
+ chat_correspondent: JID,
+ chat_have_chatted: bool,
+ }
+ impl From<RowChat> for Chat {
+ fn from(value: RowChat) -> Self {
+ Self {
+ correspondent: value.chat_correspondent,
+ have_chatted: value.chat_have_chatted,
+ }
+ }
+ }
+ #[derive(sqlx::FromRow)]
+ pub struct RowMessage {
+ message_id: Uuid,
+ message_body: String,
+ message_delivery: Option<Delivery>,
+ message_timestamp: DateTime<Utc>,
+ message_from_jid: JID,
+ }
+ impl From<RowMessage> for Message {
+ fn from(value: RowMessage) -> Self {
+ Self {
+ id: value.message_id,
+ from: value.message_from_jid,
+ delivery: value.message_delivery,
+ timestamp: value.message_timestamp,
+ body: Body {
+ body: value.message_body,
+ },
+ }
+ }
+ }
+ #[derive(sqlx::FromRow)]
+ pub struct RowChatUser {
+ chat_user_jid: JID,
+ chat_user_nick: Option<String>,
+ chat_user_avatar: Option<String>,
+ }
+ impl From<RowChatUser> for User {
+ fn from(value: RowChatUser) -> Self {
+ Self {
+ jid: value.chat_user_jid,
+ nick: value.chat_user_nick,
+ avatar: value.chat_user_avatar,
+ }
+ }
+ }
+ #[derive(sqlx::FromRow)]
+ pub struct RowMessageUser {
+ message_user_jid: JID,
+ message_user_nick: Option<String>,
+ message_user_avatar: Option<String>,
+ }
+ impl From<RowMessageUser> for User {
+ fn from(value: RowMessageUser) -> Self {
+ Self {
+ jid: value.message_user_jid,
+ nick: value.message_user_nick,
+ avatar: value.message_user_avatar,
+ }
+ }
+ }
+ #[derive(sqlx::FromRow)]
+ pub struct ChatWithMessageAndUsersRow {
+ #[sqlx(flatten)]
+ pub chat: RowChat,
+ #[sqlx(flatten)]
+ pub chat_user: RowChatUser,
+ #[sqlx(flatten)]
+ pub message: RowMessage,
+ #[sqlx(flatten)]
+ pub message_user: RowMessageUser,
+ }
+
+ impl From<ChatWithMessageAndUsersRow> for ChatWithMessageAndUsers {
+ fn from(value: ChatWithMessageAndUsersRow) -> Self {
+ Self {
+ chat: value.chat.into(),
+ chat_user: value.chat_user.into(),
+ message: value.message.into(),
+ message_user: value.message_user.into(),
+ }
+ }
+ }
+
+ pub struct ChatWithMessageAndUsers {
+ chat: Chat,
+ chat_user: User,
+ message: Message,
+ message_user: User,
+ }
+
+ let chats: Vec<ChatWithMessageAndUsersRow> = sqlx::query_as("select c.id as chat_id, c.correspondent as chat_correspondent, c.have_chatted as chat_have_chatted, m.id as message_id, m.body as message_body, m.delivery as message_delivery, m.timestamp as message_timestamp, m.from_jid as message_from_jid, cu.jid as chat_user_jid, cu.nick as chat_user_nick, cu.avatar as chat_user_avatar, mu.jid as message_user_jid, mu.nick as message_user_nick, mu.avatar as message_user_avatar from chats c join (select chat_id, max(timestamp) max_timestamp from messages group by chat_id) max_timestamps on c.id = max_timestamps.chat_id join messages m on max_timestamps.chat_id = m.chat_id and max_timestamps.max_timestamp = m.timestamp join users as cu on cu.jid = c.correspondent join users as mu on mu.jid = m.from_jid order by m.timestamp desc")
.fetch_all(&self.db)
.await?;
let chats = chats
.into_iter()
- .map(|chat_with_message| (chat_with_message.chat, chat_with_message.message))
+ .map(|chat_with_message_and_users_row| {
+ let chat_with_message_and_users: ChatWithMessageAndUsers =
+ chat_with_message_and_users_row.into();
+ (
+ (
+ chat_with_message_and_users.chat,
+ chat_with_message_and_users.chat_user,
+ ),
+ (
+ chat_with_message_and_users.message,
+ chat_with_message_and_users.message_user,
+ ),
+ )
+ })
.collect();
Ok(chats)
@@ -637,6 +841,30 @@ impl Db {
Ok(messages)
}
+ pub(crate) async fn read_message_history_with_users(
+ &self,
+ chat: JID,
+ ) -> Result<Vec<(Message, User)>, Error> {
+ let chat_id = self.read_chat_id(chat).await?;
+ #[derive(sqlx::FromRow)]
+ pub struct Row {
+ #[sqlx(flatten)]
+ user: User,
+ #[sqlx(flatten)]
+ message: Message,
+ }
+ let messages: Vec<Row> =
+ sqlx::query_as("select * from messages join users on jid = from_jid where chat_id = ? order by timestamp asc")
+ .bind(chat_id)
+ .fetch_all(&self.db)
+ .await?;
+ let messages = messages
+ .into_iter()
+ .map(|row| (row.message, row.user))
+ .collect();
+ Ok(messages)
+ }
+
pub(crate) async fn read_cached_status(&self) -> Result<Online, Error> {
let online: Online = sqlx::query_as("select * from cached_status where id = 0")
.fetch_one(&self.db)
diff --git a/filamento/src/error.rs b/filamento/src/error.rs
index 23272b1..f2bf6ef 100644
--- a/filamento/src/error.rs
+++ b/filamento/src/error.rs
@@ -3,7 +3,7 @@ use std::{num::TryFromIntError, string::FromUtf8Error, sync::Arc};
use base64::DecodeError;
use image::ImageError;
use jid::JID;
-use lampada::error::{ConnectionError, ReadError, WriteError};
+use lampada::error::{ActorError, ConnectionError, ReadError, WriteError};
use stanza::client::{Stanza, iq::Query};
use thiserror::Error;
@@ -21,7 +21,7 @@ pub enum Error<Fs: FileStore> {
// TODO: include content
// UnrecognizedContent(peanuts::element::Content),
#[error("iq receive error: {0}")]
- Iq(#[from] IqError),
+ Iq(#[from] IqProcessError),
// TODO: change to Connecting(ConnectingError)
#[error("connecting: {0}")]
Connecting(#[from] ConnectionJobError),
@@ -117,6 +117,17 @@ pub enum RosterError {
StanzaError(#[from] stanza::client::error::Error),
#[error("could not reply to roster push: {0}")]
PushReply(WriteError),
+ #[error("actor error: {0}")]
+ Actor(ActorError),
+}
+
+impl From<CommandError<RosterError>> for RosterError {
+ fn from(value: CommandError<RosterError>) -> Self {
+ match value {
+ CommandError::Actor(actor_error) => Self::Actor(actor_error),
+ CommandError::Error(e) => e,
+ }
+ }
}
#[derive(Debug, Error, Clone)]
@@ -181,6 +192,14 @@ pub enum IqError {
}
#[derive(Debug, Error, Clone)]
+pub enum IqProcessError {
+ #[error("iq error")]
+ Iq(#[from] IqError),
+ #[error("roster push")]
+ Roster(#[from] RosterError),
+}
+
+#[derive(Debug, Error, Clone)]
pub enum DatabaseOpenError {
#[error("error: {0}")]
Error(Arc<sqlx::Error>),
diff --git a/filamento/src/files.rs b/filamento/src/files.rs
index cd232f3..3acc871 100644
--- a/filamento/src/files.rs
+++ b/filamento/src/files.rs
@@ -35,6 +35,10 @@ impl Files {
let root = root.into();
Self { root }
}
+
+ pub fn root(&self) -> &Path {
+ &self.root
+ }
}
impl FileStore for Files {
diff --git a/filamento/src/lib.rs b/filamento/src/lib.rs
index 7946241..14b0cae 100644
--- a/filamento/src/lib.rs
+++ b/filamento/src/lib.rs
@@ -52,6 +52,8 @@ pub mod user;
pub enum Command<Fs: FileStore> {
/// get the roster. if offline, retreive cached version from database. should be stored in application memory
GetRoster(oneshot::Sender<Result<Vec<Contact>, RosterError>>),
+ /// get the roster. if offline, retreive cached version from database. should be stored in application memory. includes user associated with each contact
+ GetRosterWithUsers(oneshot::Sender<Result<Vec<(Contact, User)>, RosterError>>),
/// get all chats. chat will include 10 messages in their message Vec (enough for chat previews)
// TODO: paging and filtering
GetChats(oneshot::Sender<Result<Vec<Chat>, DatabaseError>>),
@@ -59,11 +61,21 @@ pub enum Command<Fs: FileStore> {
GetChatsOrdered(oneshot::Sender<Result<Vec<Chat>, DatabaseError>>),
// TODO: paging and filtering
GetChatsOrderedWithLatestMessages(oneshot::Sender<Result<Vec<(Chat, Message)>, DatabaseError>>),
+ // TODO: paging and filtering, nullabillity for latest message
+ GetChatsOrderedWithLatestMessagesAndUsers(
+ oneshot::Sender<Result<Vec<((Chat, User), (Message, User))>, DatabaseError>>,
+ ),
/// get a specific chat by jid
GetChat(JID, oneshot::Sender<Result<Chat, DatabaseError>>),
/// get message history for chat (does appropriate mam things)
// TODO: paging and filtering
GetMessages(JID, oneshot::Sender<Result<Vec<Message>, DatabaseError>>),
+ /// get message history for chat (does appropriate mam things)
+ // TODO: paging and filtering
+ GetMessagesWithUsers(
+ JID,
+ oneshot::Sender<Result<Vec<(Message, User)>, DatabaseError>>,
+ ),
/// delete a chat from your chat history, along with all the corresponding messages
DeleteChat(JID, oneshot::Sender<Result<(), DatabaseError>>),
/// delete a message from your chat history
@@ -150,13 +162,10 @@ pub enum Command<Fs: FileStore> {
#[derive(Debug, Clone)]
pub enum UpdateMessage {
- Online(Online, Vec<Contact>),
+ Online(Online, Vec<(Contact, User)>),
Offline(Offline),
- /// received roster from jabber server (replace full app roster state with this)
- /// is this needed?
- FullRoster(Vec<Contact>),
/// (only update app roster state, don't replace)
- RosterUpdate(Contact),
+ RosterUpdate(Contact, User),
RosterDelete(JID),
/// presences should be stored with users in the ui, not contacts, as presences can be received from anyone
Presence {
@@ -167,10 +176,12 @@ pub enum UpdateMessage {
// MessageDispatched(Uuid),
Message {
to: JID,
+ from: User,
message: Message,
},
MessageDelivery {
id: Uuid,
+ chat: JID,
delivery: Delivery,
},
SubscriptionRequest(jid::JID),
@@ -276,6 +287,22 @@ impl<Fs: FileStore> Client<Fs> {
Ok(roster)
}
+ pub async fn get_roster_with_users(
+ &self,
+ ) -> Result<Vec<(Contact, User)>, CommandError<RosterError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(CoreClientCommand::Command(Command::GetRosterWithUsers(
+ send,
+ )))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let roster = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(roster)
+ }
+
pub async fn get_chats(&self) -> Result<Vec<Chat>, CommandError<DatabaseError>> {
let (send, recv) = oneshot::channel();
self.send(CoreClientCommand::Command(Command::GetChats(send)))
@@ -316,6 +343,22 @@ impl<Fs: FileStore> Client<Fs> {
Ok(chats)
}
+ pub async fn get_chats_ordered_with_latest_messages_and_users(
+ &self,
+ ) -> Result<Vec<((Chat, User), (Message, User))>, CommandError<DatabaseError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(CoreClientCommand::Command(
+ Command::GetChatsOrderedWithLatestMessagesAndUsers(send),
+ ))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let chats = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(chats)
+ }
+
pub async fn get_chat(&self, jid: JID) -> Result<Chat, CommandError<DatabaseError>> {
let (send, recv) = oneshot::channel();
self.send(CoreClientCommand::Command(Command::GetChat(jid, send)))
@@ -343,6 +386,23 @@ impl<Fs: FileStore> Client<Fs> {
Ok(messages)
}
+ pub async fn get_messages_with_users(
+ &self,
+ jid: JID,
+ ) -> Result<Vec<(Message, User)>, CommandError<DatabaseError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(CoreClientCommand::Command(Command::GetMessagesWithUsers(
+ jid, send,
+ )))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let messages = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(messages)
+ }
+
pub async fn delete_chat(&self, jid: JID) -> Result<(), CommandError<DatabaseError>> {
let (send, recv) = oneshot::channel();
self.send(CoreClientCommand::Command(Command::DeleteChat(jid, send)))
diff --git a/filamento/src/logic/connect.rs b/filamento/src/logic/connect.rs
index 37cdad5..9d61ca4 100644
--- a/filamento/src/logic/connect.rs
+++ b/filamento/src/logic/connect.rs
@@ -19,7 +19,7 @@ pub async fn handle_connect<Fs: FileStore + Clone + Send + Sync>(
debug!("getting roster");
logic
.clone()
- .handle_online(Command::GetRoster(send), connection.clone())
+ .handle_online(Command::GetRosterWithUsers(send), connection.clone())
.await;
debug!("sent roster req");
let roster = recv.await;
diff --git a/filamento/src/logic/local_only.rs b/filamento/src/logic/local_only.rs
index cabbef4..dc94d2c 100644
--- a/filamento/src/logic/local_only.rs
+++ b/filamento/src/logic/local_only.rs
@@ -28,6 +28,15 @@ pub async fn handle_get_chats_ordered_with_latest_messages<Fs: FileStore + Clone
Ok(logic.db().read_chats_ordered_with_latest_messages().await?)
}
+pub async fn handle_get_chats_ordered_with_latest_messages_and_users<Fs: FileStore + Clone>(
+ logic: &ClientLogic<Fs>,
+) -> Result<Vec<((Chat, User), (Message, User))>, DatabaseError> {
+ Ok(logic
+ .db()
+ .read_chats_ordered_with_latest_messages_and_users()
+ .await?)
+}
+
pub async fn handle_get_chat<Fs: FileStore + Clone>(
logic: &ClientLogic<Fs>,
jid: JID,
@@ -42,6 +51,13 @@ pub async fn handle_get_messages<Fs: FileStore + Clone>(
Ok(logic.db().read_message_history(jid).await?)
}
+pub async fn handle_get_messages_with_users<Fs: FileStore + Clone>(
+ logic: &ClientLogic<Fs>,
+ jid: JID,
+) -> Result<Vec<(Message, User)>, DatabaseError> {
+ Ok(logic.db().read_message_history_with_users(jid).await?)
+}
+
pub async fn handle_delete_chat<Fs: FileStore + Clone>(
logic: &ClientLogic<Fs>,
jid: JID,
diff --git a/filamento/src/logic/offline.rs b/filamento/src/logic/offline.rs
index 566972c..b87484c 100644
--- a/filamento/src/logic/offline.rs
+++ b/filamento/src/logic/offline.rs
@@ -2,6 +2,7 @@ use std::process::id;
use chrono::Utc;
use lampada::error::WriteError;
+use tracing::error;
use uuid::Uuid;
use crate::{
@@ -14,6 +15,7 @@ use crate::{
files::FileStore,
presence::Online,
roster::Contact,
+ user::User,
};
use super::{
@@ -21,7 +23,8 @@ use super::{
local_only::{
handle_delete_chat, handle_delete_messaage, handle_get_chat, handle_get_chats,
handle_get_chats_ordered, handle_get_chats_ordered_with_latest_messages,
- handle_get_messages, handle_get_user,
+ handle_get_chats_ordered_with_latest_messages_and_users, handle_get_messages,
+ handle_get_messages_with_users, handle_get_user,
},
};
@@ -47,6 +50,12 @@ pub async fn handle_get_roster<Fs: FileStore + Clone>(
Ok(logic.db().read_cached_roster().await?)
}
+pub async fn handle_get_roster_with_users<Fs: FileStore + Clone>(
+ logic: &ClientLogic<Fs>,
+) -> Result<Vec<(Contact, User)>, RosterError> {
+ Ok(logic.db().read_cached_roster_with_users().await?)
+}
+
pub async fn handle_offline_result<Fs: FileStore + Clone>(
logic: &ClientLogic<Fs>,
command: Command<Fs>,
@@ -56,6 +65,10 @@ pub async fn handle_offline_result<Fs: FileStore + Clone>(
let roster = handle_get_roster(logic).await;
sender.send(roster);
}
+ Command::GetRosterWithUsers(sender) => {
+ let roster = handle_get_roster_with_users(logic).await;
+ sender.send(roster);
+ }
Command::GetChats(sender) => {
let chats = handle_get_chats(logic).await;
sender.send(chats);
@@ -68,6 +81,10 @@ pub async fn handle_offline_result<Fs: FileStore + Clone>(
let chats = handle_get_chats_ordered_with_latest_messages(logic).await;
sender.send(chats);
}
+ Command::GetChatsOrderedWithLatestMessagesAndUsers(sender) => {
+ let chats = handle_get_chats_ordered_with_latest_messages_and_users(logic).await;
+ sender.send(chats);
+ }
Command::GetChat(jid, sender) => {
let chats = handle_get_chat(logic, jid).await;
sender.send(chats);
@@ -76,6 +93,10 @@ pub async fn handle_offline_result<Fs: FileStore + Clone>(
let messages = handle_get_messages(logic, jid).await;
sender.send(messages);
}
+ Command::GetMessagesWithUsers(jid, sender) => {
+ let messages = handle_get_messages_with_users(logic, jid).await;
+ sender.send(messages);
+ }
Command::DeleteChat(jid, sender) => {
let result = handle_delete_chat(logic, jid).await;
sender.send(result);
@@ -151,11 +172,25 @@ pub async fn handle_offline_result<Fs: FileStore + Clone>(
.handle_error(MessageSendError::MessageHistory(e.into()).into())
.await;
}
+
+ let from = match logic.db().read_user(logic.bare_jid.clone()).await {
+ Ok(u) => u,
+ Err(e) => {
+ error!("{}", e);
+ User {
+ jid: logic.bare_jid.clone(),
+ nick: None,
+ avatar: None,
+ }
+ }
+ };
+
logic
.update_sender()
.send(crate::UpdateMessage::Message {
to: jid.as_bare(),
message,
+ from,
})
.await;
}
diff --git a/filamento/src/logic/online.rs b/filamento/src/logic/online.rs
index d9441d7..767f923 100644
--- a/filamento/src/logic/online.rs
+++ b/filamento/src/logic/online.rs
@@ -18,16 +18,13 @@ use uuid::Uuid;
use crate::{
avatar, chat::{Body, Chat, Delivery, Message}, disco::{Info, Items}, error::{
AvatarPublishError, DatabaseError, DiscoError, Error, IqRequestError, MessageSendError, NickError, PEPError, RosterError, StatusError, SubscribeError
- }, files::FileStore, pep, presence::{Online, Presence, PresenceType}, roster::{Contact, ContactUpdate}, Command, UpdateMessage
+ }, files::FileStore, pep, presence::{Online, Presence, PresenceType}, roster::{Contact, ContactUpdate}, user::User, Command, UpdateMessage
};
use super::{
- ClientLogic,
local_only::{
- handle_delete_chat, handle_delete_messaage, handle_get_chat, handle_get_chats,
- handle_get_chats_ordered, handle_get_chats_ordered_with_latest_messages,
- handle_get_messages, handle_get_user,
- },
+ handle_delete_chat, handle_delete_messaage, handle_get_chat, handle_get_chats, handle_get_chats_ordered, handle_get_chats_ordered_with_latest_messages, handle_get_chats_ordered_with_latest_messages_and_users, handle_get_messages, handle_get_messages_with_users, handle_get_user
+ }, ClientLogic
};
pub async fn handle_online<Fs: FileStore + Clone>(logic: ClientLogic<Fs>, command: Command<Fs>, connection: Connected) {
@@ -97,6 +94,71 @@ pub async fn handle_get_roster<Fs: FileStore + Clone>(
}
}
+// this can't query the client... otherwise there is a hold-up and the connection can't complete
+pub async fn handle_get_roster_with_users<Fs: FileStore + Clone>(
+ logic: &ClientLogic<Fs>,
+ connection: Connected,
+) -> Result<Vec<(Contact, User)>, RosterError> {
+ let iq_id = Uuid::new_v4().to_string();
+ let stanza = Stanza::Iq(Iq {
+ from: Some(connection.jid().clone()),
+ id: iq_id.to_string(),
+ to: None,
+ r#type: IqType::Get,
+ lang: None,
+ query: Some(iq::Query::Roster(stanza::roster::Query {
+ ver: None,
+ items: Vec::new(),
+ })),
+ errors: Vec::new(),
+ });
+ let response = logic
+ .pending()
+ .request(&connection, stanza, iq_id.clone())
+ .await?;
+ // TODO: timeout
+ match response {
+ Stanza::Iq(Iq {
+ from: _,
+ id,
+ to: _,
+ r#type,
+ lang: _,
+ query: Some(iq::Query::Roster(stanza::roster::Query { ver: _, items })),
+ errors: _,
+ }) if id == iq_id && r#type == IqType::Result => {
+ let contacts: Vec<Contact> = items.into_iter().map(|item| item.into()).collect();
+ if let Err(e) = logic.db().replace_cached_roster(contacts.clone()).await {
+ logic
+ .handle_error(Error::Roster(RosterError::Cache(e.into())))
+ .await;
+ };
+ let mut users = Vec::new();
+ for contact in &contacts {
+ let user = logic.db().read_user(contact.user_jid.clone()).await?;
+ users.push(user);
+ }
+ Ok(contacts.into_iter().zip(users).collect())
+ }
+ ref s @ Stanza::Iq(Iq {
+ from: _,
+ ref id,
+ to: _,
+ r#type,
+ lang: _,
+ query: _,
+ ref errors,
+ }) if *id == iq_id && r#type == IqType::Error => {
+ if let Some(error) = errors.first() {
+ Err(RosterError::StanzaError(error.clone()))
+ } else {
+ Err(RosterError::UnexpectedStanza(s.clone()))
+ }
+ }
+ s => Err(RosterError::UnexpectedStanza(s)),
+ }
+}
+
pub async fn handle_add_contact<Fs: FileStore + Clone>(
logic: &ClientLogic<Fs>,
connection: Connected,
@@ -470,12 +532,25 @@ pub async fn handle_send_message<Fs: FileStore + Clone>(logic: &ClientLogic<Fs>,
.await;
}
+ let from = match logic.db().read_user(logic.bare_jid.clone()).await {
+ Ok(u) => u,
+ Err(e) => {
+ error!("{}", e);
+ User {
+ jid: logic.bare_jid.clone(),
+ nick: None,
+ avatar: None,
+ }
+ },
+ };
+
// tell the client a message is being sent
logic
.update_sender()
.send(UpdateMessage::Message {
to: jid.as_bare(),
message,
+ from,
})
.await;
@@ -513,6 +588,7 @@ pub async fn handle_send_message<Fs: FileStore + Clone>(logic: &ClientLogic<Fs>,
.send(UpdateMessage::MessageDelivery {
id,
delivery: Delivery::Written,
+ chat: jid.clone(),
})
.await;
if mark_chat_as_chatted {
@@ -530,6 +606,7 @@ pub async fn handle_send_message<Fs: FileStore + Clone>(logic: &ClientLogic<Fs>,
.send(UpdateMessage::MessageDelivery {
id,
delivery: Delivery::Failed,
+ chat: jid,
})
.await;
logic.handle_error(MessageSendError::Write(e).into()).await;
@@ -1006,6 +1083,10 @@ pub async fn handle_online_result<Fs: FileStore + Clone>(
let roster = handle_get_roster(logic, connection).await;
let _ = result_sender.send(roster);
}
+ Command::GetRosterWithUsers(result_sender) => {
+ let roster = handle_get_roster_with_users(logic, connection).await;
+ let _ = result_sender.send(roster);
+ }
Command::GetChats(sender) => {
let chats = handle_get_chats(logic).await;
let _ = sender.send(chats);
@@ -1018,6 +1099,10 @@ pub async fn handle_online_result<Fs: FileStore + Clone>(
let chats = handle_get_chats_ordered_with_latest_messages(logic).await;
let _ = sender.send(chats);
}
+ Command::GetChatsOrderedWithLatestMessagesAndUsers(sender) => {
+ let chats = handle_get_chats_ordered_with_latest_messages_and_users(logic).await;
+ sender.send(chats);
+ }
Command::GetChat(jid, sender) => {
let chat = handle_get_chat(logic, jid).await;
let _ = sender.send(chat);
@@ -1026,6 +1111,10 @@ pub async fn handle_online_result<Fs: FileStore + Clone>(
let messages = handle_get_messages(logic, jid).await;
let _ = sender.send(messages);
}
+ Command::GetMessagesWithUsers(jid, sender) => {
+ let messages = handle_get_messages_with_users(logic, jid).await;
+ sender.send(messages);
+ }
Command::DeleteChat(jid, sender) => {
let result = handle_delete_chat(logic, jid).await;
let _ = sender.send(result);
diff --git a/filamento/src/logic/process_stanza.rs b/filamento/src/logic/process_stanza.rs
index 9c49b04..cdaff97 100644
--- a/filamento/src/logic/process_stanza.rs
+++ b/filamento/src/logic/process_stanza.rs
@@ -20,12 +20,13 @@ use crate::{
UpdateMessage, caps,
chat::{Body, Message},
error::{
- AvatarUpdateError, DatabaseError, Error, IqError, MessageRecvError, PresenceError,
- RosterError,
+ AvatarUpdateError, DatabaseError, Error, IqError, IqProcessError, MessageRecvError,
+ PresenceError, RosterError,
},
files::FileStore,
presence::{Offline, Online, Presence, PresenceType, Show},
roster::Contact,
+ user::User,
};
use super::ClientLogic;
@@ -103,11 +104,24 @@ pub async fn recv_message<Fs: FileStore + Clone>(
}
};
+ let from_user = match logic.db().read_user(from.as_bare()).await {
+ Ok(u) => u,
+ Err(e) => {
+ error!("{}", e);
+ User {
+ jid: from.as_bare(),
+ nick: None,
+ avatar: None,
+ }
+ }
+ };
+
// update the client with the new message
logic
.update_sender()
.send(UpdateMessage::Message {
to: from.as_bare(),
+ from: from_user,
message,
})
.await;
@@ -541,11 +555,11 @@ pub async fn recv_iq<Fs: FileStore + Clone>(
logic: ClientLogic<Fs>,
connection: Connected,
iq: Iq,
-) -> Result<Option<UpdateMessage>, IqError> {
+) -> Result<Option<UpdateMessage>, IqProcessError> {
if let Some(to) = &iq.to {
if *to == *connection.jid() {
} else {
- return Err(IqError::IncorrectAddressee(to.clone()));
+ return Err(IqProcessError::Iq(IqError::IncorrectAddressee(to.clone())));
}
}
match iq.r#type {
@@ -556,7 +570,11 @@ pub async fn recv_iq<Fs: FileStore + Clone>(
.unwrap_or_else(|| connection.server().clone());
let id = iq.id.clone();
debug!("received iq result with id `{}` from {}", id, from);
- logic.pending().respond(Stanza::Iq(iq), id).await?;
+ logic
+ .pending()
+ .respond(Stanza::Iq(iq), id)
+ .await
+ .map_err(|e| Into::<IqError>::into(e))?;
Ok(None)
}
stanza::client::iq::IqType::Get => {
@@ -596,7 +614,11 @@ pub async fn recv_iq<Fs: FileStore + Clone>(
errors: vec![StanzaError::ItemNotFound.into()],
};
// TODO: log error
- connection.write_handle().write(Stanza::Iq(iq)).await?;
+ connection
+ .write_handle()
+ .write(Stanza::Iq(iq))
+ .await
+ .map_err(|e| Into::<IqError>::into(e))?;
info!("replied to disco#info request from {}", from);
return Ok(None);
}
@@ -612,7 +634,11 @@ pub async fn recv_iq<Fs: FileStore + Clone>(
errors: vec![StanzaError::ItemNotFound.into()],
};
// TODO: log error
- connection.write_handle().write(Stanza::Iq(iq)).await?;
+ connection
+ .write_handle()
+ .write(Stanza::Iq(iq))
+ .await
+ .map_err(|e| Into::<IqError>::into(e))?;
info!("replied to disco#info request from {}", from);
return Ok(None);
}
@@ -627,7 +653,11 @@ pub async fn recv_iq<Fs: FileStore + Clone>(
query: Some(iq::Query::DiscoInfo(disco)),
errors: vec![],
};
- connection.write_handle().write(Stanza::Iq(iq)).await?;
+ connection
+ .write_handle()
+ .write(Stanza::Iq(iq))
+ .await
+ .map_err(|e| Into::<IqError>::into(e))?;
info!("replied to disco#info request from {}", from);
Ok(None)
}
@@ -642,7 +672,11 @@ pub async fn recv_iq<Fs: FileStore + Clone>(
query: None,
errors: vec![StanzaError::ServiceUnavailable.into()],
};
- connection.write_handle().write(Stanza::Iq(iq)).await?;
+ connection
+ .write_handle()
+ .write(Stanza::Iq(iq))
+ .await
+ .map_err(|e| Into::<IqError>::into(e))?;
warn!("replied to unsupported iq get from {}", from);
Ok(None)
} // stanza::client::iq::Query::Bind(bind) => todo!(),
@@ -662,7 +696,11 @@ pub async fn recv_iq<Fs: FileStore + Clone>(
query: None,
errors: vec![StanzaError::BadRequest.into()],
};
- connection.write_handle().write(Stanza::Iq(iq)).await?;
+ connection
+ .write_handle()
+ .write(Stanza::Iq(iq))
+ .await
+ .map_err(|e| Into::<IqError>::into(e))?;
info!("replied to malformed iq query from {}", from);
Ok(None)
}
@@ -713,7 +751,12 @@ pub async fn recv_iq<Fs: FileStore + Clone>(
.handle_error(RosterError::PushReply(e.into()).into())
.await;
}
- Ok(Some(UpdateMessage::RosterUpdate(contact)))
+ let user = logic
+ .db()
+ .read_user(contact.user_jid.clone())
+ .await
+ .map_err(|e| Into::<RosterError>::into(e))?;
+ Ok(Some(UpdateMessage::RosterUpdate(contact, user)))
}
}
} else {
@@ -727,7 +770,11 @@ pub async fn recv_iq<Fs: FileStore + Clone>(
query: None,
errors: vec![StanzaError::NotAcceptable.into()],
};
- connection.write_handle().write(Stanza::Iq(iq)).await?;
+ connection
+ .write_handle()
+ .write(Stanza::Iq(iq))
+ .await
+ .map_err(|e| Into::<IqError>::into(e))?;
Ok(None)
}
}
@@ -743,7 +790,11 @@ pub async fn recv_iq<Fs: FileStore + Clone>(
query: None,
errors: vec![StanzaError::ServiceUnavailable.into()],
};
- connection.write_handle().write(Stanza::Iq(iq)).await?;
+ connection
+ .write_handle()
+ .write(Stanza::Iq(iq))
+ .await
+ .map_err(|e| Into::<IqError>::into(e))?;
warn!("replied to unsupported iq set from {}", from);
Ok(None)
}
@@ -759,7 +810,11 @@ pub async fn recv_iq<Fs: FileStore + Clone>(
query: None,
errors: vec![StanzaError::NotAcceptable.into()],
};
- connection.write_handle().write(Stanza::Iq(iq)).await?;
+ connection
+ .write_handle()
+ .write(Stanza::Iq(iq))
+ .await
+ .map_err(|e| Into::<IqError>::into(e))?;
Ok(None)
}
}
diff --git a/filamento/src/user.rs b/filamento/src/user.rs
index 3d5dcb4..8669fc3 100644
--- a/filamento/src/user.rs
+++ b/filamento/src/user.rs
@@ -5,5 +5,5 @@ pub struct User {
pub jid: JID,
pub nick: Option<String>,
pub avatar: Option<String>,
- pub cached_status_message: Option<String>,
+ // pub cached_status_message: Option<String>,
}