aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorLibravatar cel 🌸 <cel@bunny.garden>2025-04-03 03:41:38 +0100
committerLibravatar cel 🌸 <cel@bunny.garden>2025-04-03 03:41:38 +0100
commit91f1994af940085d5d475a97820900ebbf0eb553 (patch)
tree6aab872f71d17a785d3d9286742fef38983d274c
parent9ce3827a7d25714d17f266f0f50bb29f41090175 (diff)
downloadluz-91f1994af940085d5d475a97820900ebbf0eb553.tar.gz
luz-91f1994af940085d5d475a97820900ebbf0eb553.tar.bz2
luz-91f1994af940085d5d475a97820900ebbf0eb553.zip
feat: better message handling, pep publish, xep_0172: nick
-rw-r--r--README.md4
-rw-r--r--filamento/Cargo.toml2
-rw-r--r--filamento/examples/example.rs33
-rw-r--r--filamento/migrations/20240113011930_luz.sql12
-rw-r--r--filamento/src/chat.rs64
-rw-r--r--filamento/src/db.rs92
-rw-r--r--filamento/src/error.rs52
-rw-r--r--filamento/src/lib.rs93
-rw-r--r--filamento/src/logic/mod.rs14
-rw-r--r--filamento/src/logic/offline.rs64
-rw-r--r--filamento/src/logic/online.rs355
-rw-r--r--filamento/src/logic/process_stanza.rs86
-rw-r--r--filamento/src/pep.rs17
-rw-r--r--filamento/src/presence.rs15
-rw-r--r--filamento/src/user.rs3
-rw-r--r--stanza/Cargo.toml1
-rw-r--r--stanza/src/client/iq.rs9
-rw-r--r--stanza/src/client/message.rs15
-rw-r--r--stanza/src/client/presence.rs17
-rw-r--r--stanza/src/lib.rs2
-rw-r--r--stanza/src/xep_0060/event.rs21
-rw-r--r--stanza/src/xep_0060/pubsub.rs41
-rw-r--r--stanza/src/xep_0172.rs30
23 files changed, 770 insertions, 272 deletions
diff --git a/README.md b/README.md
index f4db448..8afae76 100644
--- a/README.md
+++ b/README.md
@@ -22,8 +22,8 @@
- [ ] server side downgrade protection for sasl
- [x] xep-0199: xmpp ping
- [x] xep-0203: delayed delivery
-- [ ] xep-0030: service discovery
-- [ ] xep-0115: entity capabilities
+- [x] xep-0030: service discovery
+- [x] xep-0115: entity capabilities
- [ ] xep-0163: pep
- [ ] xep-0245: /me
- [ ] xep-0084: user avatar
diff --git a/filamento/Cargo.toml b/filamento/Cargo.toml
index ef11192..e9be687 100644
--- a/filamento/Cargo.toml
+++ b/filamento/Cargo.toml
@@ -8,7 +8,7 @@ futures = "0.3.31"
lampada = { version = "0.1.0", path = "../lampada" }
tokio = "1.42.0"
thiserror = "2.0.11"
-stanza = { version = "0.1.0", path = "../stanza", features = ["rfc_6121", "xep_0203", "xep_0030"] }
+stanza = { version = "0.1.0", path = "../stanza", features = ["rfc_6121", "xep_0203", "xep_0030", "xep_0060", "xep_0172"] }
sqlx = { version = "0.8.3", features = ["sqlite", "runtime-tokio", "uuid", "chrono"] }
# TODO: re-export jid?
jid = { version = "0.1.0", path = "../jid", features = ["sqlx"] }
diff --git a/filamento/examples/example.rs b/filamento/examples/example.rs
index 506d698..74a9aa1 100644
--- a/filamento/examples/example.rs
+++ b/filamento/examples/example.rs
@@ -21,6 +21,8 @@ async fn main() {
client.connect().await.unwrap();
tokio::time::sleep(Duration::from_secs(5)).await;
+ info!("changing nick");
+ client.change_nick("britney".to_string()).await.unwrap();
info!("sending message");
client
.send_message(
@@ -32,9 +34,30 @@ async fn main() {
.await
.unwrap();
info!("sent message");
- info!("sending disco query");
- let info = client.disco_info(None, None).await.unwrap();
- info!("got disco result: {:#?}", info);
- let items = client.disco_items(None, None).await.unwrap();
- info!("got disco result: {:#?}", items);
+ tokio::time::sleep(Duration::from_secs(5)).await;
+ // info!("sending disco query");
+ // let info = client.disco_info(None, None).await.unwrap();
+ // info!("got disco result: {:#?}", info);
+ // let items = client.disco_items(None, None).await.unwrap();
+ // info!("got disco result: {:#?}", items);
+ // let info = client
+ // .disco_info(Some("blos.sm".parse().unwrap()), None)
+ // .await
+ // .unwrap();
+ // info!("got disco result: {:#?}", info);
+ // let items = client
+ // .disco_items(Some("blos.sm".parse().unwrap()), None)
+ // .await
+ // .unwrap();
+ // info!("got disco result: {:#?}", items);
+ // let info = client
+ // .disco_info(Some("pubsub.blos.sm".parse().unwrap()), None)
+ // .await
+ // .unwrap();
+ // info!("got disco result: {:#?}", info);
+ // let items = client
+ // .disco_items(Some("pubsub.blos.sm".parse().unwrap()), None)
+ // .await
+ // .unwrap();
+ // info!("got disco result: {:#?}", items);
}
diff --git a/filamento/migrations/20240113011930_luz.sql b/filamento/migrations/20240113011930_luz.sql
index 148598b..3b56664 100644
--- a/filamento/migrations/20240113011930_luz.sql
+++ b/filamento/migrations/20240113011930_luz.sql
@@ -5,6 +5,7 @@ PRAGMA foreign_keys = on;
create table users(
-- TODO: enforce bare jid
jid text primary key not null,
+ nick text,
-- can receive presence status from non-contacts
cached_status_message text
-- TODO: last_seen
@@ -67,14 +68,24 @@ create table groups_roster(
-- can send chat message to user (creating a new chat if not already exists)
create table chats (
id text primary key not null,
+ have_chatted bool not null,
correspondent text not null unique,
foreign key(correspondent) references users(jid)
);
+-- enum for subscription state
+create table delivery(
+ state text primary key not null
+);
+
+insert into delivery ( state ) values ('sending'), ('written'), ('sent'), ('delivered'), ('read'), ('failed'), ('queued');
+
-- messages include reference to chat they are in, and who sent them.
create table messages (
id text primary key not null,
body text,
+ -- delivery is nullable as only messages sent by the user are markable
+ delivery text,
chat_id text not null,
-- TODO: channel stuff
-- channel_id uuid,
@@ -94,6 +105,7 @@ create table messages (
from_resource text,
-- check (from_jid != original_sender),
+ foreign key(delivery) references delivery(state),
-- TODO: from can be either a jid, a moved jid (for when a contact moves, save original sender jid/user but link to new user), or imported (from another service (save details), linked to new user)
-- TODO: read bool not null,
foreign key(chat_id) references chats(id) on delete cascade,
diff --git a/filamento/src/chat.rs b/filamento/src/chat.rs
index c1194ea..147c7f7 100644
--- a/filamento/src/chat.rs
+++ b/filamento/src/chat.rs
@@ -1,5 +1,6 @@
use chrono::{DateTime, Utc};
use jid::JID;
+use sqlx::Sqlite;
use uuid::Uuid;
#[derive(Debug, sqlx::FromRow, Clone)]
@@ -7,7 +8,9 @@ pub struct Message {
pub id: Uuid,
// does not contain full user information
#[sqlx(rename = "from_jid")]
+ // bare jid (for now)
pub from: JID,
+ pub delivery: Option<Delivery>,
pub timestamp: DateTime<Utc>,
// TODO: originally_from
// TODO: message edits
@@ -16,6 +19,59 @@ pub struct Message {
pub body: Body,
}
+#[derive(Debug, Clone, Copy)]
+pub enum Delivery {
+ Sending,
+ Written,
+ Sent,
+ Delivered,
+ Read,
+ Failed,
+ Queued,
+}
+
+impl sqlx::Type<Sqlite> for Delivery {
+ fn type_info() -> <Sqlite as sqlx::Database>::TypeInfo {
+ <&str as sqlx::Type<Sqlite>>::type_info()
+ }
+}
+
+impl sqlx::Decode<'_, Sqlite> for Delivery {
+ fn decode(
+ value: <Sqlite as sqlx::Database>::ValueRef<'_>,
+ ) -> Result<Self, sqlx::error::BoxDynError> {
+ let value = <&str as sqlx::Decode<Sqlite>>::decode(value)?;
+ match value {
+ "sending" => Ok(Self::Sending),
+ "written" => Ok(Self::Written),
+ "sent" => Ok(Self::Sent),
+ "delivered" => Ok(Self::Delivered),
+ "read" => Ok(Self::Read),
+ "failed" => Ok(Self::Failed),
+ "queued" => Ok(Self::Queued),
+ _ => unreachable!(),
+ }
+ }
+}
+
+impl sqlx::Encode<'_, Sqlite> for Delivery {
+ fn encode_by_ref(
+ &self,
+ buf: &mut <Sqlite as sqlx::Database>::ArgumentBuffer<'_>,
+ ) -> Result<sqlx::encode::IsNull, sqlx::error::BoxDynError> {
+ let value = match self {
+ Delivery::Sending => "sending",
+ Delivery::Written => "written",
+ Delivery::Sent => "sent",
+ Delivery::Delivered => "delivered",
+ Delivery::Read => "read",
+ Delivery::Failed => "failed",
+ Delivery::Queued => "queued",
+ };
+ <&str as sqlx::Encode<Sqlite>>::encode(value, buf)
+ }
+}
+
// TODO: user migrations
// pub enum Migrated {
// Jabber(User),
@@ -31,6 +87,7 @@ pub struct Body {
#[derive(sqlx::FromRow, Debug, Clone)]
pub struct Chat {
pub correspondent: JID,
+ pub have_chatted: bool,
// pub unread_messages: i32,
// pub latest_message: Message,
// when a new message is received, the chat should be updated, and the new message should be delivered too.
@@ -41,8 +98,11 @@ pub struct Chat {
pub enum ChatUpdate {}
impl Chat {
- pub fn new(correspondent: JID) -> Self {
- Self { correspondent }
+ pub fn new(correspondent: JID, have_chatted: bool) -> Self {
+ Self {
+ correspondent,
+ have_chatted,
+ }
}
pub fn correspondent(&self) -> &JID {
&self.correspondent
diff --git a/filamento/src/db.rs b/filamento/src/db.rs
index 1054ec2..f92bfb2 100644
--- a/filamento/src/db.rs
+++ b/filamento/src/db.rs
@@ -50,8 +50,9 @@ impl Db {
pub(crate) async fn create_user(&self, user: User) -> Result<(), Error> {
sqlx::query!(
- "insert into users ( jid, cached_status_message ) values ( ?, ? )",
+ "insert into users ( jid, nick, cached_status_message ) values ( ?, ?, ? )",
user.jid,
+ user.nick,
user.cached_status_message
)
.execute(&self.db)
@@ -60,6 +61,12 @@ impl Db {
}
pub(crate) async fn read_user(&self, user: JID) -> Result<User, Error> {
+ sqlx::query!(
+ "insert into users ( jid ) values ( ? ) on conflict do nothing",
+ user
+ )
+ .execute(&self.db)
+ .await?;
let user: User = sqlx::query_as("select * from users where jid = ?")
.bind(user)
.fetch_one(&self.db)
@@ -67,10 +74,23 @@ impl Db {
Ok(user)
}
+ pub(crate) async fn upsert_user_nick(&self, jid: JID, nick: String) -> Result<(), Error> {
+ sqlx::query!(
+ "insert into users (jid, nick) values (?, ?) on conflict do update set nick = ?",
+ jid,
+ nick,
+ nick
+ )
+ .execute(&self.db)
+ .await?;
+ Ok(())
+ }
+
pub(crate) async fn update_user(&self, user: User) -> Result<(), Error> {
sqlx::query!(
- "update users set cached_status_message = ? where jid = ?",
+ "update users set cached_status_message = ?, nick = ? where jid = ?",
user.cached_status_message,
+ user.nick,
user.jid
)
.execute(&self.db)
@@ -288,6 +308,17 @@ impl Db {
Ok(chat)
}
+ pub(crate) async fn mark_chat_as_chatted(&self, chat: JID) -> Result<(), Error> {
+ let jid = chat.as_bare();
+ sqlx::query!(
+ "update chats set have_chatted = true where correspondent = ?",
+ jid
+ )
+ .execute(&self.db)
+ .await?;
+ Ok(())
+ }
+
pub(crate) async fn update_chat_correspondent(
&self,
old_chat: Chat,
@@ -387,38 +418,47 @@ impl Db {
}
/// if the chat doesn't already exist, it must be created by calling create_chat() before running this function.
- pub(crate) async fn create_message(&self, message: Message, chat: JID) -> Result<(), Error> {
+ pub(crate) async fn create_message(
+ &self,
+ message: Message,
+ chat: JID,
+ from: JID,
+ ) -> Result<(), Error> {
// TODO: one query
- let bare_jid = message.from.as_bare();
- let resource = message.from.resourcepart;
+ let from_jid = from.as_bare();
let chat_id = self.read_chat_id(chat).await?;
- sqlx::query!("insert into messages (id, body, chat_id, from_jid, from_resource, timestamp) values (?, ?, ?, ?, ?, ?)", message.id, message.body.body, chat_id, bare_jid, resource, message.timestamp).execute(&self.db).await?;
+ sqlx::query!("insert into messages (id, body, chat_id, from_jid, from_resource, timestamp) values (?, ?, ?, ?, ?, ?)", message.id, message.body.body, chat_id, from_jid, from.resourcepart, message.timestamp).execute(&self.db).await?;
Ok(())
}
- pub(crate) async fn create_message_with_self_resource_and_chat(
- &self,
- message: Message,
- chat: JID,
- ) -> Result<(), Error> {
- let from_jid = message.from.as_bare();
- let resource = &message.from.resourcepart;
+ pub(crate) async fn upsert_chat_and_user(&self, chat: &JID) -> Result<bool, Error> {
let bare_chat = chat.as_bare();
sqlx::query!(
"insert into users (jid) values (?) on conflict do nothing",
- from_jid
+ bare_chat,
)
.execute(&self.db)
.await?;
let id = Uuid::new_v4();
- sqlx::query!(
- "insert into chats (id, correspondent) values (?, ?) on conflict do nothing",
- id,
- bare_chat
- )
- .execute(&self.db)
- .await?;
- if let Some(resource) = resource {
+ let chat: Chat = sqlx::query_as("insert into chats (id, correspondent, have_chatted) values (?, ?, ?) on conflict do nothing returning *")
+ .bind(id)
+ .bind(bare_chat)
+ .bind(false)
+ .fetch_one(&self.db)
+ .await?;
+ Ok(chat.have_chatted)
+ }
+
+ /// MUST upsert chat beforehand
+ pub(crate) async fn create_message_with_self_resource(
+ &self,
+ message: Message,
+ chat: JID,
+ // full jid
+ from: JID,
+ ) -> Result<(), Error> {
+ let from_jid = from.as_bare();
+ if let Some(resource) = &from.resourcepart {
sqlx::query!(
"insert into resources (bare_jid, resource) values (?, ?) on conflict do nothing",
from_jid,
@@ -427,15 +467,17 @@ impl Db {
.execute(&self.db)
.await?;
}
- self.create_message(message, chat).await?;
+ self.create_message(message, chat, from).await?;
Ok(())
}
// create direct message from incoming
- pub(crate) async fn create_message_with_user_resource_and_chat(
+ pub(crate) async fn create_message_with_user_resource(
&self,
message: Message,
chat: JID,
+ // full jid
+ from: JID,
) -> Result<(), Error> {
let bare_chat = chat.as_bare();
let resource = &chat.resourcepart;
@@ -462,7 +504,7 @@ impl Db {
.execute(&self.db)
.await?;
}
- self.create_message(message, chat).await?;
+ self.create_message(message, chat, from).await?;
Ok(())
}
diff --git a/filamento/src/error.rs b/filamento/src/error.rs
index 8e2e4be..9ecc330 100644
--- a/filamento/src/error.rs
+++ b/filamento/src/error.rs
@@ -34,12 +34,22 @@ pub enum Error {
MessageSend(#[from] MessageSendError),
#[error("message receive error: {0}")]
MessageRecv(#[from] MessageRecvError),
+ #[error("subscripbe error: {0}")]
+ Subscribe(#[from] SubscribeError),
+ #[error("publish error: {0}")]
+ Publish(#[from] PublishError),
}
#[derive(Debug, Error, Clone)]
pub enum MessageSendError {
#[error("could not add to message history: {0}")]
MessageHistory(#[from] DatabaseError),
+ #[error("could not mark chat as chatted: {0}")]
+ MarkChatAsChatted(DatabaseError),
+ #[error("could not get client user details: {0}")]
+ GetUserDetails(DatabaseError),
+ #[error("writing message to connection: {0}")]
+ Write(#[from] WriteError),
}
#[derive(Debug, Error, Clone)]
@@ -48,6 +58,8 @@ pub enum MessageRecvError {
MessageHistory(#[from] DatabaseError),
#[error("missing from")]
MissingFrom,
+ #[error("could not update user nick: {0}")]
+ NickUpdate(DatabaseError),
}
#[derive(Debug, Error, Clone)]
@@ -75,7 +87,7 @@ pub enum RosterError {
#[error("cache: {0}")]
Cache(#[from] DatabaseError),
#[error("iq response: {0}")]
- IqResponse(#[from] RequestError),
+ IqResponse(#[from] IqRequestError),
#[error("stream write: {0}")]
Write(#[from] WriteError),
// TODO: display for stanza, to show as xml, same for read error types.
@@ -92,7 +104,7 @@ pub enum DiscoError {
#[error("write error: {0}")]
Write(#[from] WriteError),
#[error("iq response: {0}")]
- IqResponse(#[from] RequestError),
+ IqResponse(#[from] IqRequestError),
#[error("reply from incorrect entity: {0}")]
IncorrectEntity(JID),
#[error("unexpected reply: {0:?}")]
@@ -108,7 +120,7 @@ pub enum DiscoError {
}
#[derive(Debug, Error, Clone)]
-pub enum RequestError {
+pub enum IqRequestError {
#[error("sending request: {0}")]
Write(#[from] WriteError),
#[error("receiving expected response: {0}")]
@@ -173,6 +185,14 @@ impl From<tokio::io::Error> for DatabaseOpenError {
}
#[derive(Debug, Error, Clone)]
+pub enum SubscribeError {
+ #[error("write: {0}")]
+ Write(#[from] WriteError),
+ #[error("fetching client user details: {0}")]
+ Database(#[from] DatabaseError),
+}
+
+#[derive(Debug, Error, Clone)]
pub enum PresenceError {
#[error("unsupported")]
Unsupported,
@@ -181,3 +201,29 @@ pub enum PresenceError {
#[error("stanza error: {0}")]
StanzaError(#[from] stanza::client::error::Error),
}
+
+#[derive(Debug, Error, Clone)]
+pub enum PublishError {
+ #[error("received mismatched query")]
+ MismatchedQuery(Query),
+ #[error("missing query")]
+ MissingQuery,
+ #[error("stanza errors: {0:?}")]
+ StanzaErrors(Vec<stanza::client::error::Error>),
+ #[error("reply from incorrect entity: {0}")]
+ IncorrectEntity(JID),
+ #[error("unexpected stanza: {0:?}")]
+ UnexpectedStanza(Stanza),
+ #[error("iq response: {0}")]
+ IqResponse(#[from] IqRequestError),
+}
+
+#[derive(Debug, Error, Clone)]
+pub enum NickError {
+ #[error("publishing nick: {0}")]
+ Publish(#[from] CommandError<PublishError>),
+ #[error("updating database: {0}")]
+ Database(#[from] DatabaseError),
+ #[error("disconnected")]
+ Disconnected,
+}
diff --git a/filamento/src/lib.rs b/filamento/src/lib.rs
index 4d852e2..6118f75 100644
--- a/filamento/src/lib.rs
+++ b/filamento/src/lib.rs
@@ -6,13 +6,13 @@ use std::{
time::Duration,
};
-use chat::{Body, Chat, Message};
+use chat::{Body, Chat, Delivery, Message};
use chrono::Utc;
use db::Db;
use disco::{Info, Items};
use error::{
- ConnectionJobError, DatabaseError, DiscoError, Error, IqError, MessageRecvError, PresenceError,
- RosterError, StatusError,
+ ConnectionJobError, DatabaseError, DiscoError, Error, IqError, MessageRecvError, NickError,
+ PresenceError, PublishError, RosterError, StatusError, SubscribeError,
};
use futures::FutureExt;
use jid::JID;
@@ -40,6 +40,7 @@ pub mod db;
pub mod disco;
pub mod error;
mod logic;
+pub mod pep;
pub mod presence;
pub mod roster;
pub mod user;
@@ -68,13 +69,13 @@ pub enum Command {
/// add a contact to your roster, with a status of none, no subscriptions.
AddContact(JID, oneshot::Sender<Result<(), RosterError>>),
/// send a friend request i.e. a subscription request with a subscription pre-approval. if not already added to roster server adds to roster.
- BuddyRequest(JID, oneshot::Sender<Result<(), WriteError>>),
+ BuddyRequest(JID, oneshot::Sender<Result<(), SubscribeError>>),
/// send a subscription request, without pre-approval. if not already added to roster server adds to roster.
- SubscriptionRequest(JID, oneshot::Sender<Result<(), WriteError>>),
+ SubscriptionRequest(JID, oneshot::Sender<Result<(), SubscribeError>>),
/// accept a friend request by accepting a pending subscription and sending a subscription request back. if not already added to roster adds to roster.
- AcceptBuddyRequest(JID, oneshot::Sender<Result<(), WriteError>>),
+ AcceptBuddyRequest(JID, oneshot::Sender<Result<(), SubscribeError>>),
/// accept a pending subscription and doesn't send a subscription request back. if not already added to roster adds to roster.
- AcceptSubscriptionRequest(JID, oneshot::Sender<Result<(), WriteError>>),
+ AcceptSubscriptionRequest(JID, oneshot::Sender<Result<(), SubscribeError>>),
/// unsubscribe to a contact, but don't remove their subscription.
UnsubscribeFromContact(JID, oneshot::Sender<Result<(), WriteError>>),
/// stop a contact from being subscribed, but stay subscribed to the contact.
@@ -98,7 +99,9 @@ pub enum Command {
// TODO: should probably make it so people can add non-contact auto presence sharing in the client (most likely through setting an internal setting)
/// send a message to a jid (any kind of jid that can receive a message, e.g. a user or a
/// chatroom). if disconnected, will be cached so when client connects, message will be sent.
- SendMessage(JID, Body, oneshot::Sender<Result<(), WriteError>>),
+ SendMessage(JID, Body),
+ // TODO: resend failed messages
+ // ResendMessage(Uuid),
/// disco info query
DiscoInfo(
Option<JID>,
@@ -111,6 +114,14 @@ pub enum Command {
Option<String>,
oneshot::Sender<Result<disco::Items, DiscoError>>,
),
+ /// publish item to a pep node, specified or default according to item.
+ Publish {
+ item: pep::Item,
+ node: String,
+ sender: oneshot::Sender<Result<(), PublishError>>,
+ },
+ /// change user nickname
+ ChangeNick(String, oneshot::Sender<Result<(), NickError>>),
}
#[derive(Debug, Clone)]
@@ -134,7 +145,15 @@ pub enum UpdateMessage {
to: JID,
message: Message,
},
+ MessageDelivery {
+ id: Uuid,
+ delivery: Delivery,
+ },
SubscriptionRequest(jid::JID),
+ NickChanged {
+ jid: JID,
+ nick: String,
+ },
}
/// an xmpp client that is suited for a chat client use case
@@ -192,7 +211,7 @@ impl Client {
timeout: Duration::from_secs(10),
};
- let logic = ClientLogic::new(client.clone(), db, update_send);
+ let logic = ClientLogic::new(client.clone(), jid.as_bare(), db, update_send);
let actor: CoreClient<ClientLogic> =
CoreClient::new(jid, password, command_receiver, None, sup_recv, logic);
@@ -328,7 +347,7 @@ impl Client {
Ok(result)
}
- pub async fn buddy_request(&self, jid: JID) -> Result<(), CommandError<WriteError>> {
+ pub async fn buddy_request(&self, jid: JID) -> Result<(), CommandError<SubscribeError>> {
let (send, recv) = oneshot::channel();
self.send(CoreClientCommand::Command(Command::BuddyRequest(jid, send)))
.await
@@ -340,7 +359,7 @@ impl Client {
Ok(result)
}
- pub async fn subscription_request(&self, jid: JID) -> Result<(), CommandError<WriteError>> {
+ pub async fn subscription_request(&self, jid: JID) -> Result<(), CommandError<SubscribeError>> {
let (send, recv) = oneshot::channel();
self.send(CoreClientCommand::Command(Command::SubscriptionRequest(
jid, send,
@@ -354,7 +373,7 @@ impl Client {
Ok(result)
}
- pub async fn accept_buddy_request(&self, jid: JID) -> Result<(), CommandError<WriteError>> {
+ pub async fn accept_buddy_request(&self, jid: JID) -> Result<(), CommandError<SubscribeError>> {
let (send, recv) = oneshot::channel();
self.send(CoreClientCommand::Command(Command::AcceptBuddyRequest(
jid, send,
@@ -371,7 +390,7 @@ impl Client {
pub async fn accept_subscription_request(
&self,
jid: JID,
- ) -> Result<(), CommandError<WriteError>> {
+ ) -> Result<(), CommandError<SubscribeError>> {
let (send, recv) = oneshot::channel();
self.send(CoreClientCommand::Command(
Command::AcceptSubscriptionRequest(jid, send),
@@ -471,18 +490,10 @@ impl Client {
Ok(result)
}
- pub async fn send_message(&self, jid: JID, body: Body) -> Result<(), CommandError<WriteError>> {
- let (send, recv) = oneshot::channel();
- self.send(CoreClientCommand::Command(Command::SendMessage(
- jid, body, send,
- )))
- .await
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
- let result = timeout(self.timeout, recv)
- .await
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
- .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
- Ok(result)
+ pub async fn send_message(&self, jid: JID, body: Body) -> Result<(), ActorError> {
+ self.send(CoreClientCommand::Command(Command::SendMessage(jid, body)))
+ .await?;
+ Ok(())
}
pub async fn disco_info(
@@ -520,6 +531,38 @@ impl Client {
.map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
Ok(result)
}
+
+ pub async fn publish(
+ &self,
+ item: pep::Item,
+ node: String,
+ ) -> Result<(), CommandError<PublishError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(CoreClientCommand::Command(Command::Publish {
+ item,
+ node,
+ sender: send,
+ }))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let result = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(result)
+ }
+
+ pub async fn change_nick(&self, nick: String) -> Result<(), CommandError<NickError>> {
+ let (send, recv) = oneshot::channel();
+ self.send(CoreClientCommand::Command(Command::ChangeNick(nick, send)))
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?;
+ let result = timeout(self.timeout, recv)
+ .await
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?
+ .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;
+ Ok(result)
+ }
}
impl From<Command> for CoreClientCommand<Command> {
diff --git a/filamento/src/logic/mod.rs b/filamento/src/logic/mod.rs
index 15c2d12..1ddd7d3 100644
--- a/filamento/src/logic/mod.rs
+++ b/filamento/src/logic/mod.rs
@@ -1,5 +1,6 @@
use std::{collections::HashMap, sync::Arc};
+use jid::JID;
use lampada::{Connected, Logic, error::ReadError};
use stanza::client::Stanza;
use tokio::sync::{Mutex, mpsc, oneshot};
@@ -8,7 +9,7 @@ use tracing::{error, info, warn};
use crate::{
Client, Command, UpdateMessage,
db::Db,
- error::{Error, RequestError, ResponseError},
+ error::{Error, IqRequestError, ResponseError},
};
mod abort;
@@ -23,6 +24,7 @@ mod process_stanza;
#[derive(Clone)]
pub struct ClientLogic {
client: Client,
+ bare_jid: JID,
db: Db,
pending: Pending,
update_sender: mpsc::Sender<UpdateMessage>,
@@ -41,7 +43,7 @@ impl Pending {
connection: &Connected,
request: Stanza,
id: String,
- ) -> Result<Stanza, RequestError> {
+ ) -> Result<Stanza, IqRequestError> {
let (send, recv) = oneshot::channel();
{
self.0.lock().await.insert(id, send);
@@ -74,12 +76,18 @@ impl Pending {
}
impl ClientLogic {
- pub fn new(client: Client, db: Db, update_sender: mpsc::Sender<UpdateMessage>) -> Self {
+ pub fn new(
+ client: Client,
+ bare_jid: JID,
+ db: Db,
+ update_sender: mpsc::Sender<UpdateMessage>,
+ ) -> Self {
Self {
db,
pending: Pending::new(),
update_sender,
client,
+ bare_jid,
}
}
diff --git a/filamento/src/logic/offline.rs b/filamento/src/logic/offline.rs
index bc2666a..6399cf7 100644
--- a/filamento/src/logic/offline.rs
+++ b/filamento/src/logic/offline.rs
@@ -1,8 +1,14 @@
+use chrono::Utc;
use lampada::error::WriteError;
+use uuid::Uuid;
use crate::{
Command,
- error::{DatabaseError, DiscoError, Error, RosterError, StatusError},
+ chat::{Delivery, Message},
+ error::{
+ DatabaseError, DiscoError, Error, IqRequestError, MessageSendError, NickError, RosterError,
+ StatusError,
+ },
presence::Online,
roster::Contact,
};
@@ -76,16 +82,16 @@ pub async fn handle_offline_result(logic: &ClientLogic, command: Command) -> Res
sender.send(Err(RosterError::Write(WriteError::Disconnected)));
}
Command::BuddyRequest(_jid, sender) => {
- sender.send(Err(WriteError::Disconnected));
+ sender.send(Err(WriteError::Disconnected.into()));
}
Command::SubscriptionRequest(_jid, sender) => {
- sender.send(Err(WriteError::Disconnected));
+ sender.send(Err(WriteError::Disconnected.into()));
}
Command::AcceptBuddyRequest(_jid, sender) => {
- sender.send(Err(WriteError::Disconnected));
+ sender.send(Err(WriteError::Disconnected.into()));
}
Command::AcceptSubscriptionRequest(_jid, sender) => {
- sender.send(Err(WriteError::Disconnected));
+ sender.send(Err(WriteError::Disconnected.into()));
}
Command::UnsubscribeFromContact(_jid, sender) => {
sender.send(Err(WriteError::Disconnected));
@@ -107,8 +113,42 @@ pub async fn handle_offline_result(logic: &ClientLogic, command: Command) -> Res
sender.send(result);
}
// TODO: offline message queue
- Command::SendMessage(_jid, _body, sender) => {
- sender.send(Err(WriteError::Disconnected));
+ Command::SendMessage(jid, body) => {
+ let id = Uuid::new_v4();
+ let timestamp = Utc::now();
+
+ let message = Message {
+ id,
+ from: logic.bare_jid.clone(),
+ // TODO: failure reason
+ delivery: Some(Delivery::Failed),
+ timestamp,
+ body,
+ };
+ // try to store in message history that there is a new message that is sending. if client is quit mid-send then can mark as failed and re-send
+ // TODO: mark these as potentially failed upon client launch
+ if let Err(e) = logic
+ .db()
+ .create_message_with_self_resource(
+ message.clone(),
+ jid.clone(),
+ // TODO: when message is queued and sent, the from must also be updated with the correct resource
+ logic.bare_jid.clone(),
+ )
+ .await
+ {
+ // TODO: should these really be handle_error or just the error macro?
+ logic
+ .handle_error(MessageSendError::MessageHistory(e.into()).into())
+ .await;
+ }
+ logic
+ .update_sender()
+ .send(crate::UpdateMessage::Message {
+ to: jid.as_bare(),
+ message,
+ })
+ .await;
}
Command::SendPresence(_jid, _presence, sender) => {
sender.send(Err(WriteError::Disconnected));
@@ -119,6 +159,16 @@ pub async fn handle_offline_result(logic: &ClientLogic, command: Command) -> Res
Command::DiscoItems(_jid, _node, sender) => {
sender.send(Err(DiscoError::Write(WriteError::Disconnected)));
}
+ Command::Publish {
+ item: _,
+ node: _,
+ sender,
+ } => {
+ sender.send(Err(IqRequestError::Write(WriteError::Disconnected).into()));
+ }
+ Command::ChangeNick(_, sender) => {
+ sender.send(Err(NickError::Disconnected));
+ }
}
Ok(())
}
diff --git a/filamento/src/logic/online.rs b/filamento/src/logic/online.rs
index 63a4aa3..d32f527 100644
--- a/filamento/src/logic/online.rs
+++ b/filamento/src/logic/online.rs
@@ -3,10 +3,11 @@ use jid::JID;
use lampada::{Connected, WriteMessage, error::WriteError};
use stanza::{
client::{
- Stanza,
- iq::{self, Iq, IqType, Query},
+ iq::{self, Iq, IqType, Query}, Stanza
},
xep_0030::{info, items},
+ xep_0060::pubsub::{self, Pubsub},
+ xep_0172::{self, Nick},
xep_0203::Delay,
};
use tokio::sync::oneshot;
@@ -14,12 +15,9 @@ use tracing::{debug, error, info};
use uuid::Uuid;
use crate::{
- Command, UpdateMessage,
- chat::{Body, Message},
- disco::{Info, Items},
- error::{DatabaseError, DiscoError, Error, MessageSendError, RosterError, StatusError},
- presence::{Online, Presence, PresenceType},
- roster::{Contact, ContactUpdate},
+ chat::{Body, Chat, Delivery, Message}, disco::{Info, Items}, error::{
+ DatabaseError, DiscoError, Error, IqRequestError, MessageSendError, NickError, PublishError, RosterError, StatusError, SubscribeError
+ }, pep, presence::{Online, Presence, PresenceType}, roster::{Contact, ContactUpdate}, Command, UpdateMessage
};
use super::{
@@ -156,105 +154,82 @@ pub async fn handle_add_contact(
}
}
-pub async fn handle_buddy_request(connection: Connected, jid: JID) -> Result<(), WriteError> {
+pub async fn handle_buddy_request(
+ logic: &ClientLogic,
+ connection: Connected,
+ jid: JID,
+) -> Result<(), SubscribeError> {
+ let client_user = logic.db.read_user(logic.bare_jid.clone()).await?;
+ let nick = client_user.nick.map(|nick| Nick(nick));
let presence = Stanza::Presence(stanza::client::presence::Presence {
- from: None,
- id: None,
to: Some(jid.clone()),
r#type: Some(stanza::client::presence::PresenceType::Subscribe),
- lang: None,
- show: None,
- status: None,
- priority: None,
- errors: Vec::new(),
- delay: None,
+ nick,
+ ..Default::default()
});
connection.write_handle().write(presence).await?;
let presence = Stanza::Presence(stanza::client::presence::Presence {
- from: None,
- id: None,
to: Some(jid),
r#type: Some(stanza::client::presence::PresenceType::Subscribed),
- lang: None,
- show: None,
- status: None,
- priority: None,
- errors: Vec::new(),
- delay: None,
+ ..Default::default()
});
connection.write_handle().write(presence).await?;
Ok(())
}
pub async fn handle_subscription_request(
+ logic: &ClientLogic,
connection: Connected,
jid: JID,
-) -> Result<(), WriteError> {
+) -> Result<(), SubscribeError> {
// TODO: i should probably have builders
+ let client_user = logic.db.read_user(logic.bare_jid.clone()).await?;
+ let nick = client_user.nick.map(|nick| Nick(nick));
let presence = Stanza::Presence(stanza::client::presence::Presence {
- from: None,
- id: None,
to: Some(jid),
r#type: Some(stanza::client::presence::PresenceType::Subscribe),
- lang: None,
- show: None,
- status: None,
- priority: None,
- errors: Vec::new(),
- delay: None,
+ nick,
+ ..Default::default()
});
connection.write_handle().write(presence).await?;
Ok(())
}
pub async fn handle_accept_buddy_request(
+ logic: &ClientLogic,
connection: Connected,
jid: JID,
-) -> Result<(), WriteError> {
+) -> Result<(), SubscribeError> {
let presence = Stanza::Presence(stanza::client::presence::Presence {
- from: None,
- id: None,
to: Some(jid.clone()),
r#type: Some(stanza::client::presence::PresenceType::Subscribed),
- lang: None,
- show: None,
- status: None,
- priority: None,
- errors: Vec::new(),
- delay: None,
+ ..Default::default()
});
connection.write_handle().write(presence).await?;
+ let client_user = logic.db.read_user(logic.bare_jid.clone()).await?;
+ let nick = client_user.nick.map(|nick| Nick(nick));
let presence = Stanza::Presence(stanza::client::presence::Presence {
- from: None,
- id: None,
to: Some(jid),
r#type: Some(stanza::client::presence::PresenceType::Subscribe),
- lang: None,
- show: None,
- status: None,
- priority: None,
- errors: Vec::new(),
- delay: None,
+ nick,
+ ..Default::default()
});
connection.write_handle().write(presence).await?;
Ok(())
}
pub async fn handle_accept_subscription_request(
+ logic: &ClientLogic,
connection: Connected,
jid: JID,
-) -> Result<(), WriteError> {
+) -> Result<(), SubscribeError> {
+ let client_user = logic.db.read_user(logic.bare_jid.clone()).await?;
+ let nick = client_user.nick.map(|nick| Nick(nick));
let presence = Stanza::Presence(stanza::client::presence::Presence {
- from: None,
- id: None,
to: Some(jid),
r#type: Some(stanza::client::presence::PresenceType::Subscribe),
- lang: None,
- show: None,
- status: None,
- priority: None,
- errors: Vec::new(),
- delay: None,
+ nick,
+ ..Default::default()
});
connection.write_handle().write(presence).await?;
Ok(())
@@ -265,16 +240,9 @@ pub async fn handle_unsubscribe_from_contact(
jid: JID,
) -> Result<(), WriteError> {
let presence = Stanza::Presence(stanza::client::presence::Presence {
- from: None,
- id: None,
to: Some(jid),
r#type: Some(stanza::client::presence::PresenceType::Unsubscribe),
- lang: None,
- show: None,
- status: None,
- priority: None,
- errors: Vec::new(),
- delay: None,
+ ..Default::default()
});
connection.write_handle().write(presence).await?;
Ok(())
@@ -282,16 +250,9 @@ pub async fn handle_unsubscribe_from_contact(
pub async fn handle_unsubscribe_contact(connection: Connected, jid: JID) -> Result<(), WriteError> {
let presence = Stanza::Presence(stanza::client::presence::Presence {
- from: None,
- id: None,
to: Some(jid),
r#type: Some(stanza::client::presence::PresenceType::Unsubscribed),
- lang: None,
- show: None,
- status: None,
- priority: None,
- errors: Vec::new(),
- delay: None,
+ ..Default::default()
});
connection.write_handle().write(presence).await?;
Ok(())
@@ -299,29 +260,15 @@ pub async fn handle_unsubscribe_contact(connection: Connected, jid: JID) -> Resu
pub async fn handle_unfriend_contact(connection: Connected, jid: JID) -> Result<(), WriteError> {
let presence = Stanza::Presence(stanza::client::presence::Presence {
- from: None,
- id: None,
to: Some(jid.clone()),
r#type: Some(stanza::client::presence::PresenceType::Unsubscribe),
- lang: None,
- show: None,
- status: None,
- priority: None,
- errors: Vec::new(),
- delay: None,
+ ..Default::default()
});
connection.write_handle().write(presence).await?;
let presence = Stanza::Presence(stanza::client::presence::Presence {
- from: None,
- id: None,
to: Some(jid),
r#type: Some(stanza::client::presence::PresenceType::Unsubscribed),
- lang: None,
- show: None,
- status: None,
- priority: None,
- errors: Vec::new(),
- delay: None,
+ ..Default::default()
});
connection.write_handle().write(presence).await?;
Ok(())
@@ -464,60 +411,119 @@ pub async fn handle_set_status(
Ok(())
}
-pub async fn handle_send_message(
- logic: &ClientLogic,
- connection: Connected,
- jid: JID,
- body: Body,
-) -> Result<(), WriteError> {
+pub async fn handle_send_message(logic: &ClientLogic, connection: Connected, jid: JID, body: Body) {
+ // upsert the chat and user the message will be delivered to. if there is a conflict, it will return whatever was there, otherwise it will return false by default.
+ let have_chatted = logic.db().upsert_chat_and_user(&jid).await.unwrap_or(false);
+
+ let nick;
+ let mark_chat_as_chatted;
+ if have_chatted == false {
+ match logic.db.read_user(logic.bare_jid.clone()).await {
+ Ok(u) => {
+ nick = u.nick.map(|nick| Nick(nick));
+ mark_chat_as_chatted = true;
+ }
+ Err(e) => {
+ logic
+ .handle_error(MessageSendError::GetUserDetails(e.into()).into())
+ .await;
+ nick = None;
+ mark_chat_as_chatted = false;
+ }
+ }
+ } else {
+ nick = None;
+ mark_chat_as_chatted = false;
+ }
+
+ // generate message struct
let id = Uuid::new_v4();
let timestamp = Utc::now();
- let message = Stanza::Message(stanza::client::message::Message {
+ let message = Message {
+ id,
+ from: connection.jid().as_bare(),
+ body: body.clone(),
+ timestamp,
+ delivery: Some(Delivery::Sending),
+ };
+
+ // try to store in message history that there is a new message that is sending. if client is quit mid-send then can mark as failed and re-send
+ // TODO: mark these as potentially failed upon client launch
+ if let Err(e) = logic
+ .db()
+ .create_message_with_self_resource(message.clone(), jid.clone(), connection.jid().clone())
+ .await
+ {
+ // TODO: should these really be handle_error or just the error macro?
+ logic
+ .handle_error(MessageSendError::MessageHistory(e.into()).into())
+ .await;
+ }
+
+ // tell the client a message is being sent
+ logic
+ .update_sender()
+ .send(UpdateMessage::Message {
+ to: jid.as_bare(),
+ message,
+ })
+ .await;
+
+ // prepare the message stanza
+ let message_stanza = Stanza::Message(stanza::client::message::Message {
from: Some(connection.jid().clone()),
id: Some(id.to_string()),
to: Some(jid.clone()),
// TODO: specify message type
r#type: stanza::client::message::MessageType::Chat,
// TODO: lang ?
- lang: None,
- subject: None,
body: Some(stanza::client::message::Body {
lang: None,
body: Some(body.body.clone()),
}),
- thread: None,
// include delay to have a consistent timestamp between server and client
delay: Some(Delay {
from: None,
stamp: timestamp,
}),
+ nick,
+ ..Default::default()
});
- connection.write_handle().write(message).await?;
- let mut message = Message {
- id,
- from: connection.jid().clone(),
- body,
- timestamp,
- };
- info!("sent message: {:?}", message);
- if let Err(e) = logic
- .db()
- .create_message_with_self_resource_and_chat(message.clone(), jid.clone())
- .await
- {
- // TODO: should these really be handle_error or just the error macro?
- logic
- .handle_error(MessageSendError::MessageHistory(e.into()).into())
- .await;
- }
- // TODO: don't do this, have separate from from details
- message.from = message.from.as_bare();
- let _ = logic
- .update_sender()
- .send(UpdateMessage::Message { to: jid, message })
+
+ // send the message
+ let result = connection
+ .write_handle()
+ .write(message_stanza.clone())
.await;
- Ok(())
- // TODO: refactor this to send a sending updatemessage, then update or something like that
+ match result {
+ Ok(_) => {
+ info!("sent message: {:?}", message_stanza);
+ logic
+ .update_sender()
+ .send(UpdateMessage::MessageDelivery {
+ id,
+ delivery: Delivery::Written,
+ })
+ .await;
+ if mark_chat_as_chatted {
+ if let Err(e) = logic.db.mark_chat_as_chatted(jid).await {
+ logic
+ .handle_error(MessageSendError::MarkChatAsChatted(e.into()).into())
+ .await;
+ }
+ }
+ }
+ Err(e) => {
+ logic
+ .update_sender()
+ .send(UpdateMessage::MessageDelivery {
+ id,
+ delivery: Delivery::Failed,
+ })
+ .await;
+ logic.handle_error(MessageSendError::Write(e).into()).await;
+ }
+ }
}
pub async fn handle_send_presence(
@@ -673,6 +679,78 @@ pub async fn handle_disco_items(
}
}
+pub async fn handle_publish(
+ logic: &ClientLogic,
+ connection: Connected,
+ item: pep::Item,
+ node: String,
+) -> Result<(), PublishError> {
+ let id = Uuid::new_v4().to_string();
+ let publish = match item {
+ pep::Item::Nick(n) => pubsub::Publish {
+ node,
+ items: vec![pubsub::Item {
+ item: Some(pubsub::Content::Nick(Nick(n))),
+ ..Default::default()
+ }],
+ },
+ };
+ let request = Iq {
+ from: Some(connection.jid().clone()),
+ id: id.clone(),
+ to: None,
+ r#type: IqType::Set,
+ lang: None,
+ query: Some(Query::Pubsub(Pubsub::Publish(publish, None))),
+ errors: Vec::new(),
+ };
+ match logic
+ .pending()
+ .request(&connection, Stanza::Iq(request), id)
+ .await? {
+
+ Stanza::Iq(Iq {
+ from,
+ r#type,
+ query,
+ errors,
+ ..
+ // TODO: maybe abstract a bunch of these different errors related to iqs into an iq error thing? as in like call iq.result(), get the query from inside, error otherwise.
+ }) if r#type == IqType::Result || r#type == IqType::Error => {
+ if from == None ||
+ from == Some(connection.jid().as_bare())
+ {
+ match r#type {
+ IqType::Result => {
+ if let Some(query) = query {
+ match query {
+ Query::Pubsub(_) => Ok(()),
+ q => Err(PublishError::MismatchedQuery(q)),
+ }
+ } else {
+ Err(PublishError::MissingQuery)
+ }
+ }
+ IqType::Error => {
+ Err(PublishError::StanzaErrors(errors))
+ }
+ _ => unreachable!(),
+ }
+ } else {
+ Err(PublishError::IncorrectEntity(
+ from.unwrap_or_else(|| connection.jid().as_bare()),
+ ))
+ }
+ }
+ s => Err(PublishError::UnexpectedStanza(s)),
+ }
+}
+
+pub async fn handle_change_nick(logic: &ClientLogic, nick: String) -> Result<(), NickError> {
+ logic.client().publish(pep::Item::Nick(nick), xep_0172::XMLNS.to_string()).await?;
+ Ok(())
+}
+
// TODO: could probably macro-ise?
pub async fn handle_online_result(
logic: &ClientLogic,
@@ -716,25 +794,24 @@ pub async fn handle_online_result(
let user = handle_get_user(logic, jid).await;
let _ = sender.send(user);
}
- // TODO: offline queue to modify roster
Command::AddContact(jid, sender) => {
let result = handle_add_contact(logic, connection, jid).await;
let _ = sender.send(result);
}
Command::BuddyRequest(jid, sender) => {
- let result = handle_buddy_request(connection, jid).await;
+ let result = handle_buddy_request(logic, connection, jid).await;
let _ = sender.send(result);
}
Command::SubscriptionRequest(jid, sender) => {
- let result = handle_subscription_request(connection, jid).await;
+ let result = handle_subscription_request(logic, connection, jid).await;
let _ = sender.send(result);
}
Command::AcceptBuddyRequest(jid, sender) => {
- let result = handle_accept_buddy_request(connection, jid).await;
+ let result = handle_accept_buddy_request(logic, connection, jid).await;
let _ = sender.send(result);
}
Command::AcceptSubscriptionRequest(jid, sender) => {
- let result = handle_accept_subscription_request(connection, jid).await;
+ let result = handle_accept_subscription_request(logic, connection, jid).await;
let _ = sender.send(result);
}
Command::UnsubscribeFromContact(jid, sender) => {
@@ -761,10 +838,8 @@ pub async fn handle_online_result(
let result = handle_set_status(logic, connection, online).await;
let _ = sender.send(result);
}
- // TODO: offline message queue
- Command::SendMessage(jid, body, sender) => {
- let result = handle_send_message(logic, connection, jid, body).await;
- let _ = sender.send(result);
+ Command::SendMessage(jid, body) => {
+ handle_send_message(logic, connection, jid, body).await;
}
Command::SendPresence(jid, presence, sender) => {
let result = handle_send_presence(connection, jid, presence).await;
@@ -778,6 +853,14 @@ pub async fn handle_online_result(
let result = handle_disco_items(logic, connection, jid, node).await;
let _ = sender.send(result);
}
+ Command::Publish { item, node, sender } => {
+ let result = handle_publish(logic, connection, item, node).await;
+ let _ = sender.send(result);
+ }
+ Command::ChangeNick(nick, sender) => {
+ let result = handle_change_nick(logic, nick).await;
+ let _ = sender.send(result);
+ }
}
Ok(())
}
diff --git a/filamento/src/logic/process_stanza.rs b/filamento/src/logic/process_stanza.rs
index b1bc830..2f6644e 100644
--- a/filamento/src/logic/process_stanza.rs
+++ b/filamento/src/logic/process_stanza.rs
@@ -41,38 +41,74 @@ pub async fn recv_message(
logic: ClientLogic,
stanza_message: stanza::client::message::Message,
) -> Result<Option<UpdateMessage>, MessageRecvError> {
- if let Some(mut from) = stanza_message.from {
+ if let Some(from) = stanza_message.from {
// TODO: don't ignore delay from. xep says SHOULD send error if incorrect.
let timestamp = stanza_message
.delay
.map(|delay| delay.stamp)
.unwrap_or_else(|| Utc::now());
// TODO: group chat messages
- let mut message = Message {
- id: stanza_message
- .id
- // TODO: proper id storage
- .map(|id| Uuid::from_str(&id).unwrap_or_else(|_| Uuid::new_v4()))
- .unwrap_or_else(|| Uuid::new_v4()),
- from: from.clone(),
- timestamp,
- body: Body {
- // TODO: should this be an option?
- body: stanza_message
- .body
- .map(|body| body.body)
- .unwrap_or_default()
- .unwrap_or_default(),
- },
- };
+
+ // if there is a body, should create chat message
+ if let Some(body) = stanza_message.body {
+ let message = Message {
+ id: stanza_message
+ .id
+ // TODO: proper id xep
+ .map(|id| Uuid::from_str(&id).unwrap_or_else(|_| Uuid::new_v4()))
+ .unwrap_or_else(|| Uuid::new_v4()),
+ from: from.as_bare(),
+ timestamp,
+ body: Body {
+ body: body.body.unwrap_or_default(),
+ },
+ delivery: None,
+ };
+
+ // save the message to the database
+ logic.db().upsert_chat_and_user(&from).await?;
+ if let Err(e) = logic
+ .db()
+ .create_message_with_user_resource(message.clone(), from.clone(), from.clone())
+ .await
+ {
+ logic
+ .handle_error(Error::MessageRecv(MessageRecvError::MessageHistory(e)))
+ .await;
+ }
+
+ // update the client with the new message
+ logic
+ .update_sender()
+ .send(UpdateMessage::Message {
+ to: from.as_bare(),
+ message,
+ })
+ .await;
+ }
+
+ if let Some(nick) = stanza_message.nick {
+ if let Err(e) = logic
+ .db()
+ .upsert_user_nick(from.as_bare(), nick.0.clone())
+ .await
+ {
+ logic
+ .handle_error(Error::MessageRecv(MessageRecvError::NickUpdate(e)))
+ .await;
+ }
+
+ logic
+ .update_sender()
+ .send(UpdateMessage::NickChanged {
+ jid: from.as_bare(),
+ nick: nick.0,
+ })
+ .await;
+ }
+
+ Ok(None)
// TODO: can this be more efficient?
- logic
- .db()
- .create_message_with_user_resource_and_chat(message.clone(), from.clone())
- .await?;
- message.from = message.from.as_bare();
- from = from.as_bare();
- Ok(Some(UpdateMessage::Message { to: from, message }))
} else {
Err(MessageRecvError::MissingFrom)
}
diff --git a/filamento/src/pep.rs b/filamento/src/pep.rs
new file mode 100644
index 0000000..c71d843
--- /dev/null
+++ b/filamento/src/pep.rs
@@ -0,0 +1,17 @@
+// in commandmessage
+// pub struct Publish {
+// item: Item,
+// node: Option<String>,
+// // no need for node, as item has the node
+// }
+//
+// in updatemessage
+// pub struct Event {
+// from: JID,
+// item: Item,
+// }
+
+#[derive(Clone, Debug)]
+pub enum Item {
+ Nick(String),
+}
diff --git a/filamento/src/presence.rs b/filamento/src/presence.rs
index e35761c..bae8793 100644
--- a/filamento/src/presence.rs
+++ b/filamento/src/presence.rs
@@ -78,11 +78,6 @@ impl Online {
timestamp: Option<DateTime<Utc>>,
) -> stanza::client::presence::Presence {
stanza::client::presence::Presence {
- from: None,
- id: None,
- to: None,
- r#type: None,
- lang: None,
show: self.show.map(|show| match show {
Show::Away => stanza::client::presence::Show::Away,
Show::Chat => stanza::client::presence::Show::Chat,
@@ -97,11 +92,11 @@ impl Online {
priority: self
.priority
.map(|priority| stanza::client::presence::Priority(priority)),
- errors: Vec::new(),
delay: timestamp.map(|timestamp| Delay {
from: None,
stamp: timestamp,
}),
+ ..Default::default()
}
}
}
@@ -112,22 +107,16 @@ impl Offline {
timestamp: Option<DateTime<Utc>>,
) -> stanza::client::presence::Presence {
stanza::client::presence::Presence {
- from: None,
- id: None,
- to: None,
r#type: Some(stanza::client::presence::PresenceType::Unavailable),
- lang: None,
- show: None,
status: self.status.map(|status| stanza::client::presence::Status {
lang: None,
status: String1024(status),
}),
- priority: None,
- errors: Vec::new(),
delay: timestamp.map(|timestamp| Delay {
from: None,
stamp: timestamp,
}),
+ ..Default::default()
}
}
}
diff --git a/filamento/src/user.rs b/filamento/src/user.rs
index 9914d14..85471d5 100644
--- a/filamento/src/user.rs
+++ b/filamento/src/user.rs
@@ -1,7 +1,8 @@
use jid::JID;
-#[derive(Debug, sqlx::FromRow)]
+#[derive(Debug, sqlx::FromRow, Clone)]
pub struct User {
pub jid: JID,
+ pub nick: Option<String>,
pub cached_status_message: Option<String>,
}
diff --git a/stanza/Cargo.toml b/stanza/Cargo.toml
index 64962b6..3c3c3e0 100644
--- a/stanza/Cargo.toml
+++ b/stanza/Cargo.toml
@@ -16,5 +16,6 @@ xep_0030 = []
xep_0059 = []
xep_0060 = ["xep_0004", "dep:chrono"]
xep_0131 = []
+xep_0172 = []
xep_0199 = []
xep_0203 = ["dep:chrono"]
diff --git a/stanza/src/client/iq.rs b/stanza/src/client/iq.rs
index 6d0c671..50884aa 100644
--- a/stanza/src/client/iq.rs
+++ b/stanza/src/client/iq.rs
@@ -17,6 +17,9 @@ use crate::roster;
#[cfg(feature = "xep_0030")]
use crate::xep_0030::{self, info, items};
+#[cfg(feature = "xep_0060")]
+use crate::xep_0060::pubsub::{self, Pubsub};
+
#[cfg(feature = "xep_0199")]
use crate::xep_0199::{self, Ping};
@@ -42,6 +45,8 @@ pub enum Query {
DiscoInfo(info::Query),
#[cfg(feature = "xep_0030")]
DiscoItems(items::Query),
+ #[cfg(feature = "xep_0060")]
+ Pubsub(Pubsub),
#[cfg(feature = "xep_0199")]
Ping(Ping),
#[cfg(feature = "rfc_6121")]
@@ -67,6 +72,8 @@ impl FromElement for Query {
(Some(xep_0030::items::XMLNS), "query") => {
Ok(Query::DiscoItems(items::Query::from_element(element)?))
}
+ #[cfg(feature = "xep_0060")]
+ (Some(pubsub::XMLNS), "pubsub") => Ok(Query::Pubsub(Pubsub::from_element(element)?)),
_ => Ok(Query::Unsupported),
}
}
@@ -86,6 +93,8 @@ impl IntoElement for Query {
Query::DiscoInfo(query) => query.builder(),
#[cfg(feature = "xep_0030")]
Query::DiscoItems(query) => query.builder(),
+ #[cfg(feature = "xep_0060")]
+ Query::Pubsub(pubsub) => pubsub.builder(),
}
}
}
diff --git a/stanza/src/client/message.rs b/stanza/src/client/message.rs
index e521613..d94b82e 100644
--- a/stanza/src/client/message.rs
+++ b/stanza/src/client/message.rs
@@ -8,12 +8,14 @@ use peanuts::{
#[cfg(feature = "xep_0131")]
use crate::xep_0131::Headers;
+#[cfg(feature = "xep_0172")]
+use crate::xep_0172::Nick;
#[cfg(feature = "xep_0203")]
use crate::xep_0203::Delay;
use super::XMLNS;
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, Default)]
pub struct Message {
pub from: Option<JID>,
pub id: Option<String>,
@@ -29,6 +31,8 @@ pub struct Message {
pub delay: Option<Delay>,
#[cfg(feature = "xep_0131")]
pub headers: Option<Headers>,
+ #[cfg(feature = "xep_0172")]
+ pub nick: Option<Nick>,
}
impl FromElement for Message {
@@ -52,6 +56,9 @@ impl FromElement for Message {
#[cfg(feature = "xep_0131")]
let headers = element.child_opt()?;
+ #[cfg(feature = "xep_0172")]
+ let nick = element.child_opt()?;
+
Ok(Message {
from,
id,
@@ -65,6 +72,8 @@ impl FromElement for Message {
delay,
#[cfg(feature = "xep_0131")]
headers,
+ #[cfg(feature = "xep_0172")]
+ nick,
})
}
}
@@ -93,6 +102,9 @@ impl IntoElement for Message {
#[cfg(feature = "xep_0131")]
let builder = builder.push_child_opt(self.headers.clone());
+ #[cfg(feature = "xep_0172")]
+ let builder = builder.push_child_opt(self.nick.clone());
+
builder
}
}
@@ -137,6 +149,7 @@ impl ToString for MessageType {
#[derive(Clone, Debug)]
pub struct Body {
pub lang: Option<String>,
+ // TODO: string stuff
pub body: Option<String>,
}
diff --git a/stanza/src/client/presence.rs b/stanza/src/client/presence.rs
index 8fb96be..bffb0d0 100644
--- a/stanza/src/client/presence.rs
+++ b/stanza/src/client/presence.rs
@@ -8,12 +8,14 @@ use peanuts::{
#[cfg(feature = "xep_0131")]
use crate::xep_0131::Headers;
+#[cfg(feature = "xep_0172")]
+use crate::xep_0172::Nick;
#[cfg(feature = "xep_0203")]
use crate::xep_0203::Delay;
use super::{error::Error, XMLNS};
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, Default)]
pub struct Presence {
pub from: Option<JID>,
pub id: Option<String>,
@@ -28,8 +30,9 @@ pub struct Presence {
pub delay: Option<Delay>,
#[cfg(feature = "xep_0131")]
pub headers: Option<Headers>,
- // TODO: ##other
- // other: Vec<Other>,
+ #[cfg(feature = "xep_0172")]
+ pub nick: Option<Nick>,
+ // ##other
pub errors: Vec<Error>,
}
@@ -55,6 +58,9 @@ impl FromElement for Presence {
#[cfg(feature = "xep_0131")]
let headers = element.child_opt()?;
+ #[cfg(feature = "xep_0172")]
+ let nick = element.child_opt()?;
+
Ok(Presence {
from,
id,
@@ -69,6 +75,8 @@ impl FromElement for Presence {
delay,
#[cfg(feature = "xep_0131")]
headers,
+ #[cfg(feature = "xep_0172")]
+ nick,
})
}
}
@@ -92,6 +100,9 @@ impl IntoElement for Presence {
#[cfg(feature = "xep_0131")]
let builder = builder.push_child_opt(self.headers.clone());
+ #[cfg(feature = "xep_0172")]
+ let builder = builder.push_child_opt(self.nick.clone());
+
builder
}
}
diff --git a/stanza/src/lib.rs b/stanza/src/lib.rs
index 0a71a26..5474aee 100644
--- a/stanza/src/lib.rs
+++ b/stanza/src/lib.rs
@@ -19,6 +19,8 @@ pub mod xep_0059;
pub mod xep_0060;
#[cfg(feature = "xep_0131")]
pub mod xep_0131;
+#[cfg(feature = "xep_0172")]
+pub mod xep_0172;
#[cfg(feature = "xep_0199")]
pub mod xep_0199;
#[cfg(feature = "xep_0203")]
diff --git a/stanza/src/xep_0060/event.rs b/stanza/src/xep_0060/event.rs
index bdd8b53..d2c150a 100644
--- a/stanza/src/xep_0060/event.rs
+++ b/stanza/src/xep_0060/event.rs
@@ -8,6 +8,8 @@ use peanuts::{
};
use crate::xep_0004::X;
+#[cfg(feature = "xep_0172")]
+use crate::xep_0172::{self, Nick};
pub const XMLNS: &str = "http://jabber.org/protocol/pubsub#event";
@@ -292,19 +294,28 @@ impl IntoElement for Item {
#[derive(Clone, Debug)]
pub enum Content {
- Unknown,
+ #[cfg(feature = "xep_0172")]
+ Nick(Nick),
+ Unknown(Element),
}
impl FromElement for Content {
- fn from_element(_element: Element) -> peanuts::element::DeserializeResult<Self> {
- // TODO: types
- return Ok(Self::Unknown);
+ fn from_element(element: Element) -> peanuts::element::DeserializeResult<Self> {
+ match element.identify() {
+ #[cfg(feature = "xep_0172")]
+ (Some(xep_0172::XMLNS), "nick") => Ok(Content::Nick(Nick::from_element(element)?)),
+ _ => Ok(Self::Unknown(element)),
+ }
}
}
impl IntoElement for Content {
fn builder(&self) -> peanuts::element::ElementBuilder {
- panic!("unknown content cannot be serialized")
+ match self {
+ #[cfg(feature = "xep_0172")]
+ Content::Nick(nick) => nick.builder(),
+ Content::Unknown(_e) => panic!("unknown content cannot be serialized"),
+ }
}
}
diff --git a/stanza/src/xep_0060/pubsub.rs b/stanza/src/xep_0060/pubsub.rs
index 3a15a59..25fc405 100644
--- a/stanza/src/xep_0060/pubsub.rs
+++ b/stanza/src/xep_0060/pubsub.rs
@@ -7,6 +7,8 @@ use peanuts::{
};
use crate::xep_0004::X;
+#[cfg(feature = "xep_0172")]
+use crate::xep_0172::{self, Nick};
pub const XMLNS: &str = "http://jabber.org/protocol/pubsub";
@@ -301,10 +303,10 @@ impl ToString for DefaultType {
#[derive(Clone, Debug)]
pub struct Items {
- max_items: Option<usize>,
- node: String,
- subid: Option<String>,
- items: Vec<Item>,
+ pub max_items: Option<usize>,
+ pub node: String,
+ pub subid: Option<String>,
+ pub items: Vec<Item>,
}
impl FromElement for Items {
@@ -337,11 +339,11 @@ impl IntoElement for Items {
}
}
-#[derive(Clone, Debug)]
+#[derive(Clone, Debug, Default)]
pub struct Item {
- id: Option<String>,
- publisher: Option<String>,
- item: Option<Content>,
+ pub id: Option<String>,
+ pub publisher: Option<String>,
+ pub item: Option<Content>,
}
impl FromElement for Item {
@@ -373,19 +375,28 @@ impl IntoElement for Item {
#[derive(Clone, Debug)]
pub enum Content {
- Unknown,
+ #[cfg(feature = "xep_0172")]
+ Nick(Nick),
+ Unknown(Element),
}
impl FromElement for Content {
- fn from_element(_element: Element) -> peanuts::element::DeserializeResult<Self> {
- // TODO: types
- return Ok(Self::Unknown);
+ fn from_element(element: Element) -> peanuts::element::DeserializeResult<Self> {
+ match element.identify() {
+ #[cfg(feature = "xep_0172")]
+ (Some(xep_0172::XMLNS), "nick") => Ok(Content::Nick(Nick::from_element(element)?)),
+ _ => Ok(Self::Unknown(element)),
+ }
}
}
impl IntoElement for Content {
fn builder(&self) -> peanuts::element::ElementBuilder {
- panic!("unknown content cannot be serialized")
+ match self {
+ #[cfg(feature = "xep_0172")]
+ Content::Nick(nick) => nick.builder(),
+ Content::Unknown(_e) => panic!("unknown content cannot be serialized"),
+ }
}
}
@@ -429,8 +440,8 @@ impl IntoElement for Options {
#[derive(Clone, Debug)]
pub struct Publish {
- node: String,
- items: Vec<Item>,
+ pub node: String,
+ pub items: Vec<Item>,
}
impl FromElement for Publish {
diff --git a/stanza/src/xep_0172.rs b/stanza/src/xep_0172.rs
new file mode 100644
index 0000000..1c24200
--- /dev/null
+++ b/stanza/src/xep_0172.rs
@@ -0,0 +1,30 @@
+use peanuts::{
+ element::{FromElement, IntoElement},
+ Element,
+};
+
+pub const XMLNS: &str = "http://jabber.org/protocol/nick";
+
+#[derive(Debug, Clone)]
+pub struct Nick(pub String);
+
+impl FromElement for Nick {
+ fn from_element(mut element: peanuts::Element) -> peanuts::element::DeserializeResult<Self> {
+ element.check_name("nick")?;
+ element.check_namespace(XMLNS)?;
+
+ Ok(Self(element.pop_value_opt()?.unwrap_or_default()))
+ }
+}
+
+impl IntoElement for Nick {
+ fn builder(&self) -> peanuts::element::ElementBuilder {
+ let builder = Element::builder("nick", Some(XMLNS));
+
+ if self.0.is_empty() {
+ builder
+ } else {
+ builder.push_text(self.0.clone())
+ }
+ }
+}