diff options
| author | 2025-04-03 03:41:38 +0100 | |
|---|---|---|
| committer | 2025-04-03 03:41:38 +0100 | |
| commit | 91f1994af940085d5d475a97820900ebbf0eb553 (patch) | |
| tree | 6aab872f71d17a785d3d9286742fef38983d274c /filamento | |
| parent | 9ce3827a7d25714d17f266f0f50bb29f41090175 (diff) | |
| download | luz-91f1994af940085d5d475a97820900ebbf0eb553.tar.gz luz-91f1994af940085d5d475a97820900ebbf0eb553.tar.bz2 luz-91f1994af940085d5d475a97820900ebbf0eb553.zip | |
feat: better message handling, pep publish, xep_0172: nick
Diffstat (limited to 'filamento')
| -rw-r--r-- | filamento/Cargo.toml | 2 | ||||
| -rw-r--r-- | filamento/examples/example.rs | 33 | ||||
| -rw-r--r-- | filamento/migrations/20240113011930_luz.sql | 12 | ||||
| -rw-r--r-- | filamento/src/chat.rs | 64 | ||||
| -rw-r--r-- | filamento/src/db.rs | 92 | ||||
| -rw-r--r-- | filamento/src/error.rs | 52 | ||||
| -rw-r--r-- | filamento/src/lib.rs | 93 | ||||
| -rw-r--r-- | filamento/src/logic/mod.rs | 14 | ||||
| -rw-r--r-- | filamento/src/logic/offline.rs | 64 | ||||
| -rw-r--r-- | filamento/src/logic/online.rs | 355 | ||||
| -rw-r--r-- | filamento/src/logic/process_stanza.rs | 86 | ||||
| -rw-r--r-- | filamento/src/pep.rs | 17 | ||||
| -rw-r--r-- | filamento/src/presence.rs | 15 | ||||
| -rw-r--r-- | filamento/src/user.rs | 3 | 
14 files changed, 656 insertions, 246 deletions
| diff --git a/filamento/Cargo.toml b/filamento/Cargo.toml index ef11192..e9be687 100644 --- a/filamento/Cargo.toml +++ b/filamento/Cargo.toml @@ -8,7 +8,7 @@ futures = "0.3.31"  lampada = { version = "0.1.0", path = "../lampada" }  tokio = "1.42.0"  thiserror = "2.0.11" -stanza = { version = "0.1.0", path = "../stanza", features = ["rfc_6121", "xep_0203", "xep_0030"] } +stanza = { version = "0.1.0", path = "../stanza", features = ["rfc_6121", "xep_0203", "xep_0030", "xep_0060", "xep_0172"] }  sqlx = { version = "0.8.3", features = ["sqlite", "runtime-tokio", "uuid", "chrono"] }  # TODO: re-export jid?  jid = { version = "0.1.0", path = "../jid", features = ["sqlx"] } diff --git a/filamento/examples/example.rs b/filamento/examples/example.rs index 506d698..74a9aa1 100644 --- a/filamento/examples/example.rs +++ b/filamento/examples/example.rs @@ -21,6 +21,8 @@ async fn main() {      client.connect().await.unwrap();      tokio::time::sleep(Duration::from_secs(5)).await; +    info!("changing nick"); +    client.change_nick("britney".to_string()).await.unwrap();      info!("sending message");      client          .send_message( @@ -32,9 +34,30 @@ async fn main() {          .await          .unwrap();      info!("sent message"); -    info!("sending disco query"); -    let info = client.disco_info(None, None).await.unwrap(); -    info!("got disco result: {:#?}", info); -    let items = client.disco_items(None, None).await.unwrap(); -    info!("got disco result: {:#?}", items); +    tokio::time::sleep(Duration::from_secs(5)).await; +    // info!("sending disco query"); +    // let info = client.disco_info(None, None).await.unwrap(); +    // info!("got disco result: {:#?}", info); +    // let items = client.disco_items(None, None).await.unwrap(); +    // info!("got disco result: {:#?}", items); +    // let info = client +    //     .disco_info(Some("blos.sm".parse().unwrap()), None) +    //     .await +    //     .unwrap(); +    // info!("got disco result: {:#?}", info); +    // let items = client +    //     .disco_items(Some("blos.sm".parse().unwrap()), None) +    //     .await +    //     .unwrap(); +    // info!("got disco result: {:#?}", items); +    // let info = client +    //     .disco_info(Some("pubsub.blos.sm".parse().unwrap()), None) +    //     .await +    //     .unwrap(); +    // info!("got disco result: {:#?}", info); +    // let items = client +    //     .disco_items(Some("pubsub.blos.sm".parse().unwrap()), None) +    //     .await +    //     .unwrap(); +    // info!("got disco result: {:#?}", items);  } diff --git a/filamento/migrations/20240113011930_luz.sql b/filamento/migrations/20240113011930_luz.sql index 148598b..3b56664 100644 --- a/filamento/migrations/20240113011930_luz.sql +++ b/filamento/migrations/20240113011930_luz.sql @@ -5,6 +5,7 @@ PRAGMA foreign_keys = on;  create table users(      -- TODO: enforce bare jid      jid text primary key not null, +    nick text,      -- can receive presence status from non-contacts      cached_status_message text      -- TODO: last_seen @@ -67,14 +68,24 @@ create table groups_roster(  -- can send chat message to user (creating a new chat if not already exists)  create table chats (      id text primary key not null, +    have_chatted bool not null,      correspondent text not null unique,      foreign key(correspondent) references users(jid)  ); +-- enum for subscription state +create table delivery( +    state text primary key not null +); + +insert into delivery ( state ) values ('sending'), ('written'), ('sent'), ('delivered'), ('read'), ('failed'), ('queued'); +  -- messages include reference to chat they are in, and who sent them.  create table messages (      id text primary key not null,      body text, +    -- delivery is nullable as only messages sent by the user are markable +    delivery text,      chat_id text not null,      -- TODO: channel stuff       -- channel_id uuid, @@ -94,6 +105,7 @@ create table messages (      from_resource text,      -- check (from_jid != original_sender), +    foreign key(delivery) references delivery(state),      -- TODO: from can be either a jid, a moved jid (for when a contact moves, save original sender jid/user but link to new user), or imported (from another service (save details), linked to new user)      -- TODO: read bool not null,      foreign key(chat_id) references chats(id) on delete cascade, diff --git a/filamento/src/chat.rs b/filamento/src/chat.rs index c1194ea..147c7f7 100644 --- a/filamento/src/chat.rs +++ b/filamento/src/chat.rs @@ -1,5 +1,6 @@  use chrono::{DateTime, Utc};  use jid::JID; +use sqlx::Sqlite;  use uuid::Uuid;  #[derive(Debug, sqlx::FromRow, Clone)] @@ -7,7 +8,9 @@ pub struct Message {      pub id: Uuid,      // does not contain full user information      #[sqlx(rename = "from_jid")] +    // bare jid (for now)      pub from: JID, +    pub delivery: Option<Delivery>,      pub timestamp: DateTime<Utc>,      // TODO: originally_from      // TODO: message edits @@ -16,6 +19,59 @@ pub struct Message {      pub body: Body,  } +#[derive(Debug, Clone, Copy)] +pub enum Delivery { +    Sending, +    Written, +    Sent, +    Delivered, +    Read, +    Failed, +    Queued, +} + +impl sqlx::Type<Sqlite> for Delivery { +    fn type_info() -> <Sqlite as sqlx::Database>::TypeInfo { +        <&str as sqlx::Type<Sqlite>>::type_info() +    } +} + +impl sqlx::Decode<'_, Sqlite> for Delivery { +    fn decode( +        value: <Sqlite as sqlx::Database>::ValueRef<'_>, +    ) -> Result<Self, sqlx::error::BoxDynError> { +        let value = <&str as sqlx::Decode<Sqlite>>::decode(value)?; +        match value { +            "sending" => Ok(Self::Sending), +            "written" => Ok(Self::Written), +            "sent" => Ok(Self::Sent), +            "delivered" => Ok(Self::Delivered), +            "read" => Ok(Self::Read), +            "failed" => Ok(Self::Failed), +            "queued" => Ok(Self::Queued), +            _ => unreachable!(), +        } +    } +} + +impl sqlx::Encode<'_, Sqlite> for Delivery { +    fn encode_by_ref( +        &self, +        buf: &mut <Sqlite as sqlx::Database>::ArgumentBuffer<'_>, +    ) -> Result<sqlx::encode::IsNull, sqlx::error::BoxDynError> { +        let value = match self { +            Delivery::Sending => "sending", +            Delivery::Written => "written", +            Delivery::Sent => "sent", +            Delivery::Delivered => "delivered", +            Delivery::Read => "read", +            Delivery::Failed => "failed", +            Delivery::Queued => "queued", +        }; +        <&str as sqlx::Encode<Sqlite>>::encode(value, buf) +    } +} +  // TODO: user migrations  // pub enum Migrated {  //     Jabber(User), @@ -31,6 +87,7 @@ pub struct Body {  #[derive(sqlx::FromRow, Debug, Clone)]  pub struct Chat {      pub correspondent: JID, +    pub have_chatted: bool,      // pub unread_messages: i32,      // pub latest_message: Message,      // when a new message is received, the chat should be updated, and the new message should be delivered too. @@ -41,8 +98,11 @@ pub struct Chat {  pub enum ChatUpdate {}  impl Chat { -    pub fn new(correspondent: JID) -> Self { -        Self { correspondent } +    pub fn new(correspondent: JID, have_chatted: bool) -> Self { +        Self { +            correspondent, +            have_chatted, +        }      }      pub fn correspondent(&self) -> &JID {          &self.correspondent diff --git a/filamento/src/db.rs b/filamento/src/db.rs index 1054ec2..f92bfb2 100644 --- a/filamento/src/db.rs +++ b/filamento/src/db.rs @@ -50,8 +50,9 @@ impl Db {      pub(crate) async fn create_user(&self, user: User) -> Result<(), Error> {          sqlx::query!( -            "insert into users ( jid, cached_status_message ) values ( ?, ? )", +            "insert into users ( jid, nick, cached_status_message ) values ( ?, ?, ? )",              user.jid, +            user.nick,              user.cached_status_message          )          .execute(&self.db) @@ -60,6 +61,12 @@ impl Db {      }      pub(crate) async fn read_user(&self, user: JID) -> Result<User, Error> { +        sqlx::query!( +            "insert into users ( jid ) values ( ? ) on conflict do nothing", +            user +        ) +        .execute(&self.db) +        .await?;          let user: User = sqlx::query_as("select * from users where jid = ?")              .bind(user)              .fetch_one(&self.db) @@ -67,10 +74,23 @@ impl Db {          Ok(user)      } +    pub(crate) async fn upsert_user_nick(&self, jid: JID, nick: String) -> Result<(), Error> { +        sqlx::query!( +            "insert into users (jid, nick) values (?, ?) on conflict do update set nick = ?", +            jid, +            nick, +            nick +        ) +        .execute(&self.db) +        .await?; +        Ok(()) +    } +      pub(crate) async fn update_user(&self, user: User) -> Result<(), Error> {          sqlx::query!( -            "update users set cached_status_message = ? where jid = ?", +            "update users set cached_status_message = ?, nick = ? where jid = ?",              user.cached_status_message, +            user.nick,              user.jid          )          .execute(&self.db) @@ -288,6 +308,17 @@ impl Db {          Ok(chat)      } +    pub(crate) async fn mark_chat_as_chatted(&self, chat: JID) -> Result<(), Error> { +        let jid = chat.as_bare(); +        sqlx::query!( +            "update chats set have_chatted = true where correspondent = ?", +            jid +        ) +        .execute(&self.db) +        .await?; +        Ok(()) +    } +      pub(crate) async fn update_chat_correspondent(          &self,          old_chat: Chat, @@ -387,38 +418,47 @@ impl Db {      }      /// if the chat doesn't already exist, it must be created by calling create_chat() before running this function. -    pub(crate) async fn create_message(&self, message: Message, chat: JID) -> Result<(), Error> { +    pub(crate) async fn create_message( +        &self, +        message: Message, +        chat: JID, +        from: JID, +    ) -> Result<(), Error> {          // TODO: one query -        let bare_jid = message.from.as_bare(); -        let resource = message.from.resourcepart; +        let from_jid = from.as_bare();          let chat_id = self.read_chat_id(chat).await?; -        sqlx::query!("insert into messages (id, body, chat_id, from_jid, from_resource, timestamp) values (?, ?, ?, ?, ?, ?)", message.id, message.body.body, chat_id, bare_jid, resource, message.timestamp).execute(&self.db).await?; +        sqlx::query!("insert into messages (id, body, chat_id, from_jid, from_resource, timestamp) values (?, ?, ?, ?, ?, ?)", message.id, message.body.body, chat_id, from_jid, from.resourcepart, message.timestamp).execute(&self.db).await?;          Ok(())      } -    pub(crate) async fn create_message_with_self_resource_and_chat( -        &self, -        message: Message, -        chat: JID, -    ) -> Result<(), Error> { -        let from_jid = message.from.as_bare(); -        let resource = &message.from.resourcepart; +    pub(crate) async fn upsert_chat_and_user(&self, chat: &JID) -> Result<bool, Error> {          let bare_chat = chat.as_bare();          sqlx::query!(              "insert into users (jid) values (?) on conflict do nothing", -            from_jid +            bare_chat,          )          .execute(&self.db)          .await?;          let id = Uuid::new_v4(); -        sqlx::query!( -            "insert into chats (id, correspondent) values (?, ?) on conflict do nothing", -            id, -            bare_chat -        ) -        .execute(&self.db) -        .await?; -        if let Some(resource) = resource { +        let chat: Chat = sqlx::query_as("insert into chats (id, correspondent, have_chatted) values (?, ?, ?) on conflict do nothing returning *") +            .bind(id) +            .bind(bare_chat) +            .bind(false) +            .fetch_one(&self.db) +            .await?; +        Ok(chat.have_chatted) +    } + +    /// MUST upsert chat beforehand +    pub(crate) async fn create_message_with_self_resource( +        &self, +        message: Message, +        chat: JID, +        // full jid +        from: JID, +    ) -> Result<(), Error> { +        let from_jid = from.as_bare(); +        if let Some(resource) = &from.resourcepart {              sqlx::query!(                  "insert into resources (bare_jid, resource) values (?, ?) on conflict do nothing",                  from_jid, @@ -427,15 +467,17 @@ impl Db {              .execute(&self.db)              .await?;          } -        self.create_message(message, chat).await?; +        self.create_message(message, chat, from).await?;          Ok(())      }      // create direct message from incoming -    pub(crate) async fn create_message_with_user_resource_and_chat( +    pub(crate) async fn create_message_with_user_resource(          &self,          message: Message,          chat: JID, +        // full jid +        from: JID,      ) -> Result<(), Error> {          let bare_chat = chat.as_bare();          let resource = &chat.resourcepart; @@ -462,7 +504,7 @@ impl Db {              .execute(&self.db)              .await?;          } -        self.create_message(message, chat).await?; +        self.create_message(message, chat, from).await?;          Ok(())      } diff --git a/filamento/src/error.rs b/filamento/src/error.rs index 8e2e4be..9ecc330 100644 --- a/filamento/src/error.rs +++ b/filamento/src/error.rs @@ -34,12 +34,22 @@ pub enum Error {      MessageSend(#[from] MessageSendError),      #[error("message receive error: {0}")]      MessageRecv(#[from] MessageRecvError), +    #[error("subscripbe error: {0}")] +    Subscribe(#[from] SubscribeError), +    #[error("publish error: {0}")] +    Publish(#[from] PublishError),  }  #[derive(Debug, Error, Clone)]  pub enum MessageSendError {      #[error("could not add to message history: {0}")]      MessageHistory(#[from] DatabaseError), +    #[error("could not mark chat as chatted: {0}")] +    MarkChatAsChatted(DatabaseError), +    #[error("could not get client user details: {0}")] +    GetUserDetails(DatabaseError), +    #[error("writing message to connection: {0}")] +    Write(#[from] WriteError),  }  #[derive(Debug, Error, Clone)] @@ -48,6 +58,8 @@ pub enum MessageRecvError {      MessageHistory(#[from] DatabaseError),      #[error("missing from")]      MissingFrom, +    #[error("could not update user nick: {0}")] +    NickUpdate(DatabaseError),  }  #[derive(Debug, Error, Clone)] @@ -75,7 +87,7 @@ pub enum RosterError {      #[error("cache: {0}")]      Cache(#[from] DatabaseError),      #[error("iq response: {0}")] -    IqResponse(#[from] RequestError), +    IqResponse(#[from] IqRequestError),      #[error("stream write: {0}")]      Write(#[from] WriteError),      // TODO: display for stanza, to show as xml, same for read error types. @@ -92,7 +104,7 @@ pub enum DiscoError {      #[error("write error: {0}")]      Write(#[from] WriteError),      #[error("iq response: {0}")] -    IqResponse(#[from] RequestError), +    IqResponse(#[from] IqRequestError),      #[error("reply from incorrect entity: {0}")]      IncorrectEntity(JID),      #[error("unexpected reply: {0:?}")] @@ -108,7 +120,7 @@ pub enum DiscoError {  }  #[derive(Debug, Error, Clone)] -pub enum RequestError { +pub enum IqRequestError {      #[error("sending request: {0}")]      Write(#[from] WriteError),      #[error("receiving expected response: {0}")] @@ -173,6 +185,14 @@ impl From<tokio::io::Error> for DatabaseOpenError {  }  #[derive(Debug, Error, Clone)] +pub enum SubscribeError { +    #[error("write: {0}")] +    Write(#[from] WriteError), +    #[error("fetching client user details: {0}")] +    Database(#[from] DatabaseError), +} + +#[derive(Debug, Error, Clone)]  pub enum PresenceError {      #[error("unsupported")]      Unsupported, @@ -181,3 +201,29 @@ pub enum PresenceError {      #[error("stanza error: {0}")]      StanzaError(#[from] stanza::client::error::Error),  } + +#[derive(Debug, Error, Clone)] +pub enum PublishError { +    #[error("received mismatched query")] +    MismatchedQuery(Query), +    #[error("missing query")] +    MissingQuery, +    #[error("stanza errors: {0:?}")] +    StanzaErrors(Vec<stanza::client::error::Error>), +    #[error("reply from incorrect entity: {0}")] +    IncorrectEntity(JID), +    #[error("unexpected stanza: {0:?}")] +    UnexpectedStanza(Stanza), +    #[error("iq response: {0}")] +    IqResponse(#[from] IqRequestError), +} + +#[derive(Debug, Error, Clone)] +pub enum NickError { +    #[error("publishing nick: {0}")] +    Publish(#[from] CommandError<PublishError>), +    #[error("updating database: {0}")] +    Database(#[from] DatabaseError), +    #[error("disconnected")] +    Disconnected, +} diff --git a/filamento/src/lib.rs b/filamento/src/lib.rs index 4d852e2..6118f75 100644 --- a/filamento/src/lib.rs +++ b/filamento/src/lib.rs @@ -6,13 +6,13 @@ use std::{      time::Duration,  }; -use chat::{Body, Chat, Message}; +use chat::{Body, Chat, Delivery, Message};  use chrono::Utc;  use db::Db;  use disco::{Info, Items};  use error::{ -    ConnectionJobError, DatabaseError, DiscoError, Error, IqError, MessageRecvError, PresenceError, -    RosterError, StatusError, +    ConnectionJobError, DatabaseError, DiscoError, Error, IqError, MessageRecvError, NickError, +    PresenceError, PublishError, RosterError, StatusError, SubscribeError,  };  use futures::FutureExt;  use jid::JID; @@ -40,6 +40,7 @@ pub mod db;  pub mod disco;  pub mod error;  mod logic; +pub mod pep;  pub mod presence;  pub mod roster;  pub mod user; @@ -68,13 +69,13 @@ pub enum Command {      /// add a contact to your roster, with a status of none, no subscriptions.      AddContact(JID, oneshot::Sender<Result<(), RosterError>>),      /// send a friend request i.e. a subscription request with a subscription pre-approval. if not already added to roster server adds to roster. -    BuddyRequest(JID, oneshot::Sender<Result<(), WriteError>>), +    BuddyRequest(JID, oneshot::Sender<Result<(), SubscribeError>>),      /// send a subscription request, without pre-approval. if not already added to roster server adds to roster. -    SubscriptionRequest(JID, oneshot::Sender<Result<(), WriteError>>), +    SubscriptionRequest(JID, oneshot::Sender<Result<(), SubscribeError>>),      /// accept a friend request by accepting a pending subscription and sending a subscription request back. if not already added to roster adds to roster. -    AcceptBuddyRequest(JID, oneshot::Sender<Result<(), WriteError>>), +    AcceptBuddyRequest(JID, oneshot::Sender<Result<(), SubscribeError>>),      /// accept a pending subscription and doesn't send a subscription request back. if not already added to roster adds to roster. -    AcceptSubscriptionRequest(JID, oneshot::Sender<Result<(), WriteError>>), +    AcceptSubscriptionRequest(JID, oneshot::Sender<Result<(), SubscribeError>>),      /// unsubscribe to a contact, but don't remove their subscription.      UnsubscribeFromContact(JID, oneshot::Sender<Result<(), WriteError>>),      /// stop a contact from being subscribed, but stay subscribed to the contact. @@ -98,7 +99,9 @@ pub enum Command {      // TODO: should probably make it so people can add non-contact auto presence sharing in the client (most likely through setting an internal setting)      /// send a message to a jid (any kind of jid that can receive a message, e.g. a user or a      /// chatroom). if disconnected, will be cached so when client connects, message will be sent. -    SendMessage(JID, Body, oneshot::Sender<Result<(), WriteError>>), +    SendMessage(JID, Body), +    // TODO: resend failed messages +    // ResendMessage(Uuid),      /// disco info query      DiscoInfo(          Option<JID>, @@ -111,6 +114,14 @@ pub enum Command {          Option<String>,          oneshot::Sender<Result<disco::Items, DiscoError>>,      ), +    /// publish item to a pep node, specified or default according to item. +    Publish { +        item: pep::Item, +        node: String, +        sender: oneshot::Sender<Result<(), PublishError>>, +    }, +    /// change user nickname +    ChangeNick(String, oneshot::Sender<Result<(), NickError>>),  }  #[derive(Debug, Clone)] @@ -134,7 +145,15 @@ pub enum UpdateMessage {          to: JID,          message: Message,      }, +    MessageDelivery { +        id: Uuid, +        delivery: Delivery, +    },      SubscriptionRequest(jid::JID), +    NickChanged { +        jid: JID, +        nick: String, +    },  }  /// an xmpp client that is suited for a chat client use case @@ -192,7 +211,7 @@ impl Client {              timeout: Duration::from_secs(10),          }; -        let logic = ClientLogic::new(client.clone(), db, update_send); +        let logic = ClientLogic::new(client.clone(), jid.as_bare(), db, update_send);          let actor: CoreClient<ClientLogic> =              CoreClient::new(jid, password, command_receiver, None, sup_recv, logic); @@ -328,7 +347,7 @@ impl Client {          Ok(result)      } -    pub async fn buddy_request(&self, jid: JID) -> Result<(), CommandError<WriteError>> { +    pub async fn buddy_request(&self, jid: JID) -> Result<(), CommandError<SubscribeError>> {          let (send, recv) = oneshot::channel();          self.send(CoreClientCommand::Command(Command::BuddyRequest(jid, send)))              .await @@ -340,7 +359,7 @@ impl Client {          Ok(result)      } -    pub async fn subscription_request(&self, jid: JID) -> Result<(), CommandError<WriteError>> { +    pub async fn subscription_request(&self, jid: JID) -> Result<(), CommandError<SubscribeError>> {          let (send, recv) = oneshot::channel();          self.send(CoreClientCommand::Command(Command::SubscriptionRequest(              jid, send, @@ -354,7 +373,7 @@ impl Client {          Ok(result)      } -    pub async fn accept_buddy_request(&self, jid: JID) -> Result<(), CommandError<WriteError>> { +    pub async fn accept_buddy_request(&self, jid: JID) -> Result<(), CommandError<SubscribeError>> {          let (send, recv) = oneshot::channel();          self.send(CoreClientCommand::Command(Command::AcceptBuddyRequest(              jid, send, @@ -371,7 +390,7 @@ impl Client {      pub async fn accept_subscription_request(          &self,          jid: JID, -    ) -> Result<(), CommandError<WriteError>> { +    ) -> Result<(), CommandError<SubscribeError>> {          let (send, recv) = oneshot::channel();          self.send(CoreClientCommand::Command(              Command::AcceptSubscriptionRequest(jid, send), @@ -471,18 +490,10 @@ impl Client {          Ok(result)      } -    pub async fn send_message(&self, jid: JID, body: Body) -> Result<(), CommandError<WriteError>> { -        let (send, recv) = oneshot::channel(); -        self.send(CoreClientCommand::Command(Command::SendMessage( -            jid, body, send, -        ))) -        .await -        .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; -        let result = timeout(self.timeout, recv) -            .await -            .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? -            .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; -        Ok(result) +    pub async fn send_message(&self, jid: JID, body: Body) -> Result<(), ActorError> { +        self.send(CoreClientCommand::Command(Command::SendMessage(jid, body))) +            .await?; +        Ok(())      }      pub async fn disco_info( @@ -520,6 +531,38 @@ impl Client {              .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??;          Ok(result)      } + +    pub async fn publish( +        &self, +        item: pep::Item, +        node: String, +    ) -> Result<(), CommandError<PublishError>> { +        let (send, recv) = oneshot::channel(); +        self.send(CoreClientCommand::Command(Command::Publish { +            item, +            node, +            sender: send, +        })) +        .await +        .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; +        let result = timeout(self.timeout, recv) +            .await +            .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? +            .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; +        Ok(result) +    } + +    pub async fn change_nick(&self, nick: String) -> Result<(), CommandError<NickError>> { +        let (send, recv) = oneshot::channel(); +        self.send(CoreClientCommand::Command(Command::ChangeNick(nick, send))) +            .await +            .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))?; +        let result = timeout(self.timeout, recv) +            .await +            .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))? +            .map_err(|e| CommandError::Actor(Into::<ActorError>::into(e)))??; +        Ok(result) +    }  }  impl From<Command> for CoreClientCommand<Command> { diff --git a/filamento/src/logic/mod.rs b/filamento/src/logic/mod.rs index 15c2d12..1ddd7d3 100644 --- a/filamento/src/logic/mod.rs +++ b/filamento/src/logic/mod.rs @@ -1,5 +1,6 @@  use std::{collections::HashMap, sync::Arc}; +use jid::JID;  use lampada::{Connected, Logic, error::ReadError};  use stanza::client::Stanza;  use tokio::sync::{Mutex, mpsc, oneshot}; @@ -8,7 +9,7 @@ use tracing::{error, info, warn};  use crate::{      Client, Command, UpdateMessage,      db::Db, -    error::{Error, RequestError, ResponseError}, +    error::{Error, IqRequestError, ResponseError},  };  mod abort; @@ -23,6 +24,7 @@ mod process_stanza;  #[derive(Clone)]  pub struct ClientLogic {      client: Client, +    bare_jid: JID,      db: Db,      pending: Pending,      update_sender: mpsc::Sender<UpdateMessage>, @@ -41,7 +43,7 @@ impl Pending {          connection: &Connected,          request: Stanza,          id: String, -    ) -> Result<Stanza, RequestError> { +    ) -> Result<Stanza, IqRequestError> {          let (send, recv) = oneshot::channel();          {              self.0.lock().await.insert(id, send); @@ -74,12 +76,18 @@ impl Pending {  }  impl ClientLogic { -    pub fn new(client: Client, db: Db, update_sender: mpsc::Sender<UpdateMessage>) -> Self { +    pub fn new( +        client: Client, +        bare_jid: JID, +        db: Db, +        update_sender: mpsc::Sender<UpdateMessage>, +    ) -> Self {          Self {              db,              pending: Pending::new(),              update_sender,              client, +            bare_jid,          }      } diff --git a/filamento/src/logic/offline.rs b/filamento/src/logic/offline.rs index bc2666a..6399cf7 100644 --- a/filamento/src/logic/offline.rs +++ b/filamento/src/logic/offline.rs @@ -1,8 +1,14 @@ +use chrono::Utc;  use lampada::error::WriteError; +use uuid::Uuid;  use crate::{      Command, -    error::{DatabaseError, DiscoError, Error, RosterError, StatusError}, +    chat::{Delivery, Message}, +    error::{ +        DatabaseError, DiscoError, Error, IqRequestError, MessageSendError, NickError, RosterError, +        StatusError, +    },      presence::Online,      roster::Contact,  }; @@ -76,16 +82,16 @@ pub async fn handle_offline_result(logic: &ClientLogic, command: Command) -> Res              sender.send(Err(RosterError::Write(WriteError::Disconnected)));          }          Command::BuddyRequest(_jid, sender) => { -            sender.send(Err(WriteError::Disconnected)); +            sender.send(Err(WriteError::Disconnected.into()));          }          Command::SubscriptionRequest(_jid, sender) => { -            sender.send(Err(WriteError::Disconnected)); +            sender.send(Err(WriteError::Disconnected.into()));          }          Command::AcceptBuddyRequest(_jid, sender) => { -            sender.send(Err(WriteError::Disconnected)); +            sender.send(Err(WriteError::Disconnected.into()));          }          Command::AcceptSubscriptionRequest(_jid, sender) => { -            sender.send(Err(WriteError::Disconnected)); +            sender.send(Err(WriteError::Disconnected.into()));          }          Command::UnsubscribeFromContact(_jid, sender) => {              sender.send(Err(WriteError::Disconnected)); @@ -107,8 +113,42 @@ pub async fn handle_offline_result(logic: &ClientLogic, command: Command) -> Res              sender.send(result);          }          // TODO: offline message queue -        Command::SendMessage(_jid, _body, sender) => { -            sender.send(Err(WriteError::Disconnected)); +        Command::SendMessage(jid, body) => { +            let id = Uuid::new_v4(); +            let timestamp = Utc::now(); + +            let message = Message { +                id, +                from: logic.bare_jid.clone(), +                // TODO: failure reason +                delivery: Some(Delivery::Failed), +                timestamp, +                body, +            }; +            // try to store in message history that there is a new message that is sending. if client is quit mid-send then can mark as failed and re-send +            // TODO: mark these as potentially failed upon client launch +            if let Err(e) = logic +                .db() +                .create_message_with_self_resource( +                    message.clone(), +                    jid.clone(), +                    // TODO: when message is queued and sent, the from must also be updated with the correct resource +                    logic.bare_jid.clone(), +                ) +                .await +            { +                // TODO: should these really be handle_error or just the error macro? +                logic +                    .handle_error(MessageSendError::MessageHistory(e.into()).into()) +                    .await; +            } +            logic +                .update_sender() +                .send(crate::UpdateMessage::Message { +                    to: jid.as_bare(), +                    message, +                }) +                .await;          }          Command::SendPresence(_jid, _presence, sender) => {              sender.send(Err(WriteError::Disconnected)); @@ -119,6 +159,16 @@ pub async fn handle_offline_result(logic: &ClientLogic, command: Command) -> Res          Command::DiscoItems(_jid, _node, sender) => {              sender.send(Err(DiscoError::Write(WriteError::Disconnected)));          } +        Command::Publish { +            item: _, +            node: _, +            sender, +        } => { +            sender.send(Err(IqRequestError::Write(WriteError::Disconnected).into())); +        } +        Command::ChangeNick(_, sender) => { +            sender.send(Err(NickError::Disconnected)); +        }      }      Ok(())  } diff --git a/filamento/src/logic/online.rs b/filamento/src/logic/online.rs index 63a4aa3..d32f527 100644 --- a/filamento/src/logic/online.rs +++ b/filamento/src/logic/online.rs @@ -3,10 +3,11 @@ use jid::JID;  use lampada::{Connected, WriteMessage, error::WriteError};  use stanza::{      client::{ -        Stanza, -        iq::{self, Iq, IqType, Query}, +        iq::{self, Iq, IqType, Query}, Stanza      },      xep_0030::{info, items}, +    xep_0060::pubsub::{self, Pubsub}, +    xep_0172::{self, Nick},      xep_0203::Delay,  };  use tokio::sync::oneshot; @@ -14,12 +15,9 @@ use tracing::{debug, error, info};  use uuid::Uuid;  use crate::{ -    Command, UpdateMessage, -    chat::{Body, Message}, -    disco::{Info, Items}, -    error::{DatabaseError, DiscoError, Error, MessageSendError, RosterError, StatusError}, -    presence::{Online, Presence, PresenceType}, -    roster::{Contact, ContactUpdate}, +    chat::{Body, Chat, Delivery, Message}, disco::{Info, Items}, error::{ +        DatabaseError, DiscoError, Error, IqRequestError, MessageSendError, NickError, PublishError, RosterError, StatusError, SubscribeError +    }, pep, presence::{Online, Presence, PresenceType}, roster::{Contact, ContactUpdate}, Command, UpdateMessage  };  use super::{ @@ -156,105 +154,82 @@ pub async fn handle_add_contact(      }  } -pub async fn handle_buddy_request(connection: Connected, jid: JID) -> Result<(), WriteError> { +pub async fn handle_buddy_request( +    logic: &ClientLogic, +    connection: Connected, +    jid: JID, +) -> Result<(), SubscribeError> { +    let client_user = logic.db.read_user(logic.bare_jid.clone()).await?; +    let nick = client_user.nick.map(|nick| Nick(nick));      let presence = Stanza::Presence(stanza::client::presence::Presence { -        from: None, -        id: None,          to: Some(jid.clone()),          r#type: Some(stanza::client::presence::PresenceType::Subscribe), -        lang: None, -        show: None, -        status: None, -        priority: None, -        errors: Vec::new(), -        delay: None, +        nick, +        ..Default::default()      });      connection.write_handle().write(presence).await?;      let presence = Stanza::Presence(stanza::client::presence::Presence { -        from: None, -        id: None,          to: Some(jid),          r#type: Some(stanza::client::presence::PresenceType::Subscribed), -        lang: None, -        show: None, -        status: None, -        priority: None, -        errors: Vec::new(), -        delay: None, +        ..Default::default()      });      connection.write_handle().write(presence).await?;      Ok(())  }  pub async fn handle_subscription_request( +    logic: &ClientLogic,      connection: Connected,      jid: JID, -) -> Result<(), WriteError> { +) -> Result<(), SubscribeError> {      // TODO: i should probably have builders +    let client_user = logic.db.read_user(logic.bare_jid.clone()).await?; +    let nick = client_user.nick.map(|nick| Nick(nick));      let presence = Stanza::Presence(stanza::client::presence::Presence { -        from: None, -        id: None,          to: Some(jid),          r#type: Some(stanza::client::presence::PresenceType::Subscribe), -        lang: None, -        show: None, -        status: None, -        priority: None, -        errors: Vec::new(), -        delay: None, +        nick, +        ..Default::default()      });      connection.write_handle().write(presence).await?;      Ok(())  }  pub async fn handle_accept_buddy_request( +    logic: &ClientLogic,      connection: Connected,      jid: JID, -) -> Result<(), WriteError> { +) -> Result<(), SubscribeError> {      let presence = Stanza::Presence(stanza::client::presence::Presence { -        from: None, -        id: None,          to: Some(jid.clone()),          r#type: Some(stanza::client::presence::PresenceType::Subscribed), -        lang: None, -        show: None, -        status: None, -        priority: None, -        errors: Vec::new(), -        delay: None, +        ..Default::default()      });      connection.write_handle().write(presence).await?; +    let client_user = logic.db.read_user(logic.bare_jid.clone()).await?; +    let nick = client_user.nick.map(|nick| Nick(nick));      let presence = Stanza::Presence(stanza::client::presence::Presence { -        from: None, -        id: None,          to: Some(jid),          r#type: Some(stanza::client::presence::PresenceType::Subscribe), -        lang: None, -        show: None, -        status: None, -        priority: None, -        errors: Vec::new(), -        delay: None, +        nick, +        ..Default::default()      });      connection.write_handle().write(presence).await?;      Ok(())  }  pub async fn handle_accept_subscription_request( +    logic: &ClientLogic,      connection: Connected,      jid: JID, -) -> Result<(), WriteError> { +) -> Result<(), SubscribeError> { +    let client_user = logic.db.read_user(logic.bare_jid.clone()).await?; +    let nick = client_user.nick.map(|nick| Nick(nick));      let presence = Stanza::Presence(stanza::client::presence::Presence { -        from: None, -        id: None,          to: Some(jid),          r#type: Some(stanza::client::presence::PresenceType::Subscribe), -        lang: None, -        show: None, -        status: None, -        priority: None, -        errors: Vec::new(), -        delay: None, +        nick, +        ..Default::default()      });      connection.write_handle().write(presence).await?;      Ok(()) @@ -265,16 +240,9 @@ pub async fn handle_unsubscribe_from_contact(      jid: JID,  ) -> Result<(), WriteError> {      let presence = Stanza::Presence(stanza::client::presence::Presence { -        from: None, -        id: None,          to: Some(jid),          r#type: Some(stanza::client::presence::PresenceType::Unsubscribe), -        lang: None, -        show: None, -        status: None, -        priority: None, -        errors: Vec::new(), -        delay: None, +        ..Default::default()      });      connection.write_handle().write(presence).await?;      Ok(()) @@ -282,16 +250,9 @@ pub async fn handle_unsubscribe_from_contact(  pub async fn handle_unsubscribe_contact(connection: Connected, jid: JID) -> Result<(), WriteError> {      let presence = Stanza::Presence(stanza::client::presence::Presence { -        from: None, -        id: None,          to: Some(jid),          r#type: Some(stanza::client::presence::PresenceType::Unsubscribed), -        lang: None, -        show: None, -        status: None, -        priority: None, -        errors: Vec::new(), -        delay: None, +        ..Default::default()      });      connection.write_handle().write(presence).await?;      Ok(()) @@ -299,29 +260,15 @@ pub async fn handle_unsubscribe_contact(connection: Connected, jid: JID) -> Resu  pub async fn handle_unfriend_contact(connection: Connected, jid: JID) -> Result<(), WriteError> {      let presence = Stanza::Presence(stanza::client::presence::Presence { -        from: None, -        id: None,          to: Some(jid.clone()),          r#type: Some(stanza::client::presence::PresenceType::Unsubscribe), -        lang: None, -        show: None, -        status: None, -        priority: None, -        errors: Vec::new(), -        delay: None, +        ..Default::default()      });      connection.write_handle().write(presence).await?;      let presence = Stanza::Presence(stanza::client::presence::Presence { -        from: None, -        id: None,          to: Some(jid),          r#type: Some(stanza::client::presence::PresenceType::Unsubscribed), -        lang: None, -        show: None, -        status: None, -        priority: None, -        errors: Vec::new(), -        delay: None, +        ..Default::default()      });      connection.write_handle().write(presence).await?;      Ok(()) @@ -464,60 +411,119 @@ pub async fn handle_set_status(      Ok(())  } -pub async fn handle_send_message( -    logic: &ClientLogic, -    connection: Connected, -    jid: JID, -    body: Body, -) -> Result<(), WriteError> { +pub async fn handle_send_message(logic: &ClientLogic, connection: Connected, jid: JID, body: Body) { +    // upsert the chat and user the message will be delivered to. if there is a conflict, it will return whatever was there, otherwise it will return false by default. +    let have_chatted = logic.db().upsert_chat_and_user(&jid).await.unwrap_or(false); + +    let nick; +    let mark_chat_as_chatted; +    if have_chatted == false { +        match logic.db.read_user(logic.bare_jid.clone()).await { +            Ok(u) => { +                nick = u.nick.map(|nick| Nick(nick)); +                mark_chat_as_chatted = true; +            } +            Err(e) => { +                logic +                    .handle_error(MessageSendError::GetUserDetails(e.into()).into()) +                    .await; +                nick = None; +                mark_chat_as_chatted = false; +            } +        } +    } else { +        nick = None; +        mark_chat_as_chatted = false; +    } + +    // generate message struct      let id = Uuid::new_v4();      let timestamp = Utc::now(); -    let message = Stanza::Message(stanza::client::message::Message { +    let message = Message { +        id, +        from: connection.jid().as_bare(), +        body: body.clone(), +        timestamp, +        delivery: Some(Delivery::Sending), +    }; + +    // try to store in message history that there is a new message that is sending. if client is quit mid-send then can mark as failed and re-send +    // TODO: mark these as potentially failed upon client launch +    if let Err(e) = logic +        .db() +        .create_message_with_self_resource(message.clone(), jid.clone(), connection.jid().clone()) +        .await +    { +        // TODO: should these really be handle_error or just the error macro? +        logic +            .handle_error(MessageSendError::MessageHistory(e.into()).into()) +            .await; +    } + +    // tell the client a message is being sent +    logic +        .update_sender() +        .send(UpdateMessage::Message { +            to: jid.as_bare(), +            message, +        }) +        .await; + +    // prepare the message stanza +    let message_stanza = Stanza::Message(stanza::client::message::Message {          from: Some(connection.jid().clone()),          id: Some(id.to_string()),          to: Some(jid.clone()),          // TODO: specify message type          r#type: stanza::client::message::MessageType::Chat,          // TODO: lang ? -        lang: None, -        subject: None,          body: Some(stanza::client::message::Body {              lang: None,              body: Some(body.body.clone()),          }), -        thread: None,          // include delay to have a consistent timestamp between server and client          delay: Some(Delay {              from: None,              stamp: timestamp,          }), +        nick, +        ..Default::default()      }); -    connection.write_handle().write(message).await?; -    let mut message = Message { -        id, -        from: connection.jid().clone(), -        body, -        timestamp, -    }; -    info!("sent message: {:?}", message); -    if let Err(e) = logic -        .db() -        .create_message_with_self_resource_and_chat(message.clone(), jid.clone()) -        .await -    { -        // TODO: should these really be handle_error or just the error macro? -        logic -            .handle_error(MessageSendError::MessageHistory(e.into()).into()) -            .await; -    } -    // TODO: don't do this, have separate from from details -    message.from = message.from.as_bare(); -    let _ = logic -        .update_sender() -        .send(UpdateMessage::Message { to: jid, message }) + +    // send the message +    let result = connection +        .write_handle() +        .write(message_stanza.clone())          .await; -    Ok(()) -    // TODO: refactor this to send a sending updatemessage, then update or something like that +    match result { +        Ok(_) => { +            info!("sent message: {:?}", message_stanza); +            logic +                .update_sender() +                .send(UpdateMessage::MessageDelivery { +                    id, +                    delivery: Delivery::Written, +                }) +                .await; +            if mark_chat_as_chatted { +                if let Err(e) = logic.db.mark_chat_as_chatted(jid).await { +                    logic +                        .handle_error(MessageSendError::MarkChatAsChatted(e.into()).into()) +                        .await; +                } +            } +        } +        Err(e) => { +            logic +                .update_sender() +                .send(UpdateMessage::MessageDelivery { +                    id, +                    delivery: Delivery::Failed, +                }) +                .await; +            logic.handle_error(MessageSendError::Write(e).into()).await; +        } +    }  }  pub async fn handle_send_presence( @@ -673,6 +679,78 @@ pub async fn handle_disco_items(      }  } +pub async fn handle_publish( +    logic: &ClientLogic, +    connection: Connected, +    item: pep::Item, +    node: String, +) -> Result<(), PublishError> { +    let id = Uuid::new_v4().to_string(); +    let publish = match item { +        pep::Item::Nick(n) => pubsub::Publish { +            node, +            items: vec![pubsub::Item { +                item: Some(pubsub::Content::Nick(Nick(n))), +                ..Default::default() +            }], +        }, +    }; +    let request = Iq { +        from: Some(connection.jid().clone()), +        id: id.clone(), +        to: None, +        r#type: IqType::Set, +        lang: None, +        query: Some(Query::Pubsub(Pubsub::Publish(publish, None))), +        errors: Vec::new(), +    }; +    match logic +        .pending() +        .request(&connection, Stanza::Iq(request), id) +        .await? { +         +        Stanza::Iq(Iq { +            from, +            r#type, +            query, +            errors, +            .. +            // TODO: maybe abstract a bunch of these different errors related to iqs into an iq error thing? as in like call iq.result(), get the query from inside, error otherwise. +        }) if r#type == IqType::Result || r#type == IqType::Error => { +            if from == None ||  +                    from == Some(connection.jid().as_bare()) +             { +                match r#type { +                    IqType::Result => { +                        if let Some(query) = query { +                            match query { +                                Query::Pubsub(_) => Ok(()), +                                q => Err(PublishError::MismatchedQuery(q)), +                            } +                        } else { +                            Err(PublishError::MissingQuery) +                        } +                    } +                    IqType::Error => { +                        Err(PublishError::StanzaErrors(errors)) +                    } +                    _ => unreachable!(), +                } +            } else { +                Err(PublishError::IncorrectEntity( +                    from.unwrap_or_else(|| connection.jid().as_bare()), +                )) +            } +        } +        s => Err(PublishError::UnexpectedStanza(s)), +    } +} + +pub async fn handle_change_nick(logic: &ClientLogic, nick: String) -> Result<(), NickError> { +    logic.client().publish(pep::Item::Nick(nick), xep_0172::XMLNS.to_string()).await?; +    Ok(()) +} +  // TODO: could probably macro-ise?  pub async fn handle_online_result(      logic: &ClientLogic, @@ -716,25 +794,24 @@ pub async fn handle_online_result(              let user = handle_get_user(logic, jid).await;              let _ = sender.send(user);          } -        // TODO: offline queue to modify roster          Command::AddContact(jid, sender) => {              let result = handle_add_contact(logic, connection, jid).await;              let _ = sender.send(result);          }          Command::BuddyRequest(jid, sender) => { -            let result = handle_buddy_request(connection, jid).await; +            let result = handle_buddy_request(logic, connection, jid).await;              let _ = sender.send(result);          }          Command::SubscriptionRequest(jid, sender) => { -            let result = handle_subscription_request(connection, jid).await; +            let result = handle_subscription_request(logic, connection, jid).await;              let _ = sender.send(result);          }          Command::AcceptBuddyRequest(jid, sender) => { -            let result = handle_accept_buddy_request(connection, jid).await; +            let result = handle_accept_buddy_request(logic, connection, jid).await;              let _ = sender.send(result);          }          Command::AcceptSubscriptionRequest(jid, sender) => { -            let result = handle_accept_subscription_request(connection, jid).await; +            let result = handle_accept_subscription_request(logic, connection, jid).await;              let _ = sender.send(result);          }          Command::UnsubscribeFromContact(jid, sender) => { @@ -761,10 +838,8 @@ pub async fn handle_online_result(              let result = handle_set_status(logic, connection, online).await;              let _ = sender.send(result);          } -        // TODO: offline message queue -        Command::SendMessage(jid, body, sender) => { -            let result = handle_send_message(logic, connection, jid, body).await; -            let _ = sender.send(result); +        Command::SendMessage(jid, body) => { +            handle_send_message(logic, connection, jid, body).await;          }          Command::SendPresence(jid, presence, sender) => {              let result = handle_send_presence(connection, jid, presence).await; @@ -778,6 +853,14 @@ pub async fn handle_online_result(              let result = handle_disco_items(logic, connection, jid, node).await;              let _ = sender.send(result);          } +        Command::Publish { item, node, sender } => { +            let result = handle_publish(logic, connection, item, node).await; +            let _ = sender.send(result); +        } +        Command::ChangeNick(nick, sender) => { +            let result = handle_change_nick(logic, nick).await; +            let _ = sender.send(result); +        }      }      Ok(())  } diff --git a/filamento/src/logic/process_stanza.rs b/filamento/src/logic/process_stanza.rs index b1bc830..2f6644e 100644 --- a/filamento/src/logic/process_stanza.rs +++ b/filamento/src/logic/process_stanza.rs @@ -41,38 +41,74 @@ pub async fn recv_message(      logic: ClientLogic,      stanza_message: stanza::client::message::Message,  ) -> Result<Option<UpdateMessage>, MessageRecvError> { -    if let Some(mut from) = stanza_message.from { +    if let Some(from) = stanza_message.from {          // TODO: don't ignore delay from. xep says SHOULD send error if incorrect.          let timestamp = stanza_message              .delay              .map(|delay| delay.stamp)              .unwrap_or_else(|| Utc::now());          // TODO: group chat messages -        let mut message = Message { -            id: stanza_message -                .id -                // TODO: proper id storage -                .map(|id| Uuid::from_str(&id).unwrap_or_else(|_| Uuid::new_v4())) -                .unwrap_or_else(|| Uuid::new_v4()), -            from: from.clone(), -            timestamp, -            body: Body { -                // TODO: should this be an option? -                body: stanza_message -                    .body -                    .map(|body| body.body) -                    .unwrap_or_default() -                    .unwrap_or_default(), -            }, -        }; + +        // if there is a body, should create chat message +        if let Some(body) = stanza_message.body { +            let message = Message { +                id: stanza_message +                    .id +                    // TODO: proper id xep +                    .map(|id| Uuid::from_str(&id).unwrap_or_else(|_| Uuid::new_v4())) +                    .unwrap_or_else(|| Uuid::new_v4()), +                from: from.as_bare(), +                timestamp, +                body: Body { +                    body: body.body.unwrap_or_default(), +                }, +                delivery: None, +            }; + +            // save the message to the database +            logic.db().upsert_chat_and_user(&from).await?; +            if let Err(e) = logic +                .db() +                .create_message_with_user_resource(message.clone(), from.clone(), from.clone()) +                .await +            { +                logic +                    .handle_error(Error::MessageRecv(MessageRecvError::MessageHistory(e))) +                    .await; +            } + +            // update the client with the new message +            logic +                .update_sender() +                .send(UpdateMessage::Message { +                    to: from.as_bare(), +                    message, +                }) +                .await; +        } + +        if let Some(nick) = stanza_message.nick { +            if let Err(e) = logic +                .db() +                .upsert_user_nick(from.as_bare(), nick.0.clone()) +                .await +            { +                logic +                    .handle_error(Error::MessageRecv(MessageRecvError::NickUpdate(e))) +                    .await; +            } + +            logic +                .update_sender() +                .send(UpdateMessage::NickChanged { +                    jid: from.as_bare(), +                    nick: nick.0, +                }) +                .await; +        } + +        Ok(None)          // TODO: can this be more efficient? -        logic -            .db() -            .create_message_with_user_resource_and_chat(message.clone(), from.clone()) -            .await?; -        message.from = message.from.as_bare(); -        from = from.as_bare(); -        Ok(Some(UpdateMessage::Message { to: from, message }))      } else {          Err(MessageRecvError::MissingFrom)      } diff --git a/filamento/src/pep.rs b/filamento/src/pep.rs new file mode 100644 index 0000000..c71d843 --- /dev/null +++ b/filamento/src/pep.rs @@ -0,0 +1,17 @@ +// in commandmessage +// pub struct Publish { +//     item: Item, +//     node: Option<String>, +//     // no need for node, as item has the node +// } +// +// in updatemessage +// pub struct Event { +//     from: JID, +//     item: Item, +// } + +#[derive(Clone, Debug)] +pub enum Item { +    Nick(String), +} diff --git a/filamento/src/presence.rs b/filamento/src/presence.rs index e35761c..bae8793 100644 --- a/filamento/src/presence.rs +++ b/filamento/src/presence.rs @@ -78,11 +78,6 @@ impl Online {          timestamp: Option<DateTime<Utc>>,      ) -> stanza::client::presence::Presence {          stanza::client::presence::Presence { -            from: None, -            id: None, -            to: None, -            r#type: None, -            lang: None,              show: self.show.map(|show| match show {                  Show::Away => stanza::client::presence::Show::Away,                  Show::Chat => stanza::client::presence::Show::Chat, @@ -97,11 +92,11 @@ impl Online {              priority: self                  .priority                  .map(|priority| stanza::client::presence::Priority(priority)), -            errors: Vec::new(),              delay: timestamp.map(|timestamp| Delay {                  from: None,                  stamp: timestamp,              }), +            ..Default::default()          }      }  } @@ -112,22 +107,16 @@ impl Offline {          timestamp: Option<DateTime<Utc>>,      ) -> stanza::client::presence::Presence {          stanza::client::presence::Presence { -            from: None, -            id: None, -            to: None,              r#type: Some(stanza::client::presence::PresenceType::Unavailable), -            lang: None, -            show: None,              status: self.status.map(|status| stanza::client::presence::Status {                  lang: None,                  status: String1024(status),              }), -            priority: None, -            errors: Vec::new(),              delay: timestamp.map(|timestamp| Delay {                  from: None,                  stamp: timestamp,              }), +            ..Default::default()          }      }  } diff --git a/filamento/src/user.rs b/filamento/src/user.rs index 9914d14..85471d5 100644 --- a/filamento/src/user.rs +++ b/filamento/src/user.rs @@ -1,7 +1,8 @@  use jid::JID; -#[derive(Debug, sqlx::FromRow)] +#[derive(Debug, sqlx::FromRow, Clone)]  pub struct User {      pub jid: JID, +    pub nick: Option<String>,      pub cached_status_message: Option<String>,  } | 
