diff options
author | 2025-03-27 19:09:35 +0000 | |
---|---|---|
committer | 2025-03-27 19:09:35 +0000 | |
commit | a367aca33fecc03270b5b9ad2a6a21281d760fd8 (patch) | |
tree | 695270823ee5d55f483875580c509fb2c300bd26 | |
parent | 83a6aa0574190137b38331bd53795324139237cf (diff) | |
download | luz-a367aca33fecc03270b5b9ad2a6a21281d760fd8.tar.gz luz-a367aca33fecc03270b5b9ad2a6a21281d760fd8.tar.bz2 luz-a367aca33fecc03270b5b9ad2a6a21281d760fd8.zip |
refactor(filamento): handle_online logic
-rw-r--r-- | filamento/examples/example.rs | 2 | ||||
-rw-r--r-- | filamento/src/error.rs | 20 | ||||
-rw-r--r-- | filamento/src/lib.rs | 7 | ||||
-rw-r--r-- | filamento/src/logic/abort.rs | 5 | ||||
-rw-r--r-- | filamento/src/logic/connect.rs | 3 | ||||
-rw-r--r-- | filamento/src/logic/local_only.rs | 47 | ||||
-rw-r--r-- | filamento/src/logic/mod.rs | 69 | ||||
-rw-r--r-- | filamento/src/logic/online.rs | 1152 | ||||
-rw-r--r-- | filamento/src/logic/process_stanza.rs | 15 |
9 files changed, 701 insertions, 619 deletions
diff --git a/filamento/examples/example.rs b/filamento/examples/example.rs index e36968d..b1ab6ce 100644 --- a/filamento/examples/example.rs +++ b/filamento/examples/example.rs @@ -20,7 +20,7 @@ async fn main() { }); client.connect().await.unwrap(); - tokio::time::sleep(Duration::from_secs(5)).await; + tokio::time::sleep(Duration::from_secs(15)).await; info!("sending message"); client .send_message( diff --git a/filamento/src/error.rs b/filamento/src/error.rs index 6277292..ccb4406 100644 --- a/filamento/src/error.rs +++ b/filamento/src/error.rs @@ -73,6 +73,8 @@ pub enum ConnectionJobError { pub enum RosterError { #[error("cache: {0}")] Cache(#[from] DatabaseError), + #[error("iq response: {0}")] + IqResponse(#[from] RequestError), #[error("stream write: {0}")] Write(#[from] WriteError), // TODO: display for stanza, to show as xml, same for read error types. @@ -87,6 +89,20 @@ pub enum RosterError { } #[derive(Debug, Error, Clone)] +pub enum RequestError { + #[error("sending request: {0}")] + Write(#[from] WriteError), + #[error("receiving expected response: {0}")] + Read(#[from] ReadError), +} + +#[derive(Debug, Error, Clone)] +pub enum ResponseError { + #[error("no matching id: {0}")] + NoMatchingId(String), +} + +#[derive(Debug, Error, Clone)] #[error("database error: {0}")] pub struct DatabaseError(pub Arc<sqlx::Error>); @@ -107,8 +123,8 @@ impl From<sqlx::Error> for DatabaseOpenError { pub enum IqError { #[error("writing response: {0}")] WriteError(#[from] WriteError), - #[error("no iq with id matching `{0}`")] - NoMatchingId(String), + #[error("receiving response: `{0}`")] + ReceivedResponse(#[from] ResponseError), #[error("incorrect addressee: {0}")] IncorrectAddressee(jid::JID), } diff --git a/filamento/src/lib.rs b/filamento/src/lib.rs index 1e9207c..89da1a3 100644 --- a/filamento/src/lib.rs +++ b/filamento/src/lib.rs @@ -178,12 +178,7 @@ impl Client { timeout: Duration::from_secs(10), }; - let logic = ClientLogic::new( - client.clone(), - db, - Arc::new(Mutex::new(HashMap::new())), - update_send, - ); + let logic = ClientLogic::new(client.clone(), db, update_send); let actor: CoreClient<ClientLogic> = CoreClient::new(jid, password, command_receiver, None, sup_recv, logic); diff --git a/filamento/src/logic/abort.rs b/filamento/src/logic/abort.rs index 32c4823..df82655 100644 --- a/filamento/src/logic/abort.rs +++ b/filamento/src/logic/abort.rs @@ -3,8 +3,5 @@ use lampada::error::ReadError; use super::ClientLogic; pub async fn on_abort(logic: ClientLogic) { - let mut iqs = logic.pending().lock().await; - for (_id, sender) in iqs.drain() { - let _ = sender.send(Err(ReadError::LostConnection)); - } + logic.pending().drain().await; } diff --git a/filamento/src/logic/connect.rs b/filamento/src/logic/connect.rs index d7b9fee..dc05448 100644 --- a/filamento/src/logic/connect.rs +++ b/filamento/src/logic/connect.rs @@ -19,7 +19,7 @@ pub async fn handle_connect(logic: ClientLogic, connection: Connected) { .await; debug!("sent roster req"); let roster = recv.await; - debug!("got roster"); + debug!("got roster: {:?}", roster); match roster { Ok(r) => match r { Ok(roster) => { @@ -42,6 +42,7 @@ pub async fn handle_connect(logic: ClientLogic, connection: Connected) { ) .await; let set_status = recv.await; + debug!("sent initial presence"); match set_status { Ok(s) => match s { Ok(()) => { diff --git a/filamento/src/logic/local_only.rs b/filamento/src/logic/local_only.rs new file mode 100644 index 0000000..3f6fe8d --- /dev/null +++ b/filamento/src/logic/local_only.rs @@ -0,0 +1,47 @@ +use jid::JID; +use uuid::Uuid; + +use crate::{ + chat::{Chat, Message}, + error::DatabaseError, + user::User, +}; + +use super::ClientLogic; + +pub async fn handle_get_chats(logic: &ClientLogic) -> Result<Vec<Chat>, DatabaseError> { + Ok(logic.db().read_chats().await?) +} + +pub async fn handle_get_chats_ordered(logic: &ClientLogic) -> Result<Vec<Chat>, DatabaseError> { + Ok(logic.db().read_chats_ordered().await?) +} + +pub async fn handle_get_chats_ordered_with_latest_messages( + logic: &ClientLogic, +) -> Result<Vec<(Chat, Message)>, DatabaseError> { + Ok(logic.db().read_chats_ordered_with_latest_messages().await?) +} + +pub async fn handle_get_chat(logic: &ClientLogic, jid: JID) -> Result<Chat, DatabaseError> { + Ok(logic.db().read_chat(jid).await?) +} + +pub async fn handle_get_messages( + logic: &ClientLogic, + jid: JID, +) -> Result<Vec<Message>, DatabaseError> { + Ok(logic.db().read_message_history(jid).await?) +} + +pub async fn handle_delete_chat(logic: &ClientLogic, jid: JID) -> Result<(), DatabaseError> { + Ok(logic.db().delete_chat(jid).await?) +} + +pub async fn handle_delete_messaage(logic: &ClientLogic, uuid: Uuid) -> Result<(), DatabaseError> { + Ok(logic.db().delete_message(uuid).await?) +} + +pub async fn handle_get_user(logic: &ClientLogic, jid: JID) -> Result<User, DatabaseError> { + Ok(logic.db().read_user(jid).await?) +} diff --git a/filamento/src/logic/mod.rs b/filamento/src/logic/mod.rs index 61d78bf..15c2d12 100644 --- a/filamento/src/logic/mod.rs +++ b/filamento/src/logic/mod.rs @@ -1,16 +1,21 @@ use std::{collections::HashMap, sync::Arc}; -use lampada::{Logic, error::ReadError}; +use lampada::{Connected, Logic, error::ReadError}; use stanza::client::Stanza; use tokio::sync::{Mutex, mpsc, oneshot}; use tracing::{error, info, warn}; -use crate::{Client, Command, UpdateMessage, db::Db, error::Error}; +use crate::{ + Client, Command, UpdateMessage, + db::Db, + error::{Error, RequestError, ResponseError}, +}; mod abort; mod connect; mod connection_error; mod disconnect; +mod local_only; mod offline; mod online; mod process_stanza; @@ -19,20 +24,60 @@ mod process_stanza; pub struct ClientLogic { client: Client, db: Db, - pending: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>, + pending: Pending, update_sender: mpsc::Sender<UpdateMessage>, } +#[derive(Clone)] +pub struct Pending(Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>); + +impl Pending { + pub fn new() -> Self { + Self(Arc::new(Mutex::new(HashMap::new()))) + } + + pub async fn request( + &self, + connection: &Connected, + request: Stanza, + id: String, + ) -> Result<Stanza, RequestError> { + let (send, recv) = oneshot::channel(); + { + self.0.lock().await.insert(id, send); + } + connection.write_handle().write(request).await?; + let stanza = recv.await.map_err(|e| ReadError::Actor(e.into()))??; + Ok(stanza) + } + + pub async fn respond(&self, response: Stanza, id: String) -> Result<(), ResponseError> { + let send; + { + send = self.0.lock().await.remove(&id); + } + match send { + Some(send) => { + let _ = send.send(Ok(response)); + Ok(()) + } + None => Err(ResponseError::NoMatchingId(id)), + } + } + + pub async fn drain(&self) { + let mut pending = self.0.lock().await; + for (_id, sender) in pending.drain() { + let _ = sender.send(Err(ReadError::LostConnection)); + } + } +} + impl ClientLogic { - pub fn new( - client: Client, - db: Db, - pending: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>, - update_sender: mpsc::Sender<UpdateMessage>, - ) -> Self { + pub fn new(client: Client, db: Db, update_sender: mpsc::Sender<UpdateMessage>) -> Self { Self { db, - pending, + pending: Pending::new(), update_sender, client, } @@ -46,8 +91,8 @@ impl ClientLogic { &self.db } - pub fn pending(&self) -> &Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>> { - &self.pending.as_ref() + pub fn pending(&self) -> &Pending { + &self.pending } pub fn update_sender(&self) -> &mpsc::Sender<UpdateMessage> { diff --git a/filamento/src/logic/online.rs b/filamento/src/logic/online.rs index 967ebb2..8bbeaa5 100644 --- a/filamento/src/logic/online.rs +++ b/filamento/src/logic/online.rs @@ -1,8 +1,12 @@ use chrono::Utc; +use jid::JID; use lampada::{Connected, WriteMessage, error::WriteError}; -use stanza::client::{ - Stanza, - iq::{self, Iq, IqType}, +use stanza::{ + client::{ + Stanza, + iq::{self, Iq, IqType}, + }, + xep_0203::Delay, }; use tokio::sync::oneshot; use tracing::{debug, info}; @@ -10,641 +14,625 @@ use uuid::Uuid; use crate::{ Command, UpdateMessage, - chat::Message, - error::{Error, MessageSendError, RosterError, StatusError}, - roster::Contact, + chat::{Body, Message}, + error::{DatabaseError, Error, MessageSendError, RosterError, StatusError}, + presence::{Online, Presence, PresenceType}, + roster::{Contact, ContactUpdate}, }; -use super::ClientLogic; +use super::{ + ClientLogic, + local_only::{ + handle_delete_chat, handle_delete_messaage, handle_get_chat, handle_get_chats, + handle_get_chats_ordered, handle_get_chats_ordered_with_latest_messages, + handle_get_messages, handle_get_user, + }, +}; pub async fn handle_online(logic: ClientLogic, command: Command, connection: Connected) { - match command { - Command::GetRoster(result_sender) => { - let iq_id = Uuid::new_v4().to_string(); - let (send, iq_recv) = oneshot::channel(); - { - logic.pending().lock().await.insert(iq_id.clone(), send); - } - let stanza = Stanza::Iq(Iq { - from: Some(connection.jid().clone()), - id: iq_id.to_string(), - to: None, - r#type: IqType::Get, - lang: None, - query: Some(iq::Query::Roster(stanza::roster::Query { - ver: None, - items: Vec::new(), - })), - errors: Vec::new(), - }); - let (send, recv) = oneshot::channel(); - let _ = connection - .write_handle() - .send(WriteMessage { - stanza, - respond_to: send, - }) - .await; - // TODO: timeout - match recv.await { - Ok(Ok(())) => info!("roster request sent"), - Ok(Err(e)) => { - // TODO: log errors if fail to send - let _ = result_sender.send(Err(RosterError::Write(e.into()))); - return; - } - Err(e) => { - let _ = - result_sender.send(Err(RosterError::Write(WriteError::Actor(e.into())))); - return; - } + let result = handle_online_result(&logic, command, connection).await; + match result { + Ok(_) => {} + Err(e) => logic.handle_error(e).await, + } +} + +pub async fn handle_get_roster( + logic: &ClientLogic, + connection: Connected, +) -> Result<Vec<Contact>, RosterError> { + let iq_id = Uuid::new_v4().to_string(); + let stanza = Stanza::Iq(Iq { + from: Some(connection.jid().clone()), + id: iq_id.to_string(), + to: None, + r#type: IqType::Get, + lang: None, + query: Some(iq::Query::Roster(stanza::roster::Query { + ver: None, + items: Vec::new(), + })), + errors: Vec::new(), + }); + let response = logic + .pending() + .request(&connection, stanza, iq_id.clone()) + .await?; + // TODO: timeout + match response { + Stanza::Iq(Iq { + from: _, + id, + to: _, + r#type, + lang: _, + query: Some(iq::Query::Roster(stanza::roster::Query { ver: _, items })), + errors: _, + }) if id == iq_id && r#type == IqType::Result => { + let contacts: Vec<Contact> = items.into_iter().map(|item| item.into()).collect(); + if let Err(e) = logic.db().replace_cached_roster(contacts.clone()).await { + logic + .handle_error(Error::Roster(RosterError::Cache(e.into()))) + .await; }; - // TODO: timeout - match iq_recv.await { - Ok(Ok(stanza)) => match stanza { - Stanza::Iq(Iq { - from: _, - id, - to: _, - r#type, - lang: _, - query: Some(iq::Query::Roster(stanza::roster::Query { ver: _, items })), - errors: _, - }) if id == iq_id && r#type == IqType::Result => { - let contacts: Vec<Contact> = - items.into_iter().map(|item| item.into()).collect(); - if let Err(e) = logic.db().replace_cached_roster(contacts.clone()).await { - logic - .handle_error(Error::Roster(RosterError::Cache(e.into()))) - .await; - }; - result_sender.send(Ok(contacts)); - return; - } - ref s @ Stanza::Iq(Iq { - from: _, - ref id, - to: _, - r#type, - lang: _, - query: _, - ref errors, - }) if *id == iq_id && r#type == IqType::Error => { - if let Some(error) = errors.first() { - result_sender.send(Err(RosterError::StanzaError(error.clone()))); - } else { - result_sender.send(Err(RosterError::UnexpectedStanza(s.clone()))); - } - return; - } - s => { - result_sender.send(Err(RosterError::UnexpectedStanza(s))); - return; - } - }, - Ok(Err(e)) => { - result_sender.send(Err(RosterError::Read(e))); - return; - } - Err(e) => { - result_sender.send(Err(RosterError::Write(WriteError::Actor(e.into())))); - return; - } + Ok(contacts) + } + ref s @ Stanza::Iq(Iq { + from: _, + ref id, + to: _, + r#type, + lang: _, + query: _, + ref errors, + }) if *id == iq_id && r#type == IqType::Error => { + if let Some(error) = errors.first() { + Err(RosterError::StanzaError(error.clone())) + } else { + Err(RosterError::UnexpectedStanza(s.clone())) + } + } + s => Err(RosterError::UnexpectedStanza(s)), + } +} + +pub async fn handle_add_contact( + logic: &ClientLogic, + connection: Connected, + jid: JID, +) -> Result<(), RosterError> { + let iq_id = Uuid::new_v4().to_string(); + let set_stanza = Stanza::Iq(Iq { + from: Some(connection.jid().clone()), + id: iq_id.clone(), + to: None, + r#type: IqType::Set, + lang: None, + query: Some(iq::Query::Roster(stanza::roster::Query { + ver: None, + items: vec![stanza::roster::Item { + approved: None, + ask: false, + jid, + name: None, + subscription: None, + groups: Vec::new(), + }], + })), + errors: Vec::new(), + }); + let response = logic + .pending() + .request(&connection, set_stanza, iq_id.clone()) + .await?; + match response { + Stanza::Iq(Iq { + from: _, + id, + to: _, + r#type, + lang: _, + query: _, + errors: _, + }) if id == iq_id && r#type == IqType::Result => Ok(()), + ref s @ Stanza::Iq(Iq { + from: _, + ref id, + to: _, + r#type, + lang: _, + query: _, + ref errors, + }) if *id == iq_id && r#type == IqType::Error => { + if let Some(error) = errors.first() { + Err(RosterError::StanzaError(error.clone())) + } else { + Err(RosterError::UnexpectedStanza(s.clone())) + } + } + s => Err(RosterError::UnexpectedStanza(s)), + } +} + +pub async fn handle_buddy_request(connection: Connected, jid: JID) -> Result<(), WriteError> { + let presence = Stanza::Presence(stanza::client::presence::Presence { + from: None, + id: None, + to: Some(jid.clone()), + r#type: Some(stanza::client::presence::PresenceType::Subscribe), + lang: None, + show: None, + status: None, + priority: None, + errors: Vec::new(), + delay: None, + }); + connection.write_handle().write(presence).await?; + let presence = Stanza::Presence(stanza::client::presence::Presence { + from: None, + id: None, + to: Some(jid), + r#type: Some(stanza::client::presence::PresenceType::Subscribed), + lang: None, + show: None, + status: None, + priority: None, + errors: Vec::new(), + delay: None, + }); + connection.write_handle().write(presence).await?; + Ok(()) +} + +pub async fn handle_subscription_request( + connection: Connected, + jid: JID, +) -> Result<(), WriteError> { + // TODO: i should probably have builders + let presence = Stanza::Presence(stanza::client::presence::Presence { + from: None, + id: None, + to: Some(jid), + r#type: Some(stanza::client::presence::PresenceType::Subscribe), + lang: None, + show: None, + status: None, + priority: None, + errors: Vec::new(), + delay: None, + }); + connection.write_handle().write(presence).await?; + Ok(()) +} + +pub async fn handle_accept_buddy_request( + connection: Connected, + jid: JID, +) -> Result<(), WriteError> { + let presence = Stanza::Presence(stanza::client::presence::Presence { + from: None, + id: None, + to: Some(jid.clone()), + r#type: Some(stanza::client::presence::PresenceType::Subscribed), + lang: None, + show: None, + status: None, + priority: None, + errors: Vec::new(), + delay: None, + }); + connection.write_handle().write(presence).await?; + let presence = Stanza::Presence(stanza::client::presence::Presence { + from: None, + id: None, + to: Some(jid), + r#type: Some(stanza::client::presence::PresenceType::Subscribe), + lang: None, + show: None, + status: None, + priority: None, + errors: Vec::new(), + delay: None, + }); + connection.write_handle().write(presence).await?; + Ok(()) +} + +pub async fn handle_accept_subscription_request( + connection: Connected, + jid: JID, +) -> Result<(), WriteError> { + let presence = Stanza::Presence(stanza::client::presence::Presence { + from: None, + id: None, + to: Some(jid), + r#type: Some(stanza::client::presence::PresenceType::Subscribe), + lang: None, + show: None, + status: None, + priority: None, + errors: Vec::new(), + delay: None, + }); + connection.write_handle().write(presence).await?; + Ok(()) +} + +pub async fn handle_unsubscribe_from_contact( + connection: Connected, + jid: JID, +) -> Result<(), WriteError> { + let presence = Stanza::Presence(stanza::client::presence::Presence { + from: None, + id: None, + to: Some(jid), + r#type: Some(stanza::client::presence::PresenceType::Unsubscribe), + lang: None, + show: None, + status: None, + priority: None, + errors: Vec::new(), + delay: None, + }); + connection.write_handle().write(presence).await?; + Ok(()) +} + +pub async fn handle_unsubscribe_contact(connection: Connected, jid: JID) -> Result<(), WriteError> { + let presence = Stanza::Presence(stanza::client::presence::Presence { + from: None, + id: None, + to: Some(jid), + r#type: Some(stanza::client::presence::PresenceType::Unsubscribed), + lang: None, + show: None, + status: None, + priority: None, + errors: Vec::new(), + delay: None, + }); + connection.write_handle().write(presence).await?; + Ok(()) +} + +pub async fn handle_unfriend_contact(connection: Connected, jid: JID) -> Result<(), WriteError> { + let presence = Stanza::Presence(stanza::client::presence::Presence { + from: None, + id: None, + to: Some(jid.clone()), + r#type: Some(stanza::client::presence::PresenceType::Unsubscribe), + lang: None, + show: None, + status: None, + priority: None, + errors: Vec::new(), + delay: None, + }); + connection.write_handle().write(presence).await?; + let presence = Stanza::Presence(stanza::client::presence::Presence { + from: None, + id: None, + to: Some(jid), + r#type: Some(stanza::client::presence::PresenceType::Unsubscribed), + lang: None, + show: None, + status: None, + priority: None, + errors: Vec::new(), + delay: None, + }); + connection.write_handle().write(presence).await?; + Ok(()) +} + +pub async fn handle_delete_contact( + logic: &ClientLogic, + connection: Connected, + jid: JID, +) -> Result<(), RosterError> { + let iq_id = Uuid::new_v4().to_string(); + let set_stanza = Stanza::Iq(Iq { + from: Some(connection.jid().clone()), + id: iq_id.clone(), + to: None, + r#type: IqType::Set, + lang: None, + query: Some(iq::Query::Roster(stanza::roster::Query { + ver: None, + items: vec![stanza::roster::Item { + approved: None, + ask: false, + jid, + name: None, + subscription: Some(stanza::roster::Subscription::Remove), + groups: Vec::new(), + }], + })), + errors: Vec::new(), + }); + let result = logic + .pending() + .request(&connection, set_stanza, iq_id.clone()) + .await?; + match result { + Stanza::Iq(Iq { + from: _, + id, + to: _, + r#type, + lang: _, + query: _, + errors: _, + // don't really need to check matching id as request() does this anyway + }) if id == iq_id && r#type == IqType::Result => Ok(()), + ref s @ Stanza::Iq(Iq { + from: _, + ref id, + to: _, + r#type, + lang: _, + query: _, + ref errors, + }) if *id == iq_id && r#type == IqType::Error => { + if let Some(error) = errors.first() { + Err(RosterError::StanzaError(error.clone())) + } else { + Err(RosterError::UnexpectedStanza(s.clone())) + } + } + s => Err(RosterError::UnexpectedStanza(s)), + } +} + +pub async fn handle_update_contact( + logic: &ClientLogic, + connection: Connected, + jid: JID, + contact_update: ContactUpdate, +) -> Result<(), RosterError> { + let iq_id = Uuid::new_v4().to_string(); + let groups = Vec::from_iter( + contact_update + .groups + .into_iter() + .map(|group| stanza::roster::Group(Some(group))), + ); + let set_stanza = Stanza::Iq(Iq { + from: Some(connection.jid().clone()), + id: iq_id.clone(), + to: None, + r#type: IqType::Set, + lang: None, + query: Some(iq::Query::Roster(stanza::roster::Query { + ver: None, + items: vec![stanza::roster::Item { + approved: None, + ask: false, + jid, + name: contact_update.name, + subscription: None, + groups, + }], + })), + errors: Vec::new(), + }); + let response = logic + .pending() + .request(&connection, set_stanza, iq_id.clone()) + .await?; + match response { + Stanza::Iq(Iq { + from: _, + id, + to: _, + r#type, + lang: _, + query: _, + errors: _, + }) if id == iq_id && r#type == IqType::Result => Ok(()), + ref s @ Stanza::Iq(Iq { + from: _, + ref id, + to: _, + r#type, + lang: _, + query: _, + ref errors, + }) if *id == iq_id && r#type == IqType::Error => { + if let Some(error) = errors.first() { + Err(RosterError::StanzaError(error.clone())) + } else { + Err(RosterError::UnexpectedStanza(s.clone())) } } + s => Err(RosterError::UnexpectedStanza(s)), + } +} + +pub async fn handle_set_status( + logic: &ClientLogic, + connection: Connected, + online: Online, +) -> Result<(), StatusError> { + logic + .db() + .upsert_cached_status(online.clone()) + .await + .map_err(|e| DatabaseError(e.into()))?; + connection + .write_handle() + .write(Stanza::Presence(online.into_stanza(None))) + .await?; + Ok(()) +} + +pub async fn handle_send_message( + logic: &ClientLogic, + connection: Connected, + jid: JID, + body: Body, +) -> Result<(), WriteError> { + let id = Uuid::new_v4(); + let timestamp = Utc::now(); + let message = Stanza::Message(stanza::client::message::Message { + from: Some(connection.jid().clone()), + id: Some(id.to_string()), + to: Some(jid.clone()), + // TODO: specify message type + r#type: stanza::client::message::MessageType::Chat, + // TODO: lang ? + lang: None, + subject: None, + body: Some(stanza::client::message::Body { + lang: None, + body: Some(body.body.clone()), + }), + thread: None, + // include delay to have a consistent timestamp between server and client + delay: Some(Delay { + from: None, + stamp: timestamp, + }), + }); + connection.write_handle().write(message).await?; + let mut message = Message { + id, + from: connection.jid().clone(), + body, + timestamp, + }; + info!("sent message: {:?}", message); + if let Err(e) = logic + .db() + .create_message_with_self_resource_and_chat(message.clone(), jid.clone()) + .await + { + // TODO: should these really be handle_error or just the error macro? + logic + .handle_error(MessageSendError::MessageHistory(e.into()).into()) + .await; + } + // TODO: don't do this, have separate from from details + message.from = message.from.as_bare(); + let _ = logic + .update_sender() + .send(UpdateMessage::Message { to: jid, message }) + .await; + Ok(()) + // TODO: refactor this to send a sending updatemessage, then update or something like that +} + +pub async fn handle_send_presence( + connection: Connected, + jid: Option<JID>, + presence: PresenceType, +) -> Result<(), WriteError> { + let mut presence: stanza::client::presence::Presence = presence.into(); + presence.to = jid; + connection + .write_handle() + .write(Stanza::Presence(presence)) + .await?; + Ok(()) +} + +// TODO: could probably macro-ise? +pub async fn handle_online_result( + logic: &ClientLogic, + command: Command, + connection: Connected, +) -> Result<(), Error> { + match command { + Command::GetRoster(result_sender) => { + let roster = handle_get_roster(logic, connection).await; + let _ = result_sender.send(roster); + } Command::GetChats(sender) => { - let chats = logic.db().read_chats().await.map_err(|e| e.into()); - sender.send(chats); + let chats = handle_get_chats(logic).await; + let _ = sender.send(chats); } Command::GetChatsOrdered(sender) => { - let chats = logic.db().read_chats_ordered().await.map_err(|e| e.into()); - sender.send(chats); + let chats = handle_get_chats_ordered(logic).await; + let _ = sender.send(chats); } Command::GetChatsOrderedWithLatestMessages(sender) => { - let chats = logic - .db() - .read_chats_ordered_with_latest_messages() - .await - .map_err(|e| e.into()); - sender.send(chats); + let chats = handle_get_chats_ordered_with_latest_messages(logic).await; + let _ = sender.send(chats); } Command::GetChat(jid, sender) => { - let chats = logic.db().read_chat(jid).await.map_err(|e| e.into()); - sender.send(chats); + let chat = handle_get_chat(logic, jid).await; + let _ = sender.send(chat); } Command::GetMessages(jid, sender) => { - let messages = logic - .db() - .read_message_history(jid) - .await - .map_err(|e| e.into()); - sender.send(messages); + let messages = handle_get_messages(logic, jid).await; + let _ = sender.send(messages); } Command::DeleteChat(jid, sender) => { - let result = logic.db().delete_chat(jid).await.map_err(|e| e.into()); - sender.send(result); + let result = handle_delete_chat(logic, jid).await; + let _ = sender.send(result); } Command::DeleteMessage(uuid, sender) => { - let result = logic.db().delete_message(uuid).await.map_err(|e| e.into()); - sender.send(result); + let result = handle_delete_messaage(logic, uuid).await; + let _ = sender.send(result); } Command::GetUser(jid, sender) => { - let user = logic.db().read_user(jid).await.map_err(|e| e.into()); - sender.send(user); + let user = handle_get_user(logic, jid).await; + let _ = sender.send(user); } // TODO: offline queue to modify roster Command::AddContact(jid, sender) => { - let iq_id = Uuid::new_v4().to_string(); - let set_stanza = Stanza::Iq(Iq { - from: Some(connection.jid().clone()), - id: iq_id.clone(), - to: None, - r#type: IqType::Set, - lang: None, - query: Some(iq::Query::Roster(stanza::roster::Query { - ver: None, - items: vec![stanza::roster::Item { - approved: None, - ask: false, - jid, - name: None, - subscription: None, - groups: Vec::new(), - }], - })), - errors: Vec::new(), - }); - let (send, recv) = oneshot::channel(); - { - logic.pending().lock().await.insert(iq_id.clone(), send); - } - // TODO: write_handle send helper function - let result = connection.write_handle().write(set_stanza).await; - if let Err(e) = result { - sender.send(Err(RosterError::Write(e))); - return; - } - let iq_result = recv.await; - match iq_result { - Ok(i) => match i { - Ok(iq_result) => match iq_result { - Stanza::Iq(Iq { - from: _, - id, - to: _, - r#type, - lang: _, - query: _, - errors: _, - }) if id == iq_id && r#type == IqType::Result => { - sender.send(Ok(())); - return; - } - ref s @ Stanza::Iq(Iq { - from: _, - ref id, - to: _, - r#type, - lang: _, - query: _, - ref errors, - }) if *id == iq_id && r#type == IqType::Error => { - if let Some(error) = errors.first() { - sender.send(Err(RosterError::StanzaError(error.clone()))); - } else { - sender.send(Err(RosterError::UnexpectedStanza(s.clone()))); - } - return; - } - s => { - sender.send(Err(RosterError::UnexpectedStanza(s))); - return; - } - }, - Err(e) => { - sender.send(Err(e.into())); - return; - } - }, - Err(e) => { - sender.send(Err(RosterError::Write(WriteError::Actor(e.into())))); - return; - } - } + let result = handle_add_contact(logic, connection, jid).await; + let _ = sender.send(result); } Command::BuddyRequest(jid, sender) => { - let presence = Stanza::Presence(stanza::client::presence::Presence { - from: None, - id: None, - to: Some(jid.clone()), - r#type: Some(stanza::client::presence::PresenceType::Subscribe), - lang: None, - show: None, - status: None, - priority: None, - errors: Vec::new(), - delay: None, - }); - let result = connection.write_handle().write(presence).await; - match result { - Err(_) => { - let _ = sender.send(result); - } - Ok(()) => { - let presence = Stanza::Presence(stanza::client::presence::Presence { - from: None, - id: None, - to: Some(jid), - r#type: Some(stanza::client::presence::PresenceType::Subscribed), - lang: None, - show: None, - status: None, - priority: None, - errors: Vec::new(), - delay: None, - }); - let result = connection.write_handle().write(presence).await; - let _ = sender.send(result); - } - } + let result = handle_buddy_request(connection, jid).await; + let _ = sender.send(result); } Command::SubscriptionRequest(jid, sender) => { - // TODO: i should probably have builders - let presence = Stanza::Presence(stanza::client::presence::Presence { - from: None, - id: None, - to: Some(jid), - r#type: Some(stanza::client::presence::PresenceType::Subscribe), - lang: None, - show: None, - status: None, - priority: None, - errors: Vec::new(), - delay: None, - }); - let result = connection.write_handle().write(presence).await; + let result = handle_subscription_request(connection, jid).await; let _ = sender.send(result); } Command::AcceptBuddyRequest(jid, sender) => { - let presence = Stanza::Presence(stanza::client::presence::Presence { - from: None, - id: None, - to: Some(jid.clone()), - r#type: Some(stanza::client::presence::PresenceType::Subscribed), - lang: None, - show: None, - status: None, - priority: None, - errors: Vec::new(), - delay: None, - }); - let result = connection.write_handle().write(presence).await; - match result { - Err(_) => { - let _ = sender.send(result); - } - Ok(()) => { - let presence = Stanza::Presence(stanza::client::presence::Presence { - from: None, - id: None, - to: Some(jid), - r#type: Some(stanza::client::presence::PresenceType::Subscribe), - lang: None, - show: None, - status: None, - priority: None, - errors: Vec::new(), - delay: None, - }); - let result = connection.write_handle().write(presence).await; - let _ = sender.send(result); - } - } + let result = handle_accept_buddy_request(connection, jid).await; + let _ = sender.send(result); } Command::AcceptSubscriptionRequest(jid, sender) => { - let presence = Stanza::Presence(stanza::client::presence::Presence { - from: None, - id: None, - to: Some(jid), - r#type: Some(stanza::client::presence::PresenceType::Subscribe), - lang: None, - show: None, - status: None, - priority: None, - errors: Vec::new(), - delay: None, - }); - let result = connection.write_handle().write(presence).await; + let result = handle_accept_subscription_request(connection, jid).await; let _ = sender.send(result); } Command::UnsubscribeFromContact(jid, sender) => { - let presence = Stanza::Presence(stanza::client::presence::Presence { - from: None, - id: None, - to: Some(jid), - r#type: Some(stanza::client::presence::PresenceType::Unsubscribe), - lang: None, - show: None, - status: None, - priority: None, - errors: Vec::new(), - delay: None, - }); - let result = connection.write_handle().write(presence).await; + let result = handle_unsubscribe_from_contact(connection, jid).await; let _ = sender.send(result); } Command::UnsubscribeContact(jid, sender) => { - let presence = Stanza::Presence(stanza::client::presence::Presence { - from: None, - id: None, - to: Some(jid), - r#type: Some(stanza::client::presence::PresenceType::Unsubscribed), - lang: None, - show: None, - status: None, - priority: None, - errors: Vec::new(), - delay: None, - }); - let result = connection.write_handle().write(presence).await; + let result = handle_unsubscribe_contact(connection, jid).await; let _ = sender.send(result); } Command::UnfriendContact(jid, sender) => { - let presence = Stanza::Presence(stanza::client::presence::Presence { - from: None, - id: None, - to: Some(jid.clone()), - r#type: Some(stanza::client::presence::PresenceType::Unsubscribe), - lang: None, - show: None, - status: None, - priority: None, - errors: Vec::new(), - delay: None, - }); - let result = connection.write_handle().write(presence).await; - match result { - Err(_) => { - let _ = sender.send(result); - } - Ok(()) => { - let presence = Stanza::Presence(stanza::client::presence::Presence { - from: None, - id: None, - to: Some(jid), - r#type: Some(stanza::client::presence::PresenceType::Unsubscribed), - lang: None, - show: None, - status: None, - priority: None, - errors: Vec::new(), - delay: None, - }); - let result = connection.write_handle().write(presence).await; - let _ = sender.send(result); - } - } + let result = handle_unfriend_contact(connection, jid).await; + let _ = sender.send(result); } Command::DeleteContact(jid, sender) => { - let iq_id = Uuid::new_v4().to_string(); - let set_stanza = Stanza::Iq(Iq { - from: Some(connection.jid().clone()), - id: iq_id.clone(), - to: None, - r#type: IqType::Set, - lang: None, - query: Some(iq::Query::Roster(stanza::roster::Query { - ver: None, - items: vec![stanza::roster::Item { - approved: None, - ask: false, - jid, - name: None, - subscription: Some(stanza::roster::Subscription::Remove), - groups: Vec::new(), - }], - })), - errors: Vec::new(), - }); - let (send, recv) = oneshot::channel(); - { - logic.pending().lock().await.insert(iq_id.clone(), send); - } - let result = connection.write_handle().write(set_stanza).await; - if let Err(e) = result { - sender.send(Err(RosterError::Write(e))); - return; - } - let iq_result = recv.await; - match iq_result { - Ok(i) => match i { - Ok(iq_result) => match iq_result { - Stanza::Iq(Iq { - from: _, - id, - to: _, - r#type, - lang: _, - query: _, - errors: _, - }) if id == iq_id && r#type == IqType::Result => { - sender.send(Ok(())); - return; - } - ref s @ Stanza::Iq(Iq { - from: _, - ref id, - to: _, - r#type, - lang: _, - query: _, - ref errors, - }) if *id == iq_id && r#type == IqType::Error => { - if let Some(error) = errors.first() { - sender.send(Err(RosterError::StanzaError(error.clone()))); - } else { - sender.send(Err(RosterError::UnexpectedStanza(s.clone()))); - } - return; - } - s => { - sender.send(Err(RosterError::UnexpectedStanza(s))); - return; - } - }, - Err(e) => { - sender.send(Err(e.into())); - return; - } - }, - Err(e) => { - sender.send(Err(RosterError::Write(WriteError::Actor(e.into())))); - return; - } - } + let result = handle_delete_contact(logic, connection, jid).await; + let _ = sender.send(result); } Command::UpdateContact(jid, contact_update, sender) => { - let iq_id = Uuid::new_v4().to_string(); - let groups = Vec::from_iter( - contact_update - .groups - .into_iter() - .map(|group| stanza::roster::Group(Some(group))), - ); - let set_stanza = Stanza::Iq(Iq { - from: Some(connection.jid().clone()), - id: iq_id.clone(), - to: None, - r#type: IqType::Set, - lang: None, - query: Some(iq::Query::Roster(stanza::roster::Query { - ver: None, - items: vec![stanza::roster::Item { - approved: None, - ask: false, - jid, - name: contact_update.name, - subscription: None, - groups, - }], - })), - errors: Vec::new(), - }); - let (send, recv) = oneshot::channel(); - { - logic.pending().lock().await.insert(iq_id.clone(), send); - } - let result = connection.write_handle().write(set_stanza).await; - if let Err(e) = result { - sender.send(Err(RosterError::Write(e))); - return; - } - let iq_result = recv.await; - match iq_result { - Ok(i) => match i { - Ok(iq_result) => match iq_result { - Stanza::Iq(Iq { - from: _, - id, - to: _, - r#type, - lang: _, - query: _, - errors: _, - }) if id == iq_id && r#type == IqType::Result => { - sender.send(Ok(())); - return; - } - ref s @ Stanza::Iq(Iq { - from: _, - ref id, - to: _, - r#type, - lang: _, - query: _, - ref errors, - }) if *id == iq_id && r#type == IqType::Error => { - if let Some(error) = errors.first() { - sender.send(Err(RosterError::StanzaError(error.clone()))); - } else { - sender.send(Err(RosterError::UnexpectedStanza(s.clone()))); - } - return; - } - s => { - sender.send(Err(RosterError::UnexpectedStanza(s))); - return; - } - }, - Err(e) => { - sender.send(Err(e.into())); - return; - } - }, - Err(e) => { - sender.send(Err(RosterError::Write(WriteError::Actor(e.into())))); - return; - } - } + let result = handle_update_contact(logic, connection, jid, contact_update).await; + let _ = sender.send(result); } Command::SetStatus(online, sender) => { - let result = logic.db().upsert_cached_status(online.clone()).await; - if let Err(e) = result { - logic - .handle_error(StatusError::Cache(e.into()).into()) - .await; - } - let result = connection - .write_handle() - .write(Stanza::Presence(online.into_stanza(None))) - .await - .map_err(|e| StatusError::Write(e)); - // .map_err(|e| StatusError::Write(e)); + let result = handle_set_status(logic, connection, online).await; let _ = sender.send(result); } // TODO: offline message queue Command::SendMessage(jid, body, sender) => { - let id = Uuid::new_v4(); - let message = Stanza::Message(stanza::client::message::Message { - from: Some(connection.jid().clone()), - id: Some(id.to_string()), - to: Some(jid.clone()), - // TODO: specify message type - r#type: stanza::client::message::MessageType::Chat, - // TODO: lang ? - lang: None, - subject: None, - body: Some(stanza::client::message::Body { - lang: None, - body: Some(body.body.clone()), - }), - thread: None, - delay: None, - }); - let _ = sender.send(Ok(())); - // let _ = sender.send(Ok(message.clone())); - let result = connection.write_handle().write(message).await; - match result { - Ok(_) => { - let mut message = Message { - id, - from: connection.jid().clone(), - body, - timestamp: Utc::now(), - }; - info!("send message {:?}", message); - if let Err(e) = logic - .db() - .create_message_with_self_resource_and_chat(message.clone(), jid.clone()) - .await - { - logic - .handle_error(MessageSendError::MessageHistory(e.into()).into()) - .await; - } - // TODO: don't do this, have separate from from details - message.from = message.from.as_bare(); - let _ = logic - .update_sender() - .send(UpdateMessage::Message { to: jid, message }) - .await; - } - Err(_) => { - // let _ = sender.send(result); - } - } + let result = handle_send_message(logic, connection, jid, body).await; + let _ = sender.send(result); } Command::SendPresence(jid, presence, sender) => { - let mut presence: stanza::client::presence::Presence = presence.into(); - if let Some(jid) = jid { - presence.to = Some(jid); - }; - let result = connection - .write_handle() - .write(Stanza::Presence(presence)) - .await; - // .map_err(|e| StatusError::Write(e)); + let result = handle_send_presence(connection, jid, presence).await; let _ = sender.send(result); } } + Ok(()) } diff --git a/filamento/src/logic/process_stanza.rs b/filamento/src/logic/process_stanza.rs index 94257aa..660da16 100644 --- a/filamento/src/logic/process_stanza.rs +++ b/filamento/src/logic/process_stanza.rs @@ -169,21 +169,14 @@ pub async fn recv_iq( } match iq.r#type { stanza::client::iq::IqType::Error | stanza::client::iq::IqType::Result => { - let send; - { - send = logic.pending().lock().await.remove(&iq.id); - } let from = iq .from .clone() .unwrap_or_else(|| connection.server().clone()); - if let Some(send) = send { - debug!("received iq result from {}", from); - let _ = send.send(Ok(Stanza::Iq(iq))); - Ok(None) - } else { - Err(IqError::NoMatchingId(iq.id)) - } + let id = iq.id.clone(); + debug!("received iq result with id `{}` from {}", id, from); + logic.pending().respond(Stanza::Iq(iq), id).await?; + Ok(None) } stanza::client::iq::IqType::Get => { let from = iq |