diff options
| -rw-r--r-- | README.md | 4 | ||||
| -rw-r--r-- | filamento/Cargo.toml | 2 | ||||
| -rw-r--r-- | filamento/examples/example.rs | 33 | ||||
| -rw-r--r-- | filamento/migrations/20240113011930_luz.sql | 12 | ||||
| -rw-r--r-- | filamento/src/chat.rs | 64 | ||||
| -rw-r--r-- | filamento/src/db.rs | 92 | ||||
| -rw-r--r-- | filamento/src/error.rs | 52 | ||||
| -rw-r--r-- | filamento/src/lib.rs | 93 | ||||
| -rw-r--r-- | filamento/src/logic/mod.rs | 14 | ||||
| -rw-r--r-- | filamento/src/logic/offline.rs | 64 | ||||
| -rw-r--r-- | filamento/src/logic/online.rs | 355 | ||||
| -rw-r--r-- | filamento/src/logic/process_stanza.rs | 86 | ||||
| -rw-r--r-- | filamento/src/pep.rs | 17 | ||||
| -rw-r--r-- | filamento/src/presence.rs | 15 | ||||
| -rw-r--r-- | filamento/src/user.rs | 3 | ||||
| -rw-r--r-- | stanza/Cargo.toml | 1 | ||||
| -rw-r--r-- | stanza/src/client/iq.rs | 9 | ||||
| -rw-r--r-- | stanza/src/client/message.rs | 15 | ||||
| -rw-r--r-- | stanza/src/client/presence.rs | 17 | ||||
| -rw-r--r-- | stanza/src/lib.rs | 2 | ||||
| -rw-r--r-- | stanza/src/xep_0060/event.rs | 21 | ||||
| -rw-r--r-- | stanza/src/xep_0060/pubsub.rs | 41 | ||||
| -rw-r--r-- | stanza/src/xep_0172.rs | 30 | 
23 files changed, 770 insertions, 272 deletions
| @@ -22,8 +22,8 @@  - [ ] server side downgrade protection for sasl  - [x] xep-0199: xmpp ping  - [x] xep-0203: delayed delivery -- [ ] xep-0030: service discovery -- [ ] xep-0115: entity capabilities +- [x] xep-0030: service discovery +- [x] xep-0115: entity capabilities  - [ ] xep-0163: pep  - [ ] xep-0245: /me  - [ ] xep-0084: user avatar diff --git a/filamento/Cargo.toml b/filamento/Cargo.toml index ef11192..e9be687 100644 --- a/filamento/Cargo.toml +++ b/filamento/Cargo.toml @@ -8,7 +8,7 @@ futures = "0.3.31"  lampada = { version = "0.1.0", path = "../lampada" }  tokio = "1.42.0"  thiserror = "2.0.11" -stanza = { version = "0.1.0", path = "../stanza", features = ["rfc_6121", "xep_0203", "xep_0030"] } +stanza = { version = "0.1.0", path = "../stanza", features = ["rfc_6121", "xep_0203", "xep_0030", "xep_0060", "xep_0172"] }  sqlx = { version = "0.8.3", features = ["sqlite", "runtime-tokio", "uuid", "chrono"] }  # TODO: re-export jid?  jid = { version = "0.1.0", path = "../jid", features = ["sqlx"] } diff --git a/filamento/examples/example.rs b/filamento/examples/example.rs index 506d698..74a9aa1 100644 --- a/filamento/examples/example.rs +++ b/filamento/examples/example.rs @@ -21,6 +21,8 @@ async fn main() {      client.connect().await.unwrap();      tokio::time::sleep(Duration::from_secs(5)).await; +    info!("changing nick"); +    client.change_nick("britney".to_string()).await.unwrap();      info!("sending message");      client          .send_message( @@ -32,9 +34,30 @@ async fn main() {          .await          .unwrap();      info!("sent message"); -    info!("sending disco query"); -    let info = client.disco_info(None, None).await.unwrap(); -    info!("got disco result: {:#?}", info); -    let items = client.disco_items(None, None).await.unwrap(); -    info!("got disco result: {:#?}", items); +    tokio::time::sleep(Duration::from_secs(5)).await; +    // info!("sending disco query"); +    // let info = client.disco_info(None, None).await.unwrap(); +    // info!("got disco result: {:#?}", info); +    // let items = client.disco_items(None, None).await.unwrap(); +    // info!("got disco result: {:#?}", items); +    // let info = client +    //     .disco_info(Some("blos.sm".parse().unwrap()), None) +    //     .await +    //     .unwrap(); +    // info!("got disco result: {:#?}", info); +    // let items = client +    //     .disco_items(Some("blos.sm".parse().unwrap()), None) +    //     .await +    //     .unwrap(); +    // info!("got disco result: {:#?}", items); +    // let info = client +    //     .disco_info(Some("pubsub.blos.sm".parse().unwrap()), None) +    //     .await +    //     .unwrap(); +    // info!("got disco result: {:#?}", info); +    // let items = client +    //     .disco_items(Some("pubsub.blos.sm".parse().unwrap()), None) +    //     .await +    //     .unwrap(); +    // info!("got disco result: {:#?}", items);  } diff --git a/filamento/migrations/20240113011930_luz.sql b/filamento/migrations/20240113011930_luz.sql index 148598b..3b56664 100644 --- a/filamento/migrations/20240113011930_luz.sql +++ b/filamento/migrations/20240113011930_luz.sql @@ -5,6 +5,7 @@ PRAGMA foreign_keys = on;  create table users(      -- TODO: enforce bare jid      jid text primary key not null, +    nick text,      -- can receive presence status from non-contacts      cached_status_message text      -- TODO: last_seen @@ -67,14 +68,24 @@ create table groups_roster(  -- can send chat message to user (creating a new chat if not already exists)  create table chats (      id text primary key not null, +    have_chatted bool not null,      correspondent text not null unique,      foreign key(correspondent) references users(jid)  ); +-- enum for subscription state +create table delivery( +    state text primary key not null +); + +insert into delivery ( state ) values ('sending'), ('written'), ('sent'), ('delivered'), ('read'), ('failed'), ('queued'); +  -- messages include reference to chat they are in, and who sent them.  create table messages (      id text primary key not null,      body text, +    -- delivery is nullable as only messages sent by the user are markable +    delivery text,      chat_id text not null,      -- TODO: channel stuff       -- channel_id uuid, @@ -94,6 +105,7 @@ create table messages (      from_resource text,      -- check (from_jid != original_sender), +    foreign key(delivery) references delivery(state),      -- TODO: from can be either a jid, a moved jid (for when a contact moves, save original sender jid/user but link to new user), or imported (from another service (save details), linked to new user)      -- TODO: read bool not null,      foreign key(chat_id) references chats(id) on delete cascade, diff --git a/filamento/src/chat.rs b/filamento/src/chat.rs index c1194ea..147c7f7 100644 --- a/filamento/src/chat.rs +++ b/filamento/src/chat.rs @@ -1,5 +1,6 @@  use chrono::{DateTime, Utc};  use jid::JID; +use sqlx::Sqlite;  use uuid::Uuid;  #[derive(Debug, sqlx::FromRow, Clone)] @@ -7,7 +8,9 @@ pub struct Message {      pub id: Uuid,      // does not contain full user information      #[sqlx(rename = "from_jid")] +    // bare jid (for now)      pub from: JID, +    pub delivery: Option<Delivery>,      pub timestamp: DateTime<Utc>,      // TODO: originally_from      // TODO: message edits @@ -16,6 +19,59 @@ pub struct Message {      pub body: Body,  } +#[derive(Debug, Clone, Copy)] +pub enum Delivery { +    Sending, +    Written, +    Sent, +    Delivered, +    Read, +    Failed, +    Queued, +} + +impl sqlx::Type<Sqlite> for Delivery { +    fn type_info() -> <Sqlite as sqlx::Database>::TypeInfo { +        <&str as sqlx::Type<Sqlite>>::type_info() +    } +} + +impl sqlx::Decode<'_, Sqlite> for Delivery { +    fn decode( +        value: <Sqlite as sqlx::Database>::ValueRef<'_>, +    ) -> Result<Self, sqlx::error::BoxDynError> { +        let value = <&str as sqlx::Decode<Sqlite>>::decode(value)?; +        match value { +            "sending" => Ok(Self::Sending), +            "written" => Ok(Self::Written), +            "sent" => Ok(Self::Sent), +            "delivered" => Ok(Self::Delivered), +            "read" => Ok(Self::Read), +            "failed" => Ok(Self::Failed), +            "queued" => Ok(Self::Queued), +            _ => unreachable!(), +        } +    } +} + +impl sqlx::Encode<'_, Sqlite> for Delivery { +    fn encode_by_ref( +        &self, +        buf: &mut <Sqlite as sqlx::Database>::ArgumentBuffer<'_>, +    ) -> Result<sqlx::encode::IsNull, sqlx::error::BoxDynError> { +        let value = match self { +            Delivery::Sending => "sending", +            Delivery::Written => "written", +            Delivery::Sent => "sent", +            Delivery::Delivered => "delivered", +            Delivery::Read => "read", +            Delivery::Failed => "failed", +            Delivery::Queued => "queued", +        }; +        <&str as sqlx::Encode<Sqlite>>::encode(value, buf) +    } +} +  // TODO: user migrations  // pub enum Migrated {  //     Jabber(User), @@ -31,6 +87,7 @@ pub struct Body {  #[derive(sqlx::FromRow, Debug, Clone)]  pub struct Chat {      pub correspondent: JID, +    pub have_chatted: bool,      // pub unread_messages: i32,      // pub latest_message: Message,      // when a new message is received, the chat should be updated, and the new message should be delivered too. @@ -41,8 +98,11 @@ pub struct Chat {  pub enum ChatUpdate {}  impl Chat { -    pub fn new(correspondent: JID) -> Self { -        Self { correspondent } +    pub fn new(correspondent: JID, have_chatted: bool) -> Self { +        Self { +            correspondent, +            have_chatted, +        }      }      pub fn correspondent(&self) -> &JID {          &self.correspondent diff --git a/filamento/src/db.rs b/filamento/src/db.rs index 1054ec2..f92bfb2 100644 --- a/filamento/src/db.rs +++ b/filamento/src/db.rs @@ -50,8 +50,9 @@ impl Db {      pub(crate) async fn create_user(&self, user: User) -> Result<(), Error> {          sqlx::query!( -            "insert into users ( jid, cached_status_message ) values ( ?, ? )", +            "insert into users ( jid, nick, cached_status_message ) values ( ?, ?, ? )",              user.jid, +            user.nick,              user.cached_status_message          )          .execute(&self.db) @@ -60,6 +61,12 @@ impl Db {      }      pub(crate) async fn read_user(&self, user: JID) -> Result<User, Error> { +        sqlx::query!( +            "insert into users ( jid ) values ( ? ) on conflict do nothing", +            user +        ) +        .execute(&self.db) +        .await?;          let user: User = sqlx::query_as("select * from users where jid = ?")              .bind(user)              .fetch_one(&self.db) @@ -67,10 +74,23 @@ impl Db {          Ok(user)      } +    pub(crate) async fn upsert_user_nick(&self, jid: JID, nick: String) -> Result<(), Error> { +        sqlx::query!( +            "insert into users (jid, nick) values (?, ?) on conflict do update set nick = ?", +            jid, +            nick, +            nick +        ) +        .execute(&self.db) +        .await?; +        Ok(()) +    } +      pub(crate) async fn update_user(&self, user: User) -> Result<(), Error> {          sqlx::query!( -            "update users set cached_status_message = ? where jid = ?", +            "update users set cached_status_message = ?, nick = ? where jid = ?",              user.cached_status_message, +            user.nick,              user.jid          )          .execute(&self.db) @@ -288,6 +308,17 @@ impl Db {          Ok(chat)      } +    pub(crate) async fn mark_chat_as_chatted(&self, chat: JID) -> Result<(), Error> { +        let jid = chat.as_bare(); +        sqlx::query!( +            "update chats set have_chatted = true where correspondent = ?", +            jid +        ) +        .execute(&self.db) +        .await?; +        Ok(()) +    } +      pub(crate) async fn update_chat_correspondent(          &self,          old_chat: Chat, @@ -387,38 +418,47 @@ impl Db {      }      /// if the chat doesn't already exist, it must be created by calling create_chat() before running this function. -    pub(crate) async fn create_message(&self, message: Message, chat: JID) -> Result<(), Error> { +    pub(crate) async fn create_message( +        &self, +        message: Message, +        chat: JID, +        from: JID, +    ) -> Result<(), Error> {          // TODO: one query -        let bare_jid = message.from.as_bare(); -        let resource = message.from.resourcepart; +        let from_jid = from.as_bare();          let chat_id = self.read_chat_id(chat).await?; -        sqlx::query!("insert into messages (id, body, chat_id, from_jid, from_resource, timestamp) values (?, ?, ?, ?, ?, ?)", message.id, message.body.body, chat_id, bare_jid, resource, message.timestamp).execute(&self.db).await?; +        sqlx::query!("insert into messages (id, body, chat_id, from_jid, from_resource, timestamp) values (?, ?, ?, ?, ?, ?)", message.id, message.body.body, chat_id, from_jid, from.resourcepart, message.timestamp).execute(&self.db).await?;          Ok(())      } -    pub(crate) async fn create_message_with_self_resource_and_chat( -        &self, -        message: Message, -        chat: JID, -    ) -> Result<(), Error> { -        let from_jid = message.from.as_bare(); -        let resource = &message.from.resourcepart; +    pub(crate) async fn upsert_chat_and_user(&self, chat: &JID) -> Result<bool, Error> {          let bare_chat = chat.as_bare();          sqlx::query!(              "insert into users (jid) values (?) on conflict do nothing", -            from_jid +            bare_chat,          )          .execute(&self.db)          .await?;          let id = Uuid::new_v4(); -        sqlx::query!( -            "insert into chats (id, correspondent) values (?, ?) on conflict do nothing", -            id, -            bare_chat -        ) -        .execute(&self.db) -        .await?; -        if let Some(resource) = resource { +        let chat: Chat = sqlx::query_as("insert into chats (id, correspondent, have_chatted) values (?, ?, ?) on conflict do nothing returning *") +            .bind(id) +            .bind(bare_chat) +            .bind(false) +            .fetch_one(&self.db) +            .await?; +        Ok(chat.have_chatted) +    } + +    /// MUST upsert chat beforehand +    pub(crate) async fn create_message_with_self_resource( +        &self, +        message: Message, +        chat: JID, +        // full jid +        from: JID, +    ) -> Result<(), Error> { +        let from_jid = from.as_bare(); +        if let Some(resource) = &from.resourcepart {              sqlx::query!(                  "insert into resources (bare_jid, resource) values (?, ?) on conflict do nothing",                  from_jid, @@ -427,15 +467,17 @@ impl Db {              .execute(&self.db)              .await?;          } -        self.create_message(message, chat).await?; +        self.create_message(message, chat, from).await?;          Ok(())      }      // create direct message from incoming -    pub(crate) async fn create_message_with_user_resource_and_chat( +    pub(crate) async fn create_message_with_user_resource(          &self,          message: Message,          chat: JID, +        // full jid +        from: JID,      ) -> Result<(), Error> {          let bare_chat = chat.as_bare();          let resource = &chat.resourcepart; @@ -462,7 +504,7 @@ impl Db {              .execute(&self.db)              .await?;          } -        self.create_message(message, chat).await?; +        self.create_message(message, chat, from).await?;          Ok(())      } diff --git a/filamento/src/error.rs b/filamento/src/error.rs index 8e2e4be..9ecc330 100644 --- a/filamento/src/error.rs +++ b/filamento/src/error.rs @@ -34,12 +34,22 @@ pub enum Error {      MessageSend(#[from] MessageSendError),      #[error("message receive error: {0}")]      MessageRecv(#[from] MessageRecvError), +    #[error("subscripbe error: {0}")] +    Subscribe(#[from] SubscribeError), +    #[error("publish error: {0}")] +    Publish(#[from] PublishError),  }  #[derive(Debug, Error, Clone)]  pub enum MessageSendError {      #[error("could not add to message history: {0}")]      MessageHistory(#[from] DatabaseError), +    #[error("could not mark chat as chatted: {0}")] +    MarkChatAsChatted(DatabaseError), +    #[error("could not get client user details: {0}")] +    GetUserDetails(DatabaseError), +    #[error("writing message to connection: {0}")] +    Write(#[from] WriteError),  }  #[derive(Debug, Error, Clone)] @@ -48,6 +58,8 @@ pub enum MessageRecvError {      MessageHistory(#[from] DatabaseError),      #[error("missing from")]      MissingFrom, +    #[error("could not update user nick: {0}")] +    NickUpdate(DatabaseError),  }  #[derive(Debug, Error, Clone)] @@ -75,7 +87,7 @@ pub enum RosterError {      #[error("cache: {0}")]      Cache(#[from] DatabaseError),      #[error("iq response: {0}")] -    IqResponse(#[from] RequestError), +    IqResponse(#[from] IqRequestError),      #[error("stream write: {0}")]      Write(#[from] WriteError),      // TODO: display for stanza, to show as xml, same for read error types. @@ -92,7 +104,7 @@ pub enum DiscoError {      #[error("write error: {0}")]      Write(#[from] WriteError),      #[error("iq response: {0}")] -    IqResponse(#[from] RequestError), +    IqResponse(#[from] IqRequestError),      #[error("reply from incorrect entity: {0}")]      IncorrectEntity(JID),      #[error("unexpected reply: {0:?}")] @@ -108,7 +120,7 @@ pub enum DiscoError {  }  #[derive(Debug, Error, Clone)] -pub enum RequestError { +pub enum IqRequestError {      #[error("sending request: {0}")]      Write(#[from] WriteError),      #[error("receiving expected response: {0}")] @@ -173,6 +185,14 @@ impl From<tokio::io::Error> for DatabaseOpenError {  }  #[derive(Debug, Error, Clone)] +pub enum SubscribeError { +    #[error("write: {0}")] +    Write(#[from] WriteError), +    #[error("fetching client user details: {0}")] +    Database(#[from] DatabaseError), +} + +#[derive(Debug, Error, Clone)]  pub enum PresenceError {      #[error("unsupported")]      Unsupported, @@ -181,3 +201,29 @@ pub enum PresenceError {      #[error("stanza error: {0}")]      StanzaError(#[from] stanza::client::error::Error),  } + +#[derive(Debug, Error, Clone)] +pub enum PublishError { +    #[error("received mismatched query")] +    MismatchedQuery(Query), +    #[error("missing query")] +    MissingQuery, +    #[error("stanza errors: {0:?}")] +    StanzaErrors(Vec<stanza::client::error::Error>), +    #[error("reply from incorrect entity: {0}")] +    IncorrectEntity(JID), +    #[error("unexpected stanza: {0:?}")] +    UnexpectedStanza(Stanza), +    #[error("iq response: {0}")] +    IqResponse(#[from] IqRequestError), +} + +#[derive(Debug, Error, Clone)] +pub enum NickError { +    #[error("publishing nick: {0}")] +    Publish(#[from] CommandError<PublishError>), +    #[error("updating database: {0}")] +    Database(#[from] DatabaseError), +    #[error("disconnected")] +    Disconnected, +} diff --git a/filamento/src/lib.rs b/filamento/src/lib.rs index 4d852e2..6118f75 100644 --- a/filamento/src/lib.rs +++ b/filamento/src/lib.rs @@ -6,13 +6,13 @@ use std::{      time::Duration,  }; -use chat::{Body, Chat, Message}; +use chat::{Body, Chat, Delivery, Message};  use chrono::Utc;  use db::Db;  use disco::{Info, Items};  use error::{ -    ConnectionJobError, DatabaseError, DiscoError, Error, IqError, MessageRecvError, PresenceError, -    RosterError, StatusError, +    ConnectionJobError, DatabaseError, DiscoError, Error, IqError, MessageRecvError, NickError, +    PresenceError, PublishError, RosterError, StatusError, SubscribeError,  };  use futures::FutureExt;  use jid::JID; @@ -40,6 +40,7 @@ pub mod db;  pub mod disco;  pub mod error;  mod logic; +pub mod pep;  pub mod presence;  pub mod roster;  pub mod user; @@ -68,13 +69,13 @@ pub enum Command {      /// add a contact to your roster, with a status of none, no subscriptions.      AddContact(JID, oneshot::Sender<Result<(), RosterError>>),      /// send a friend request i.e. a subscription request with a subscription pre-approval. if not already added to roster server adds to roster. -    BuddyRequest(JID, oneshot::Sender<Result<(), WriteError>>), +    BuddyRequest(JID, oneshot::Sender<Result<(), SubscribeError>>),      /// send a subscription request, without pre-approval. if not already added to roster server adds to roster. -    SubscriptionRequest(JID, oneshot::Sender<Result<(), WriteError>>), +    SubscriptionRequest(JID, oneshot::Sender<Result<(), SubscribeError>>),      /// accept a friend request by accepting a pending subscription and sending a subscription request back. if not already added to roster adds to roster. -    AcceptBuddyRequest(JID, oneshot::Sender<Result<(), WriteError>>), +    AcceptBuddyRequest(JID, oneshot::Sender<Result<(), SubscribeError>>),      /// accept a pending subscription and doesn't send a subscription request back. if not already added to roster adds to roster. -    AcceptSubscriptionRequest(JID, oneshot::Sender<Result<(), WriteError>>), +    AcceptSubscriptionRequest(JID, oneshot::Sender<Result<(), SubscribeError>>),      /// unsubscribe to a contact, but don't remove their subscription.      UnsubscribeFromContact(JID, oneshot::Sender<Result<(), WriteError>>),      /// stop a contact from being subscribed, but stay subscribed to the contact. @@ -98,7 +99,9 @@ pub enum Command {      // TODO: should probably make it so people can add non-contact auto presence sharing in the client (most likely through setting an internal setting)      /// send a message to a jid (any kind of jid that can receive a message, e.g. a user or a      /// chatroom). if disconnected, will be cached so when client connects, message will be sent. -    SendMessage(JID, Body, oneshot::Sender<Result<(), WriteError>>), +    SendMessage(JID, Body), +    // TODO: resend failed messages +    // ResendMessage(Uuid),      /// disco info query      DiscoInfo(          Option<JID>, @@ -111,6 +114,14 @@ pub enum Command {          Option<String>,          oneshot::Sender<Result<disco::Items, DiscoError>>,      ), +    /// publish item to a pep node, specified or default according to item. +    Publish { +        item: pep::Item, +        node: String, +        sender: oneshot::Sender<Result<(), PublishError>>, +    }, +    /// change user nickname +    ChangeNick(String, oneshot::Sender<Result<(), NickError>>),  }  #[derive(Debug, Clone)] @@ -134,7 +145,15 @@ pub enum UpdateMessage {          to: JID,          message: Message,      }, +    MessageDelivery { +        id: Uuid, +        delivery: Delivery, +    },      SubscriptionRequest(jid::JID), +    NickChanged { +        jid: JID, +        nick: String, +    },  }  /// an xmpp client that is suited for a chat client use case @@ -192,7 +211,7 @@ impl Client {              timeout: Duration::from_secs(10),          }; -        let logic = ClientLogic::new(client.clone(), db, update_send); +        let logic = ClientLogic::new(client.clone(), jid.as_bare(), db, update_send);          let actor: CoreClient<ClientLogic> =              CoreClient::new(jid, password, command_receiver, None, sup_recv, logic); @@ -328,7 +347,7 @@ impl Client {          Ok(result)      } -    pub async fn buddy_request(&self, jid: JID) -> Result<(), CommandError<WriteError>> { +    pub async fn buddy_request(&self, jid: JID) -> Result<(), CommandError<SubscribeError>> {          let (send, recv) = oneshot::channel();          self.send(CoreClientCommand::Command(Command::BuddyRequest(jid, send)))              .await @@ -340,7 +359,7 @@ impl Client {          Ok(result)      } -    pub async fn subscription_request(&self, jid: JID) -> Result<(), CommandError<WriteError>> { +    pub async fn subscription_request(&self, jid: JID) -> Result<(), CommandError<SubscribeError>> {          let (send, recv) = oneshot::channel();          self.send(CoreClientCommand::Command(Command::SubscriptionRequest(              jid, send, @@ -354,7 +373,7 @@ impl Client {          Ok(result)      } -    pub async fn accept_buddy_request(&self, jid: JID) -> Result<(), CommandError<WriteError>> { +    pub async fn accept_buddy_request(&self, jid: JID) -> Result<(), CommandError<SubscribeError>> {          let (send, recv) = oneshot::channel();          self.send(CoreClientCommand::Command(Command::AcceptBuddyRequest(              jid, send, @@ -371,7 +390,7 @@ impl Client {      pub async fn accept_subscription_request(          &self,          jid: JID, -    ) -> Result<(), CommandError<WriteError>> { +    ) -> Result<(), CommandError<SubscribeError>> {          let (send, recv) = oneshot::channel();          self.send(CoreClientCommand::Command(              Command::AcceptSubscriptionRequest(jid, send), @@ -471,18 +490,10 @@ impl Client {          Ok(result)      } -    pub async fn send_message(&self, jid: JID, body: Body) -> Result<(), CommandError<WriteError>> { -        let (send, recv) = oneshot::channel(); -        self.send(CoreClientCommand::Command(Command::SendMessage( -            jid, body, send, -        ))) -        .await -        .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; -        let result = timeout(self.timeout, recv) -            .await -            .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? -            .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; -        Ok(result) +    pub async fn send_message(&self, jid: JID, body: Body) -> Result<(), ActorError> { +        self.send(CoreClientCommand::Command(Command::SendMessage(jid, body))) +            .await?; +        Ok(())      }      pub async fn disco_info( @@ -520,6 +531,38 @@ impl Client {              .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;          Ok(result)      } + +    pub async fn publish( +        &self, +        item: pep::Item, +        node: String, +    ) -> Result<(), CommandError<PublishError>> { +        let (send, recv) = oneshot::channel(); +        self.send(CoreClientCommand::Command(Command::Publish { +            item, +            node, +            sender: send, +        })) +        .await +        .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; +        let result = timeout(self.timeout, recv) +            .await +            .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? +            .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; +        Ok(result) +    } + +    pub async fn change_nick(&self, nick: String) -> Result<(), CommandError<NickError>> { +        let (send, recv) = oneshot::channel(); +        self.send(CoreClientCommand::Command(Command::ChangeNick(nick, send))) +            .await +            .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; +        let result = timeout(self.timeout, recv) +            .await +            .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? +            .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; +        Ok(result) +    }  }  impl From<Command> for CoreClientCommand<Command> { diff --git a/filamento/src/logic/mod.rs b/filamento/src/logic/mod.rs index 15c2d12..1ddd7d3 100644 --- a/filamento/src/logic/mod.rs +++ b/filamento/src/logic/mod.rs @@ -1,5 +1,6 @@  use std::{collections::HashMap, sync::Arc}; +use jid::JID;  use lampada::{Connected, Logic, error::ReadError};  use stanza::client::Stanza;  use tokio::sync::{Mutex, mpsc, oneshot}; @@ -8,7 +9,7 @@ use tracing::{error, info, warn};  use crate::{      Client, Command, UpdateMessage,      db::Db, -    error::{Error, RequestError, ResponseError}, +    error::{Error, IqRequestError, ResponseError},  };  mod abort; @@ -23,6 +24,7 @@ mod process_stanza;  #[derive(Clone)]  pub struct ClientLogic {      client: Client, +    bare_jid: JID,      db: Db,      pending: Pending,      update_sender: mpsc::Sender<UpdateMessage>, @@ -41,7 +43,7 @@ impl Pending {          connection: &Connected,          request: Stanza,          id: String, -    ) -> Result<Stanza, RequestError> { +    ) -> Result<Stanza, IqRequestError> {          let (send, recv) = oneshot::channel();          {              self.0.lock().await.insert(id, send); @@ -74,12 +76,18 @@ impl Pending {  }  impl ClientLogic { -    pub fn new(client: Client, db: Db, update_sender: mpsc::Sender<UpdateMessage>) -> Self { +    pub fn new( +        client: Client, +        bare_jid: JID, +        db: Db, +        update_sender: mpsc::Sender<UpdateMessage>, +    ) -> Self {          Self {              db,              pending: Pending::new(),              update_sender,              client, +            bare_jid,          }      } diff --git a/filamento/src/logic/offline.rs b/filamento/src/logic/offline.rs index bc2666a..6399cf7 100644 --- a/filamento/src/logic/offline.rs +++ b/filamento/src/logic/offline.rs @@ -1,8 +1,14 @@ +use chrono::Utc;  use lampada::error::WriteError; +use uuid::Uuid;  use crate::{      Command, -    error::{DatabaseError, DiscoError, Error, RosterError, StatusError}, +    chat::{Delivery, Message}, +    error::{ +        DatabaseError, DiscoError, Error, IqRequestError, MessageSendError, NickError, RosterError, +        StatusError, +    },      presence::Online,      roster::Contact,  }; @@ -76,16 +82,16 @@ pub async fn handle_offline_result(logic: &ClientLogic, command: Command) -> Res              sender.send(Err(RosterError::Write(WriteError::Disconnected)));          }          Command::BuddyRequest(_jid, sender) => { -            sender.send(Err(WriteError::Disconnected)); +            sender.send(Err(WriteError::Disconnected.into()));          }          Command::SubscriptionRequest(_jid, sender) => { -            sender.send(Err(WriteError::Disconnected)); +            sender.send(Err(WriteError::Disconnected.into()));          }          Command::AcceptBuddyRequest(_jid, sender) => { -            sender.send(Err(WriteError::Disconnected)); +            sender.send(Err(WriteError::Disconnected.into()));          }          Command::AcceptSubscriptionRequest(_jid, sender) => { -            sender.send(Err(WriteError::Disconnected)); +            sender.send(Err(WriteError::Disconnected.into()));          }          Command::UnsubscribeFromContact(_jid, sender) => {              sender.send(Err(WriteError::Disconnected)); @@ -107,8 +113,42 @@ pub async fn handle_offline_result(logic: &ClientLogic, command: Command) -> Res              sender.send(result);          }          // TODO: offline message queue -        Command::SendMessage(_jid, _body, sender) => { -            sender.send(Err(WriteError::Disconnected)); +        Command::SendMessage(jid, body) => { +            let id = Uuid::new_v4(); +            let timestamp = Utc::now(); + +            let message = Message { +                id, +                from: logic.bare_jid.clone(), +                // TODO: failure reason +                delivery: Some(Delivery::Failed), +                timestamp, +                body, +            }; +            // try to store in message history that there is a new message that is sending. if client is quit mid-send then can mark as failed and re-send +            // TODO: mark these as potentially failed upon client launch +            if let Err(e) = logic +                .db() +                .create_message_with_self_resource( +                    message.clone(), +                    jid.clone(), +                    // TODO: when message is queued and sent, the from must also be updated with the correct resource +                    logic.bare_jid.clone(), +                ) +                .await +            { +                // TODO: should these really be handle_error or just the error macro? +                logic +                    .handle_error(MessageSendError::MessageHistory(e.into()).into()) +                    .await; +            } +            logic +                .update_sender() +                .send(crate::UpdateMessage::Message { +                    to: jid.as_bare(), +                    message, +                }) +                .await;          }          Command::SendPresence(_jid, _presence, sender) => {              sender.send(Err(WriteError::Disconnected)); @@ -119,6 +159,16 @@ pub async fn handle_offline_result(logic: &ClientLogic, command: Command) -> Res          Command::DiscoItems(_jid, _node, sender) => {              sender.send(Err(DiscoError::Write(WriteError::Disconnected)));          } +        Command::Publish { +            item: _, +            node: _, +            sender, +        } => { +            sender.send(Err(IqRequestError::Write(WriteError::Disconnected).into())); +        } +        Command::ChangeNick(_, sender) => { +            sender.send(Err(NickError::Disconnected)); +        }      }      Ok(())  } diff --git a/filamento/src/logic/online.rs b/filamento/src/logic/online.rs index 63a4aa3..d32f527 100644 --- a/filamento/src/logic/online.rs +++ b/filamento/src/logic/online.rs @@ -3,10 +3,11 @@ use jid::JID;  use lampada::{Connected, WriteMessage, error::WriteError};  use stanza::{      client::{ -        Stanza, -        iq::{self, Iq, IqType, Query}, +        iq::{self, Iq, IqType, Query}, Stanza      },      xep_0030::{info, items}, +    xep_0060::pubsub::{self, Pubsub}, +    xep_0172::{self, Nick},      xep_0203::Delay,  };  use tokio::sync::oneshot; @@ -14,12 +15,9 @@ use tracing::{debug, error, info};  use uuid::Uuid;  use crate::{ -    Command, UpdateMessage, -    chat::{Body, Message}, -    disco::{Info, Items}, -    error::{DatabaseError, DiscoError, Error, MessageSendError, RosterError, StatusError}, -    presence::{Online, Presence, PresenceType}, -    roster::{Contact, ContactUpdate}, +    chat::{Body, Chat, Delivery, Message}, disco::{Info, Items}, error::{ +        DatabaseError, DiscoError, Error, IqRequestError, MessageSendError, NickError, PublishError, RosterError, StatusError, SubscribeError +    }, pep, presence::{Online, Presence, PresenceType}, roster::{Contact, ContactUpdate}, Command, UpdateMessage  };  use super::{ @@ -156,105 +154,82 @@ pub async fn handle_add_contact(      }  } -pub async fn handle_buddy_request(connection: Connected, jid: JID) -> Result<(), WriteError> { +pub async fn handle_buddy_request( +    logic: &ClientLogic, +    connection: Connected, +    jid: JID, +) -> Result<(), SubscribeError> { +    let client_user = logic.db.read_user(logic.bare_jid.clone()).await?; +    let nick = client_user.nick.map(|nick| Nick(nick));      let presence = Stanza::Presence(stanza::client::presence::Presence { -        from: None, -        id: None,          to: Some(jid.clone()),          r#type: Some(stanza::client::presence::PresenceType::Subscribe), -        lang: None, -        show: None, -        status: None, -        priority: None, -        errors: Vec::new(), -        delay: None, +        nick, +        ..Default::default()      });      connection.write_handle().write(presence).await?;      let presence = Stanza::Presence(stanza::client::presence::Presence { -        from: None, -        id: None,          to: Some(jid),          r#type: Some(stanza::client::presence::PresenceType::Subscribed), -        lang: None, -        show: None, -        status: None, -        priority: None, -        errors: Vec::new(), -        delay: None, +        ..Default::default()      });      connection.write_handle().write(presence).await?;      Ok(())  }  pub async fn handle_subscription_request( +    logic: &ClientLogic,      connection: Connected,      jid: JID, -) -> Result<(), WriteError> { +) -> Result<(), SubscribeError> {      // TODO: i should probably have builders +    let client_user = logic.db.read_user(logic.bare_jid.clone()).await?; +    let nick = client_user.nick.map(|nick| Nick(nick));      let presence = Stanza::Presence(stanza::client::presence::Presence { -        from: None, -        id: None,          to: Some(jid),          r#type: Some(stanza::client::presence::PresenceType::Subscribe), -        lang: None, -        show: None, -        status: None, -        priority: None, -        errors: Vec::new(), -        delay: None, +        nick, +        ..Default::default()      });      connection.write_handle().write(presence).await?;      Ok(())  }  pub async fn handle_accept_buddy_request( +    logic: &ClientLogic,      connection: Connected,      jid: JID, -) -> Result<(), WriteError> { +) -> Result<(), SubscribeError> {      let presence = Stanza::Presence(stanza::client::presence::Presence { -        from: None, -        id: None,          to: Some(jid.clone()),          r#type: Some(stanza::client::presence::PresenceType::Subscribed), -        lang: None, -        show: None, -        status: None, -        priority: None, -        errors: Vec::new(), -        delay: None, +        ..Default::default()      });      connection.write_handle().write(presence).await?; +    let client_user = logic.db.read_user(logic.bare_jid.clone()).await?; +    let nick = client_user.nick.map(|nick| Nick(nick));      let presence = Stanza::Presence(stanza::client::presence::Presence { -        from: None, -        id: None,          to: Some(jid),          r#type: Some(stanza::client::presence::PresenceType::Subscribe), -        lang: None, -        show: None, -        status: None, -        priority: None, -        errors: Vec::new(), -        delay: None, +        nick, +        ..Default::default()      });      connection.write_handle().write(presence).await?;      Ok(())  }  pub async fn handle_accept_subscription_request( +    logic: &ClientLogic,      connection: Connected,      jid: JID, -) -> Result<(), WriteError> { +) -> Result<(), SubscribeError> { +    let client_user = logic.db.read_user(logic.bare_jid.clone()).await?; +    let nick = client_user.nick.map(|nick| Nick(nick));      let presence = Stanza::Presence(stanza::client::presence::Presence { -        from: None, -        id: None,          to: Some(jid),          r#type: Some(stanza::client::presence::PresenceType::Subscribe), -        lang: None, -        show: None, -        status: None, -        priority: None, -        errors: Vec::new(), -        delay: None, +        nick, +        ..Default::default()      });      connection.write_handle().write(presence).await?;      Ok(()) @@ -265,16 +240,9 @@ pub async fn handle_unsubscribe_from_contact(      jid: JID,  ) -> Result<(), WriteError> {      let presence = Stanza::Presence(stanza::client::presence::Presence { -        from: None, -        id: None,          to: Some(jid),          r#type: Some(stanza::client::presence::PresenceType::Unsubscribe), -        lang: None, -        show: None, -        status: None, -        priority: None, -        errors: Vec::new(), -        delay: None, +        ..Default::default()      });      connection.write_handle().write(presence).await?;      Ok(()) @@ -282,16 +250,9 @@ pub async fn handle_unsubscribe_from_contact(  pub async fn handle_unsubscribe_contact(connection: Connected, jid: JID) -> Result<(), WriteError> {      let presence = Stanza::Presence(stanza::client::presence::Presence { -        from: None, -        id: None,          to: Some(jid),          r#type: Some(stanza::client::presence::PresenceType::Unsubscribed), -        lang: None, -        show: None, -        status: None, -        priority: None, -        errors: Vec::new(), -        delay: None, +        ..Default::default()      });      connection.write_handle().write(presence).await?;      Ok(()) @@ -299,29 +260,15 @@ pub async fn handle_unsubscribe_contact(connection: Connected, jid: JID) -> Resu  pub async fn handle_unfriend_contact(connection: Connected, jid: JID) -> Result<(), WriteError> {      let presence = Stanza::Presence(stanza::client::presence::Presence { -        from: None, -        id: None,          to: Some(jid.clone()),          r#type: Some(stanza::client::presence::PresenceType::Unsubscribe), -        lang: None, -        show: None, -        status: None, -        priority: None, -        errors: Vec::new(), -        delay: None, +        ..Default::default()      });      connection.write_handle().write(presence).await?;      let presence = Stanza::Presence(stanza::client::presence::Presence { -        from: None, -        id: None,          to: Some(jid),          r#type: Some(stanza::client::presence::PresenceType::Unsubscribed), -        lang: None, -        show: None, -        status: None, -        priority: None, -        errors: Vec::new(), -        delay: None, +        ..Default::default()      });      connection.write_handle().write(presence).await?;      Ok(()) @@ -464,60 +411,119 @@ pub async fn handle_set_status(      Ok(())  } -pub async fn handle_send_message( -    logic: &ClientLogic, -    connection: Connected, -    jid: JID, -    body: Body, -) -> Result<(), WriteError> { +pub async fn handle_send_message(logic: &ClientLogic, connection: Connected, jid: JID, body: Body) { +    // upsert the chat and user the message will be delivered to. if there is a conflict, it will return whatever was there, otherwise it will return false by default. +    let have_chatted = logic.db().upsert_chat_and_user(&jid).await.unwrap_or(false); + +    let nick; +    let mark_chat_as_chatted; +    if have_chatted == false { +        match logic.db.read_user(logic.bare_jid.clone()).await { +            Ok(u) => { +                nick = u.nick.map(|nick| Nick(nick)); +                mark_chat_as_chatted = true; +            } +            Err(e) => { +                logic +                    .handle_error(MessageSendError::GetUserDetails(e.into()).into()) +                    .await; +                nick = None; +                mark_chat_as_chatted = false; +            } +        } +    } else { +        nick = None; +        mark_chat_as_chatted = false; +    } + +    // generate message struct      let id = Uuid::new_v4();      let timestamp = Utc::now(); -    let message = Stanza::Message(stanza::client::message::Message { +    let message = Message { +        id, +        from: connection.jid().as_bare(), +        body: body.clone(), +        timestamp, +        delivery: Some(Delivery::Sending), +    }; + +    // try to store in message history that there is a new message that is sending. if client is quit mid-send then can mark as failed and re-send +    // TODO: mark these as potentially failed upon client launch +    if let Err(e) = logic +        .db() +        .create_message_with_self_resource(message.clone(), jid.clone(), connection.jid().clone()) +        .await +    { +        // TODO: should these really be handle_error or just the error macro? +        logic +            .handle_error(MessageSendError::MessageHistory(e.into()).into()) +            .await; +    } + +    // tell the client a message is being sent +    logic +        .update_sender() +        .send(UpdateMessage::Message { +            to: jid.as_bare(), +            message, +        }) +        .await; + +    // prepare the message stanza +    let message_stanza = Stanza::Message(stanza::client::message::Message {          from: Some(connection.jid().clone()),          id: Some(id.to_string()),          to: Some(jid.clone()),          // TODO: specify message type          r#type: stanza::client::message::MessageType::Chat,          // TODO: lang ? -        lang: None, -        subject: None,          body: Some(stanza::client::message::Body {              lang: None,              body: Some(body.body.clone()),          }), -        thread: None,          // include delay to have a consistent timestamp between server and client          delay: Some(Delay {              from: None,              stamp: timestamp,          }), +        nick, +        ..Default::default()      }); -    connection.write_handle().write(message).await?; -    let mut message = Message { -        id, -        from: connection.jid().clone(), -        body, -        timestamp, -    }; -    info!("sent message: {:?}", message); -    if let Err(e) = logic -        .db() -        .create_message_with_self_resource_and_chat(message.clone(), jid.clone()) -        .await -    { -        // TODO: should these really be handle_error or just the error macro? -        logic -            .handle_error(MessageSendError::MessageHistory(e.into()).into()) -            .await; -    } -    // TODO: don't do this, have separate from from details -    message.from = message.from.as_bare(); -    let _ = logic -        .update_sender() -        .send(UpdateMessage::Message { to: jid, message }) + +    // send the message +    let result = connection +        .write_handle() +        .write(message_stanza.clone())          .await; -    Ok(()) -    // TODO: refactor this to send a sending updatemessage, then update or something like that +    match result { +        Ok(_) => { +            info!("sent message: {:?}", message_stanza); +            logic +                .update_sender() +                .send(UpdateMessage::MessageDelivery { +                    id, +                    delivery: Delivery::Written, +                }) +                .await; +            if mark_chat_as_chatted { +                if let Err(e) = logic.db.mark_chat_as_chatted(jid).await { +                    logic +                        .handle_error(MessageSendError::MarkChatAsChatted(e.into()).into()) +                        .await; +                } +            } +        } +        Err(e) => { +            logic +                .update_sender() +                .send(UpdateMessage::MessageDelivery { +                    id, +                    delivery: Delivery::Failed, +                }) +                .await; +            logic.handle_error(MessageSendError::Write(e).into()).await; +        } +    }  }  pub async fn handle_send_presence( @@ -673,6 +679,78 @@ pub async fn handle_disco_items(      }  } +pub async fn handle_publish( +    logic: &ClientLogic, +    connection: Connected, +    item: pep::Item, +    node: String, +) -> Result<(), PublishError> { +    let id = Uuid::new_v4().to_string(); +    let publish = match item { +        pep::Item::Nick(n) => pubsub::Publish { +            node, +            items: vec![pubsub::Item { +                item: Some(pubsub::Content::Nick(Nick(n))), +                ..Default::default() +            }], +        }, +    }; +    let request = Iq { +        from: Some(connection.jid().clone()), +        id: id.clone(), +        to: None, +        r#type: IqType::Set, +        lang: None, +        query: Some(Query::Pubsub(Pubsub::Publish(publish, None))), +        errors: Vec::new(), +    }; +    match logic +        .pending() +        .request(&connection, Stanza::Iq(request), id) +        .await? { +         +        Stanza::Iq(Iq { +            from, +            r#type, +            query, +            errors, +            .. +            // TODO: maybe abstract a bunch of these different errors related to iqs into an iq error thing? as in like call iq.result(), get the query from inside, error otherwise. +        }) if r#type == IqType::Result || r#type == IqType::Error => { +            if from == None ||  +                    from == Some(connection.jid().as_bare()) +             { +                match r#type { +                    IqType::Result => { +                        if let Some(query) = query { +                            match query { +                                Query::Pubsub(_) => Ok(()), +                                q => Err(PublishError::MismatchedQuery(q)), +                            } +                        } else { +                            Err(PublishError::MissingQuery) +                        } +                    } +                    IqType::Error => { +                        Err(PublishError::StanzaErrors(errors)) +                    } +                    _ => unreachable!(), +                } +            } else { +                Err(PublishError::IncorrectEntity( +                    from.unwrap_or_else(|| connection.jid().as_bare()), +                )) +            } +        } +        s => Err(PublishError::UnexpectedStanza(s)), +    } +} + +pub async fn handle_change_nick(logic: &ClientLogic, nick: String) -> Result<(), NickError> { +    logic.client().publish(pep::Item::Nick(nick), xep_0172::XMLNS.to_string()).await?; +    Ok(()) +} +  // TODO: could probably macro-ise?  pub async fn handle_online_result(      logic: &ClientLogic, @@ -716,25 +794,24 @@ pub async fn handle_online_result(              let user = handle_get_user(logic, jid).await;              let _ = sender.send(user);          } -        // TODO: offline queue to modify roster          Command::AddContact(jid, sender) => {              let result = handle_add_contact(logic, connection, jid).await;              let _ = sender.send(result);          }          Command::BuddyRequest(jid, sender) => { -            let result = handle_buddy_request(connection, jid).await; +            let result = handle_buddy_request(logic, connection, jid).await;              let _ = sender.send(result);          }          Command::SubscriptionRequest(jid, sender) => { -            let result = handle_subscription_request(connection, jid).await; +            let result = handle_subscription_request(logic, connection, jid).await;              let _ = sender.send(result);          }          Command::AcceptBuddyRequest(jid, sender) => { -            let result = handle_accept_buddy_request(connection, jid).await; +            let result = handle_accept_buddy_request(logic, connection, jid).await;              let _ = sender.send(result);          }          Command::AcceptSubscriptionRequest(jid, sender) => { -            let result = handle_accept_subscription_request(connection, jid).await; +            let result = handle_accept_subscription_request(logic, connection, jid).await;              let _ = sender.send(result);          }          Command::UnsubscribeFromContact(jid, sender) => { @@ -761,10 +838,8 @@ pub async fn handle_online_result(              let result = handle_set_status(logic, connection, online).await;              let _ = sender.send(result);          } -        // TODO: offline message queue -        Command::SendMessage(jid, body, sender) => { -            let result = handle_send_message(logic, connection, jid, body).await; -            let _ = sender.send(result); +        Command::SendMessage(jid, body) => { +            handle_send_message(logic, connection, jid, body).await;          }          Command::SendPresence(jid, presence, sender) => {              let result = handle_send_presence(connection, jid, presence).await; @@ -778,6 +853,14 @@ pub async fn handle_online_result(              let result = handle_disco_items(logic, connection, jid, node).await;              let _ = sender.send(result);          } +        Command::Publish { item, node, sender } => { +            let result = handle_publish(logic, connection, item, node).await; +            let _ = sender.send(result); +        } +        Command::ChangeNick(nick, sender) => { +            let result = handle_change_nick(logic, nick).await; +            let _ = sender.send(result); +        }      }      Ok(())  } diff --git a/filamento/src/logic/process_stanza.rs b/filamento/src/logic/process_stanza.rs index b1bc830..2f6644e 100644 --- a/filamento/src/logic/process_stanza.rs +++ b/filamento/src/logic/process_stanza.rs @@ -41,38 +41,74 @@ pub async fn recv_message(      logic: ClientLogic,      stanza_message: stanza::client::message::Message,  ) -> Result<Option<UpdateMessage>, MessageRecvError> { -    if let Some(mut from) = stanza_message.from { +    if let Some(from) = stanza_message.from {          // TODO: don't ignore delay from. xep says SHOULD send error if incorrect.          let timestamp = stanza_message              .delay              .map(|delay| delay.stamp)              .unwrap_or_else(|| Utc::now());          // TODO: group chat messages -        let mut message = Message { -            id: stanza_message -                .id -                // TODO: proper id storage -                .map(|id| Uuid::from_str(&id).unwrap_or_else(|_| Uuid::new_v4())) -                .unwrap_or_else(|| Uuid::new_v4()), -            from: from.clone(), -            timestamp, -            body: Body { -                // TODO: should this be an option? -                body: stanza_message -                    .body -                    .map(|body| body.body) -                    .unwrap_or_default() -                    .unwrap_or_default(), -            }, -        }; + +        // if there is a body, should create chat message +        if let Some(body) = stanza_message.body { +            let message = Message { +                id: stanza_message +                    .id +                    // TODO: proper id xep +                    .map(|id| Uuid::from_str(&id).unwrap_or_else(|_| Uuid::new_v4())) +                    .unwrap_or_else(|| Uuid::new_v4()), +                from: from.as_bare(), +                timestamp, +                body: Body { +                    body: body.body.unwrap_or_default(), +                }, +                delivery: None, +            }; + +            // save the message to the database +            logic.db().upsert_chat_and_user(&from).await?; +            if let Err(e) = logic +                .db() +                .create_message_with_user_resource(message.clone(), from.clone(), from.clone()) +                .await +            { +                logic +                    .handle_error(Error::MessageRecv(MessageRecvError::MessageHistory(e))) +                    .await; +            } + +            // update the client with the new message +            logic +                .update_sender() +                .send(UpdateMessage::Message { +                    to: from.as_bare(), +                    message, +                }) +                .await; +        } + +        if let Some(nick) = stanza_message.nick { +            if let Err(e) = logic +                .db() +                .upsert_user_nick(from.as_bare(), nick.0.clone()) +                .await +            { +                logic +                    .handle_error(Error::MessageRecv(MessageRecvError::NickUpdate(e))) +                    .await; +            } + +            logic +                .update_sender() +                .send(UpdateMessage::NickChanged { +                    jid: from.as_bare(), +                    nick: nick.0, +                }) +                .await; +        } + +        Ok(None)          // TODO: can this be more efficient? -        logic -            .db() -            .create_message_with_user_resource_and_chat(message.clone(), from.clone()) -            .await?; -        message.from = message.from.as_bare(); -        from = from.as_bare(); -        Ok(Some(UpdateMessage::Message { to: from, message }))      } else {          Err(MessageRecvError::MissingFrom)      } diff --git a/filamento/src/pep.rs b/filamento/src/pep.rs new file mode 100644 index 0000000..c71d843 --- /dev/null +++ b/filamento/src/pep.rs @@ -0,0 +1,17 @@ +// in commandmessage +// pub struct Publish { +//     item: Item, +//     node: Option<String>, +//     // no need for node, as item has the node +// } +// +// in updatemessage +// pub struct Event { +//     from: JID, +//     item: Item, +// } + +#[derive(Clone, Debug)] +pub enum Item { +    Nick(String), +} diff --git a/filamento/src/presence.rs b/filamento/src/presence.rs index e35761c..bae8793 100644 --- a/filamento/src/presence.rs +++ b/filamento/src/presence.rs @@ -78,11 +78,6 @@ impl Online {          timestamp: Option<DateTime<Utc>>,      ) -> stanza::client::presence::Presence {          stanza::client::presence::Presence { -            from: None, -            id: None, -            to: None, -            r#type: None, -            lang: None,              show: self.show.map(|show| match show {                  Show::Away => stanza::client::presence::Show::Away,                  Show::Chat => stanza::client::presence::Show::Chat, @@ -97,11 +92,11 @@ impl Online {              priority: self                  .priority                  .map(|priority| stanza::client::presence::Priority(priority)), -            errors: Vec::new(),              delay: timestamp.map(|timestamp| Delay {                  from: None,                  stamp: timestamp,              }), +            ..Default::default()          }      }  } @@ -112,22 +107,16 @@ impl Offline {          timestamp: Option<DateTime<Utc>>,      ) -> stanza::client::presence::Presence {          stanza::client::presence::Presence { -            from: None, -            id: None, -            to: None,              r#type: Some(stanza::client::presence::PresenceType::Unavailable), -            lang: None, -            show: None,              status: self.status.map(|status| stanza::client::presence::Status {                  lang: None,                  status: String1024(status),              }), -            priority: None, -            errors: Vec::new(),              delay: timestamp.map(|timestamp| Delay {                  from: None,                  stamp: timestamp,              }), +            ..Default::default()          }      }  } diff --git a/filamento/src/user.rs b/filamento/src/user.rs index 9914d14..85471d5 100644 --- a/filamento/src/user.rs +++ b/filamento/src/user.rs @@ -1,7 +1,8 @@  use jid::JID; -#[derive(Debug, sqlx::FromRow)] +#[derive(Debug, sqlx::FromRow, Clone)]  pub struct User {      pub jid: JID, +    pub nick: Option<String>,      pub cached_status_message: Option<String>,  } diff --git a/stanza/Cargo.toml b/stanza/Cargo.toml index 64962b6..3c3c3e0 100644 --- a/stanza/Cargo.toml +++ b/stanza/Cargo.toml @@ -16,5 +16,6 @@ xep_0030 = []  xep_0059 = []  xep_0060 = ["xep_0004", "dep:chrono"]  xep_0131 = [] +xep_0172 = []  xep_0199 = []  xep_0203 = ["dep:chrono"] diff --git a/stanza/src/client/iq.rs b/stanza/src/client/iq.rs index 6d0c671..50884aa 100644 --- a/stanza/src/client/iq.rs +++ b/stanza/src/client/iq.rs @@ -17,6 +17,9 @@ use crate::roster;  #[cfg(feature = "xep_0030")]  use crate::xep_0030::{self, info, items}; +#[cfg(feature = "xep_0060")] +use crate::xep_0060::pubsub::{self, Pubsub}; +  #[cfg(feature = "xep_0199")]  use crate::xep_0199::{self, Ping}; @@ -42,6 +45,8 @@ pub enum Query {      DiscoInfo(info::Query),      #[cfg(feature = "xep_0030")]      DiscoItems(items::Query), +    #[cfg(feature = "xep_0060")] +    Pubsub(Pubsub),      #[cfg(feature = "xep_0199")]      Ping(Ping),      #[cfg(feature = "rfc_6121")] @@ -67,6 +72,8 @@ impl FromElement for Query {              (Some(xep_0030::items::XMLNS), "query") => {                  Ok(Query::DiscoItems(items::Query::from_element(element)?))              } +            #[cfg(feature = "xep_0060")] +            (Some(pubsub::XMLNS), "pubsub") => Ok(Query::Pubsub(Pubsub::from_element(element)?)),              _ => Ok(Query::Unsupported),          }      } @@ -86,6 +93,8 @@ impl IntoElement for Query {              Query::DiscoInfo(query) => query.builder(),              #[cfg(feature = "xep_0030")]              Query::DiscoItems(query) => query.builder(), +            #[cfg(feature = "xep_0060")] +            Query::Pubsub(pubsub) => pubsub.builder(),          }      }  } diff --git a/stanza/src/client/message.rs b/stanza/src/client/message.rs index e521613..d94b82e 100644 --- a/stanza/src/client/message.rs +++ b/stanza/src/client/message.rs @@ -8,12 +8,14 @@ use peanuts::{  #[cfg(feature = "xep_0131")]  use crate::xep_0131::Headers; +#[cfg(feature = "xep_0172")] +use crate::xep_0172::Nick;  #[cfg(feature = "xep_0203")]  use crate::xep_0203::Delay;  use super::XMLNS; -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Default)]  pub struct Message {      pub from: Option<JID>,      pub id: Option<String>, @@ -29,6 +31,8 @@ pub struct Message {      pub delay: Option<Delay>,      #[cfg(feature = "xep_0131")]      pub headers: Option<Headers>, +    #[cfg(feature = "xep_0172")] +    pub nick: Option<Nick>,  }  impl FromElement for Message { @@ -52,6 +56,9 @@ impl FromElement for Message {          #[cfg(feature = "xep_0131")]          let headers = element.child_opt()?; +        #[cfg(feature = "xep_0172")] +        let nick = element.child_opt()?; +          Ok(Message {              from,              id, @@ -65,6 +72,8 @@ impl FromElement for Message {              delay,              #[cfg(feature = "xep_0131")]              headers, +            #[cfg(feature = "xep_0172")] +            nick,          })      }  } @@ -93,6 +102,9 @@ impl IntoElement for Message {          #[cfg(feature = "xep_0131")]          let builder = builder.push_child_opt(self.headers.clone()); +        #[cfg(feature = "xep_0172")] +        let builder = builder.push_child_opt(self.nick.clone()); +          builder      }  } @@ -137,6 +149,7 @@ impl ToString for MessageType {  #[derive(Clone, Debug)]  pub struct Body {      pub lang: Option<String>, +    // TODO: string stuff      pub body: Option<String>,  } diff --git a/stanza/src/client/presence.rs b/stanza/src/client/presence.rs index 8fb96be..bffb0d0 100644 --- a/stanza/src/client/presence.rs +++ b/stanza/src/client/presence.rs @@ -8,12 +8,14 @@ use peanuts::{  #[cfg(feature = "xep_0131")]  use crate::xep_0131::Headers; +#[cfg(feature = "xep_0172")] +use crate::xep_0172::Nick;  #[cfg(feature = "xep_0203")]  use crate::xep_0203::Delay;  use super::{error::Error, XMLNS}; -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Default)]  pub struct Presence {      pub from: Option<JID>,      pub id: Option<String>, @@ -28,8 +30,9 @@ pub struct Presence {      pub delay: Option<Delay>,      #[cfg(feature = "xep_0131")]      pub headers: Option<Headers>, -    // TODO: ##other -    // other: Vec<Other>, +    #[cfg(feature = "xep_0172")] +    pub nick: Option<Nick>, +    // ##other      pub errors: Vec<Error>,  } @@ -55,6 +58,9 @@ impl FromElement for Presence {          #[cfg(feature = "xep_0131")]          let headers = element.child_opt()?; +        #[cfg(feature = "xep_0172")] +        let nick = element.child_opt()?; +          Ok(Presence {              from,              id, @@ -69,6 +75,8 @@ impl FromElement for Presence {              delay,              #[cfg(feature = "xep_0131")]              headers, +            #[cfg(feature = "xep_0172")] +            nick,          })      }  } @@ -92,6 +100,9 @@ impl IntoElement for Presence {          #[cfg(feature = "xep_0131")]          let builder = builder.push_child_opt(self.headers.clone()); +        #[cfg(feature = "xep_0172")] +        let builder = builder.push_child_opt(self.nick.clone()); +          builder      }  } diff --git a/stanza/src/lib.rs b/stanza/src/lib.rs index 0a71a26..5474aee 100644 --- a/stanza/src/lib.rs +++ b/stanza/src/lib.rs @@ -19,6 +19,8 @@ pub mod xep_0059;  pub mod xep_0060;  #[cfg(feature = "xep_0131")]  pub mod xep_0131; +#[cfg(feature = "xep_0172")] +pub mod xep_0172;  #[cfg(feature = "xep_0199")]  pub mod xep_0199;  #[cfg(feature = "xep_0203")] diff --git a/stanza/src/xep_0060/event.rs b/stanza/src/xep_0060/event.rs index bdd8b53..d2c150a 100644 --- a/stanza/src/xep_0060/event.rs +++ b/stanza/src/xep_0060/event.rs @@ -8,6 +8,8 @@ use peanuts::{  };  use crate::xep_0004::X; +#[cfg(feature = "xep_0172")] +use crate::xep_0172::{self, Nick};  pub const XMLNS: &str = "http://jabber.org/protocol/pubsub#event"; @@ -292,19 +294,28 @@ impl IntoElement for Item {  #[derive(Clone, Debug)]  pub enum Content { -    Unknown, +    #[cfg(feature = "xep_0172")] +    Nick(Nick), +    Unknown(Element),  }  impl FromElement for Content { -    fn from_element(_element: Element) -> peanuts::element::DeserializeResult<Self> { -        // TODO: types -        return Ok(Self::Unknown); +    fn from_element(element: Element) -> peanuts::element::DeserializeResult<Self> { +        match element.identify() { +            #[cfg(feature = "xep_0172")] +            (Some(xep_0172::XMLNS), "nick") => Ok(Content::Nick(Nick::from_element(element)?)), +            _ => Ok(Self::Unknown(element)), +        }      }  }  impl IntoElement for Content {      fn builder(&self) -> peanuts::element::ElementBuilder { -        panic!("unknown content cannot be serialized") +        match self { +            #[cfg(feature = "xep_0172")] +            Content::Nick(nick) => nick.builder(), +            Content::Unknown(_e) => panic!("unknown content cannot be serialized"), +        }      }  } diff --git a/stanza/src/xep_0060/pubsub.rs b/stanza/src/xep_0060/pubsub.rs index 3a15a59..25fc405 100644 --- a/stanza/src/xep_0060/pubsub.rs +++ b/stanza/src/xep_0060/pubsub.rs @@ -7,6 +7,8 @@ use peanuts::{  };  use crate::xep_0004::X; +#[cfg(feature = "xep_0172")] +use crate::xep_0172::{self, Nick};  pub const XMLNS: &str = "http://jabber.org/protocol/pubsub"; @@ -301,10 +303,10 @@ impl ToString for DefaultType {  #[derive(Clone, Debug)]  pub struct Items { -    max_items: Option<usize>, -    node: String, -    subid: Option<String>, -    items: Vec<Item>, +    pub max_items: Option<usize>, +    pub node: String, +    pub subid: Option<String>, +    pub items: Vec<Item>,  }  impl FromElement for Items { @@ -337,11 +339,11 @@ impl IntoElement for Items {      }  } -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Default)]  pub struct Item { -    id: Option<String>, -    publisher: Option<String>, -    item: Option<Content>, +    pub id: Option<String>, +    pub publisher: Option<String>, +    pub item: Option<Content>,  }  impl FromElement for Item { @@ -373,19 +375,28 @@ impl IntoElement for Item {  #[derive(Clone, Debug)]  pub enum Content { -    Unknown, +    #[cfg(feature = "xep_0172")] +    Nick(Nick), +    Unknown(Element),  }  impl FromElement for Content { -    fn from_element(_element: Element) -> peanuts::element::DeserializeResult<Self> { -        // TODO: types -        return Ok(Self::Unknown); +    fn from_element(element: Element) -> peanuts::element::DeserializeResult<Self> { +        match element.identify() { +            #[cfg(feature = "xep_0172")] +            (Some(xep_0172::XMLNS), "nick") => Ok(Content::Nick(Nick::from_element(element)?)), +            _ => Ok(Self::Unknown(element)), +        }      }  }  impl IntoElement for Content {      fn builder(&self) -> peanuts::element::ElementBuilder { -        panic!("unknown content cannot be serialized") +        match self { +            #[cfg(feature = "xep_0172")] +            Content::Nick(nick) => nick.builder(), +            Content::Unknown(_e) => panic!("unknown content cannot be serialized"), +        }      }  } @@ -429,8 +440,8 @@ impl IntoElement for Options {  #[derive(Clone, Debug)]  pub struct Publish { -    node: String, -    items: Vec<Item>, +    pub node: String, +    pub items: Vec<Item>,  }  impl FromElement for Publish { diff --git a/stanza/src/xep_0172.rs b/stanza/src/xep_0172.rs new file mode 100644 index 0000000..1c24200 --- /dev/null +++ b/stanza/src/xep_0172.rs @@ -0,0 +1,30 @@ +use peanuts::{ +    element::{FromElement, IntoElement}, +    Element, +}; + +pub const XMLNS: &str = "http://jabber.org/protocol/nick"; + +#[derive(Debug, Clone)] +pub struct Nick(pub String); + +impl FromElement for Nick { +    fn from_element(mut element: peanuts::Element) -> peanuts::element::DeserializeResult<Self> { +        element.check_name("nick")?; +        element.check_namespace(XMLNS)?; + +        Ok(Self(element.pop_value_opt()?.unwrap_or_default())) +    } +} + +impl IntoElement for Nick { +    fn builder(&self) -> peanuts::element::ElementBuilder { +        let builder = Element::builder("nick", Some(XMLNS)); + +        if self.0.is_empty() { +            builder +        } else { +            builder.push_text(self.0.clone()) +        } +    } +} | 
