aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorLibravatar cel 🌸 <cel@bunny.garden>2025-03-27 19:09:35 +0000
committerLibravatar cel 🌸 <cel@bunny.garden>2025-03-27 19:09:35 +0000
commita367aca33fecc03270b5b9ad2a6a21281d760fd8 (patch)
tree695270823ee5d55f483875580c509fb2c300bd26
parent83a6aa0574190137b38331bd53795324139237cf (diff)
downloadluz-a367aca33fecc03270b5b9ad2a6a21281d760fd8.tar.gz
luz-a367aca33fecc03270b5b9ad2a6a21281d760fd8.tar.bz2
luz-a367aca33fecc03270b5b9ad2a6a21281d760fd8.zip
refactor(filamento): handle_online logic
-rw-r--r--filamento/examples/example.rs2
-rw-r--r--filamento/src/error.rs20
-rw-r--r--filamento/src/lib.rs7
-rw-r--r--filamento/src/logic/abort.rs5
-rw-r--r--filamento/src/logic/connect.rs3
-rw-r--r--filamento/src/logic/local_only.rs47
-rw-r--r--filamento/src/logic/mod.rs69
-rw-r--r--filamento/src/logic/online.rs1152
-rw-r--r--filamento/src/logic/process_stanza.rs15
9 files changed, 701 insertions, 619 deletions
diff --git a/filamento/examples/example.rs b/filamento/examples/example.rs
index e36968d..b1ab6ce 100644
--- a/filamento/examples/example.rs
+++ b/filamento/examples/example.rs
@@ -20,7 +20,7 @@ async fn main() {
});
client.connect().await.unwrap();
- tokio::time::sleep(Duration::from_secs(5)).await;
+ tokio::time::sleep(Duration::from_secs(15)).await;
info!("sending message");
client
.send_message(
diff --git a/filamento/src/error.rs b/filamento/src/error.rs
index 6277292..ccb4406 100644
--- a/filamento/src/error.rs
+++ b/filamento/src/error.rs
@@ -73,6 +73,8 @@ pub enum ConnectionJobError {
pub enum RosterError {
#[error("cache: {0}")]
Cache(#[from] DatabaseError),
+ #[error("iq response: {0}")]
+ IqResponse(#[from] RequestError),
#[error("stream write: {0}")]
Write(#[from] WriteError),
// TODO: display for stanza, to show as xml, same for read error types.
@@ -87,6 +89,20 @@ pub enum RosterError {
}
#[derive(Debug, Error, Clone)]
+pub enum RequestError {
+ #[error("sending request: {0}")]
+ Write(#[from] WriteError),
+ #[error("receiving expected response: {0}")]
+ Read(#[from] ReadError),
+}
+
+#[derive(Debug, Error, Clone)]
+pub enum ResponseError {
+ #[error("no matching id: {0}")]
+ NoMatchingId(String),
+}
+
+#[derive(Debug, Error, Clone)]
#[error("database error: {0}")]
pub struct DatabaseError(pub Arc<sqlx::Error>);
@@ -107,8 +123,8 @@ impl From<sqlx::Error> for DatabaseOpenError {
pub enum IqError {
#[error("writing response: {0}")]
WriteError(#[from] WriteError),
- #[error("no iq with id matching `{0}`")]
- NoMatchingId(String),
+ #[error("receiving response: `{0}`")]
+ ReceivedResponse(#[from] ResponseError),
#[error("incorrect addressee: {0}")]
IncorrectAddressee(jid::JID),
}
diff --git a/filamento/src/lib.rs b/filamento/src/lib.rs
index 1e9207c..89da1a3 100644
--- a/filamento/src/lib.rs
+++ b/filamento/src/lib.rs
@@ -178,12 +178,7 @@ impl Client {
timeout: Duration::from_secs(10),
};
- let logic = ClientLogic::new(
- client.clone(),
- db,
- Arc::new(Mutex::new(HashMap::new())),
- update_send,
- );
+ let logic = ClientLogic::new(client.clone(), db, update_send);
let actor: CoreClient<ClientLogic> =
CoreClient::new(jid, password, command_receiver, None, sup_recv, logic);
diff --git a/filamento/src/logic/abort.rs b/filamento/src/logic/abort.rs
index 32c4823..df82655 100644
--- a/filamento/src/logic/abort.rs
+++ b/filamento/src/logic/abort.rs
@@ -3,8 +3,5 @@ use lampada::error::ReadError;
use super::ClientLogic;
pub async fn on_abort(logic: ClientLogic) {
- let mut iqs = logic.pending().lock().await;
- for (_id, sender) in iqs.drain() {
- let _ = sender.send(Err(ReadError::LostConnection));
- }
+ logic.pending().drain().await;
}
diff --git a/filamento/src/logic/connect.rs b/filamento/src/logic/connect.rs
index d7b9fee..dc05448 100644
--- a/filamento/src/logic/connect.rs
+++ b/filamento/src/logic/connect.rs
@@ -19,7 +19,7 @@ pub async fn handle_connect(logic: ClientLogic, connection: Connected) {
.await;
debug!("sent roster req");
let roster = recv.await;
- debug!("got roster");
+ debug!("got roster: {:?}", roster);
match roster {
Ok(r) => match r {
Ok(roster) => {
@@ -42,6 +42,7 @@ pub async fn handle_connect(logic: ClientLogic, connection: Connected) {
)
.await;
let set_status = recv.await;
+ debug!("sent initial presence");
match set_status {
Ok(s) => match s {
Ok(()) => {
diff --git a/filamento/src/logic/local_only.rs b/filamento/src/logic/local_only.rs
new file mode 100644
index 0000000..3f6fe8d
--- /dev/null
+++ b/filamento/src/logic/local_only.rs
@@ -0,0 +1,47 @@
+use jid::JID;
+use uuid::Uuid;
+
+use crate::{
+ chat::{Chat, Message},
+ error::DatabaseError,
+ user::User,
+};
+
+use super::ClientLogic;
+
+pub async fn handle_get_chats(logic: &ClientLogic) -> Result<Vec<Chat>, DatabaseError> {
+ Ok(logic.db().read_chats().await?)
+}
+
+pub async fn handle_get_chats_ordered(logic: &ClientLogic) -> Result<Vec<Chat>, DatabaseError> {
+ Ok(logic.db().read_chats_ordered().await?)
+}
+
+pub async fn handle_get_chats_ordered_with_latest_messages(
+ logic: &ClientLogic,
+) -> Result<Vec<(Chat, Message)>, DatabaseError> {
+ Ok(logic.db().read_chats_ordered_with_latest_messages().await?)
+}
+
+pub async fn handle_get_chat(logic: &ClientLogic, jid: JID) -> Result<Chat, DatabaseError> {
+ Ok(logic.db().read_chat(jid).await?)
+}
+
+pub async fn handle_get_messages(
+ logic: &ClientLogic,
+ jid: JID,
+) -> Result<Vec<Message>, DatabaseError> {
+ Ok(logic.db().read_message_history(jid).await?)
+}
+
+pub async fn handle_delete_chat(logic: &ClientLogic, jid: JID) -> Result<(), DatabaseError> {
+ Ok(logic.db().delete_chat(jid).await?)
+}
+
+pub async fn handle_delete_messaage(logic: &ClientLogic, uuid: Uuid) -> Result<(), DatabaseError> {
+ Ok(logic.db().delete_message(uuid).await?)
+}
+
+pub async fn handle_get_user(logic: &ClientLogic, jid: JID) -> Result<User, DatabaseError> {
+ Ok(logic.db().read_user(jid).await?)
+}
diff --git a/filamento/src/logic/mod.rs b/filamento/src/logic/mod.rs
index 61d78bf..15c2d12 100644
--- a/filamento/src/logic/mod.rs
+++ b/filamento/src/logic/mod.rs
@@ -1,16 +1,21 @@
use std::{collections::HashMap, sync::Arc};
-use lampada::{Logic, error::ReadError};
+use lampada::{Connected, Logic, error::ReadError};
use stanza::client::Stanza;
use tokio::sync::{Mutex, mpsc, oneshot};
use tracing::{error, info, warn};
-use crate::{Client, Command, UpdateMessage, db::Db, error::Error};
+use crate::{
+ Client, Command, UpdateMessage,
+ db::Db,
+ error::{Error, RequestError, ResponseError},
+};
mod abort;
mod connect;
mod connection_error;
mod disconnect;
+mod local_only;
mod offline;
mod online;
mod process_stanza;
@@ -19,20 +24,60 @@ mod process_stanza;
pub struct ClientLogic {
client: Client,
db: Db,
- pending: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>,
+ pending: Pending,
update_sender: mpsc::Sender<UpdateMessage>,
}
+#[derive(Clone)]
+pub struct Pending(Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>);
+
+impl Pending {
+ pub fn new() -> Self {
+ Self(Arc::new(Mutex::new(HashMap::new())))
+ }
+
+ pub async fn request(
+ &self,
+ connection: &Connected,
+ request: Stanza,
+ id: String,
+ ) -> Result<Stanza, RequestError> {
+ let (send, recv) = oneshot::channel();
+ {
+ self.0.lock().await.insert(id, send);
+ }
+ connection.write_handle().write(request).await?;
+ let stanza = recv.await.map_err(|e| ReadError::Actor(e.into()))??;
+ Ok(stanza)
+ }
+
+ pub async fn respond(&self, response: Stanza, id: String) -> Result<(), ResponseError> {
+ let send;
+ {
+ send = self.0.lock().await.remove(&id);
+ }
+ match send {
+ Some(send) => {
+ let _ = send.send(Ok(response));
+ Ok(())
+ }
+ None => Err(ResponseError::NoMatchingId(id)),
+ }
+ }
+
+ pub async fn drain(&self) {
+ let mut pending = self.0.lock().await;
+ for (_id, sender) in pending.drain() {
+ let _ = sender.send(Err(ReadError::LostConnection));
+ }
+ }
+}
+
impl ClientLogic {
- pub fn new(
- client: Client,
- db: Db,
- pending: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>,
- update_sender: mpsc::Sender<UpdateMessage>,
- ) -> Self {
+ pub fn new(client: Client, db: Db, update_sender: mpsc::Sender<UpdateMessage>) -> Self {
Self {
db,
- pending,
+ pending: Pending::new(),
update_sender,
client,
}
@@ -46,8 +91,8 @@ impl ClientLogic {
&self.db
}
- pub fn pending(&self) -> &Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>> {
- &self.pending.as_ref()
+ pub fn pending(&self) -> &Pending {
+ &self.pending
}
pub fn update_sender(&self) -> &mpsc::Sender<UpdateMessage> {
diff --git a/filamento/src/logic/online.rs b/filamento/src/logic/online.rs
index 967ebb2..8bbeaa5 100644
--- a/filamento/src/logic/online.rs
+++ b/filamento/src/logic/online.rs
@@ -1,8 +1,12 @@
use chrono::Utc;
+use jid::JID;
use lampada::{Connected, WriteMessage, error::WriteError};
-use stanza::client::{
- Stanza,
- iq::{self, Iq, IqType},
+use stanza::{
+ client::{
+ Stanza,
+ iq::{self, Iq, IqType},
+ },
+ xep_0203::Delay,
};
use tokio::sync::oneshot;
use tracing::{debug, info};
@@ -10,641 +14,625 @@ use uuid::Uuid;
use crate::{
Command, UpdateMessage,
- chat::Message,
- error::{Error, MessageSendError, RosterError, StatusError},
- roster::Contact,
+ chat::{Body, Message},
+ error::{DatabaseError, Error, MessageSendError, RosterError, StatusError},
+ presence::{Online, Presence, PresenceType},
+ roster::{Contact, ContactUpdate},
};
-use super::ClientLogic;
+use super::{
+ ClientLogic,
+ local_only::{
+ handle_delete_chat, handle_delete_messaage, handle_get_chat, handle_get_chats,
+ handle_get_chats_ordered, handle_get_chats_ordered_with_latest_messages,
+ handle_get_messages, handle_get_user,
+ },
+};
pub async fn handle_online(logic: ClientLogic, command: Command, connection: Connected) {
- match command {
- Command::GetRoster(result_sender) => {
- let iq_id = Uuid::new_v4().to_string();
- let (send, iq_recv) = oneshot::channel();
- {
- logic.pending().lock().await.insert(iq_id.clone(), send);
- }
- let stanza = Stanza::Iq(Iq {
- from: Some(connection.jid().clone()),
- id: iq_id.to_string(),
- to: None,
- r#type: IqType::Get,
- lang: None,
- query: Some(iq::Query::Roster(stanza::roster::Query {
- ver: None,
- items: Vec::new(),
- })),
- errors: Vec::new(),
- });
- let (send, recv) = oneshot::channel();
- let _ = connection
- .write_handle()
- .send(WriteMessage {
- stanza,
- respond_to: send,
- })
- .await;
- // TODO: timeout
- match recv.await {
- Ok(Ok(())) => info!("roster request sent"),
- Ok(Err(e)) => {
- // TODO: log errors if fail to send
- let _ = result_sender.send(Err(RosterError::Write(e.into())));
- return;
- }
- Err(e) => {
- let _ =
- result_sender.send(Err(RosterError::Write(WriteError::Actor(e.into()))));
- return;
- }
+ let result = handle_online_result(&logic, command, connection).await;
+ match result {
+ Ok(_) => {}
+ Err(e) => logic.handle_error(e).await,
+ }
+}
+
+pub async fn handle_get_roster(
+ logic: &ClientLogic,
+ connection: Connected,
+) -> Result<Vec<Contact>, RosterError> {
+ let iq_id = Uuid::new_v4().to_string();
+ let stanza = Stanza::Iq(Iq {
+ from: Some(connection.jid().clone()),
+ id: iq_id.to_string(),
+ to: None,
+ r#type: IqType::Get,
+ lang: None,
+ query: Some(iq::Query::Roster(stanza::roster::Query {
+ ver: None,
+ items: Vec::new(),
+ })),
+ errors: Vec::new(),
+ });
+ let response = logic
+ .pending()
+ .request(&connection, stanza, iq_id.clone())
+ .await?;
+ // TODO: timeout
+ match response {
+ Stanza::Iq(Iq {
+ from: _,
+ id,
+ to: _,
+ r#type,
+ lang: _,
+ query: Some(iq::Query::Roster(stanza::roster::Query { ver: _, items })),
+ errors: _,
+ }) if id == iq_id && r#type == IqType::Result => {
+ let contacts: Vec<Contact> = items.into_iter().map(|item| item.into()).collect();
+ if let Err(e) = logic.db().replace_cached_roster(contacts.clone()).await {
+ logic
+ .handle_error(Error::Roster(RosterError::Cache(e.into())))
+ .await;
};
- // TODO: timeout
- match iq_recv.await {
- Ok(Ok(stanza)) => match stanza {
- Stanza::Iq(Iq {
- from: _,
- id,
- to: _,
- r#type,
- lang: _,
- query: Some(iq::Query::Roster(stanza::roster::Query { ver: _, items })),
- errors: _,
- }) if id == iq_id && r#type == IqType::Result => {
- let contacts: Vec<Contact> =
- items.into_iter().map(|item| item.into()).collect();
- if let Err(e) = logic.db().replace_cached_roster(contacts.clone()).await {
- logic
- .handle_error(Error::Roster(RosterError::Cache(e.into())))
- .await;
- };
- result_sender.send(Ok(contacts));
- return;
- }
- ref s @ Stanza::Iq(Iq {
- from: _,
- ref id,
- to: _,
- r#type,
- lang: _,
- query: _,
- ref errors,
- }) if *id == iq_id && r#type == IqType::Error => {
- if let Some(error) = errors.first() {
- result_sender.send(Err(RosterError::StanzaError(error.clone())));
- } else {
- result_sender.send(Err(RosterError::UnexpectedStanza(s.clone())));
- }
- return;
- }
- s => {
- result_sender.send(Err(RosterError::UnexpectedStanza(s)));
- return;
- }
- },
- Ok(Err(e)) => {
- result_sender.send(Err(RosterError::Read(e)));
- return;
- }
- Err(e) => {
- result_sender.send(Err(RosterError::Write(WriteError::Actor(e.into()))));
- return;
- }
+ Ok(contacts)
+ }
+ ref s @ Stanza::Iq(Iq {
+ from: _,
+ ref id,
+ to: _,
+ r#type,
+ lang: _,
+ query: _,
+ ref errors,
+ }) if *id == iq_id && r#type == IqType::Error => {
+ if let Some(error) = errors.first() {
+ Err(RosterError::StanzaError(error.clone()))
+ } else {
+ Err(RosterError::UnexpectedStanza(s.clone()))
+ }
+ }
+ s => Err(RosterError::UnexpectedStanza(s)),
+ }
+}
+
+pub async fn handle_add_contact(
+ logic: &ClientLogic,
+ connection: Connected,
+ jid: JID,
+) -> Result<(), RosterError> {
+ let iq_id = Uuid::new_v4().to_string();
+ let set_stanza = Stanza::Iq(Iq {
+ from: Some(connection.jid().clone()),
+ id: iq_id.clone(),
+ to: None,
+ r#type: IqType::Set,
+ lang: None,
+ query: Some(iq::Query::Roster(stanza::roster::Query {
+ ver: None,
+ items: vec![stanza::roster::Item {
+ approved: None,
+ ask: false,
+ jid,
+ name: None,
+ subscription: None,
+ groups: Vec::new(),
+ }],
+ })),
+ errors: Vec::new(),
+ });
+ let response = logic
+ .pending()
+ .request(&connection, set_stanza, iq_id.clone())
+ .await?;
+ match response {
+ Stanza::Iq(Iq {
+ from: _,
+ id,
+ to: _,
+ r#type,
+ lang: _,
+ query: _,
+ errors: _,
+ }) if id == iq_id && r#type == IqType::Result => Ok(()),
+ ref s @ Stanza::Iq(Iq {
+ from: _,
+ ref id,
+ to: _,
+ r#type,
+ lang: _,
+ query: _,
+ ref errors,
+ }) if *id == iq_id && r#type == IqType::Error => {
+ if let Some(error) = errors.first() {
+ Err(RosterError::StanzaError(error.clone()))
+ } else {
+ Err(RosterError::UnexpectedStanza(s.clone()))
+ }
+ }
+ s => Err(RosterError::UnexpectedStanza(s)),
+ }
+}
+
+pub async fn handle_buddy_request(connection: Connected, jid: JID) -> Result<(), WriteError> {
+ let presence = Stanza::Presence(stanza::client::presence::Presence {
+ from: None,
+ id: None,
+ to: Some(jid.clone()),
+ r#type: Some(stanza::client::presence::PresenceType::Subscribe),
+ lang: None,
+ show: None,
+ status: None,
+ priority: None,
+ errors: Vec::new(),
+ delay: None,
+ });
+ connection.write_handle().write(presence).await?;
+ let presence = Stanza::Presence(stanza::client::presence::Presence {
+ from: None,
+ id: None,
+ to: Some(jid),
+ r#type: Some(stanza::client::presence::PresenceType::Subscribed),
+ lang: None,
+ show: None,
+ status: None,
+ priority: None,
+ errors: Vec::new(),
+ delay: None,
+ });
+ connection.write_handle().write(presence).await?;
+ Ok(())
+}
+
+pub async fn handle_subscription_request(
+ connection: Connected,
+ jid: JID,
+) -> Result<(), WriteError> {
+ // TODO: i should probably have builders
+ let presence = Stanza::Presence(stanza::client::presence::Presence {
+ from: None,
+ id: None,
+ to: Some(jid),
+ r#type: Some(stanza::client::presence::PresenceType::Subscribe),
+ lang: None,
+ show: None,
+ status: None,
+ priority: None,
+ errors: Vec::new(),
+ delay: None,
+ });
+ connection.write_handle().write(presence).await?;
+ Ok(())
+}
+
+pub async fn handle_accept_buddy_request(
+ connection: Connected,
+ jid: JID,
+) -> Result<(), WriteError> {
+ let presence = Stanza::Presence(stanza::client::presence::Presence {
+ from: None,
+ id: None,
+ to: Some(jid.clone()),
+ r#type: Some(stanza::client::presence::PresenceType::Subscribed),
+ lang: None,
+ show: None,
+ status: None,
+ priority: None,
+ errors: Vec::new(),
+ delay: None,
+ });
+ connection.write_handle().write(presence).await?;
+ let presence = Stanza::Presence(stanza::client::presence::Presence {
+ from: None,
+ id: None,
+ to: Some(jid),
+ r#type: Some(stanza::client::presence::PresenceType::Subscribe),
+ lang: None,
+ show: None,
+ status: None,
+ priority: None,
+ errors: Vec::new(),
+ delay: None,
+ });
+ connection.write_handle().write(presence).await?;
+ Ok(())
+}
+
+pub async fn handle_accept_subscription_request(
+ connection: Connected,
+ jid: JID,
+) -> Result<(), WriteError> {
+ let presence = Stanza::Presence(stanza::client::presence::Presence {
+ from: None,
+ id: None,
+ to: Some(jid),
+ r#type: Some(stanza::client::presence::PresenceType::Subscribe),
+ lang: None,
+ show: None,
+ status: None,
+ priority: None,
+ errors: Vec::new(),
+ delay: None,
+ });
+ connection.write_handle().write(presence).await?;
+ Ok(())
+}
+
+pub async fn handle_unsubscribe_from_contact(
+ connection: Connected,
+ jid: JID,
+) -> Result<(), WriteError> {
+ let presence = Stanza::Presence(stanza::client::presence::Presence {
+ from: None,
+ id: None,
+ to: Some(jid),
+ r#type: Some(stanza::client::presence::PresenceType::Unsubscribe),
+ lang: None,
+ show: None,
+ status: None,
+ priority: None,
+ errors: Vec::new(),
+ delay: None,
+ });
+ connection.write_handle().write(presence).await?;
+ Ok(())
+}
+
+pub async fn handle_unsubscribe_contact(connection: Connected, jid: JID) -> Result<(), WriteError> {
+ let presence = Stanza::Presence(stanza::client::presence::Presence {
+ from: None,
+ id: None,
+ to: Some(jid),
+ r#type: Some(stanza::client::presence::PresenceType::Unsubscribed),
+ lang: None,
+ show: None,
+ status: None,
+ priority: None,
+ errors: Vec::new(),
+ delay: None,
+ });
+ connection.write_handle().write(presence).await?;
+ Ok(())
+}
+
+pub async fn handle_unfriend_contact(connection: Connected, jid: JID) -> Result<(), WriteError> {
+ let presence = Stanza::Presence(stanza::client::presence::Presence {
+ from: None,
+ id: None,
+ to: Some(jid.clone()),
+ r#type: Some(stanza::client::presence::PresenceType::Unsubscribe),
+ lang: None,
+ show: None,
+ status: None,
+ priority: None,
+ errors: Vec::new(),
+ delay: None,
+ });
+ connection.write_handle().write(presence).await?;
+ let presence = Stanza::Presence(stanza::client::presence::Presence {
+ from: None,
+ id: None,
+ to: Some(jid),
+ r#type: Some(stanza::client::presence::PresenceType::Unsubscribed),
+ lang: None,
+ show: None,
+ status: None,
+ priority: None,
+ errors: Vec::new(),
+ delay: None,
+ });
+ connection.write_handle().write(presence).await?;
+ Ok(())
+}
+
+pub async fn handle_delete_contact(
+ logic: &ClientLogic,
+ connection: Connected,
+ jid: JID,
+) -> Result<(), RosterError> {
+ let iq_id = Uuid::new_v4().to_string();
+ let set_stanza = Stanza::Iq(Iq {
+ from: Some(connection.jid().clone()),
+ id: iq_id.clone(),
+ to: None,
+ r#type: IqType::Set,
+ lang: None,
+ query: Some(iq::Query::Roster(stanza::roster::Query {
+ ver: None,
+ items: vec![stanza::roster::Item {
+ approved: None,
+ ask: false,
+ jid,
+ name: None,
+ subscription: Some(stanza::roster::Subscription::Remove),
+ groups: Vec::new(),
+ }],
+ })),
+ errors: Vec::new(),
+ });
+ let result = logic
+ .pending()
+ .request(&connection, set_stanza, iq_id.clone())
+ .await?;
+ match result {
+ Stanza::Iq(Iq {
+ from: _,
+ id,
+ to: _,
+ r#type,
+ lang: _,
+ query: _,
+ errors: _,
+ // don't really need to check matching id as request() does this anyway
+ }) if id == iq_id && r#type == IqType::Result => Ok(()),
+ ref s @ Stanza::Iq(Iq {
+ from: _,
+ ref id,
+ to: _,
+ r#type,
+ lang: _,
+ query: _,
+ ref errors,
+ }) if *id == iq_id && r#type == IqType::Error => {
+ if let Some(error) = errors.first() {
+ Err(RosterError::StanzaError(error.clone()))
+ } else {
+ Err(RosterError::UnexpectedStanza(s.clone()))
+ }
+ }
+ s => Err(RosterError::UnexpectedStanza(s)),
+ }
+}
+
+pub async fn handle_update_contact(
+ logic: &ClientLogic,
+ connection: Connected,
+ jid: JID,
+ contact_update: ContactUpdate,
+) -> Result<(), RosterError> {
+ let iq_id = Uuid::new_v4().to_string();
+ let groups = Vec::from_iter(
+ contact_update
+ .groups
+ .into_iter()
+ .map(|group| stanza::roster::Group(Some(group))),
+ );
+ let set_stanza = Stanza::Iq(Iq {
+ from: Some(connection.jid().clone()),
+ id: iq_id.clone(),
+ to: None,
+ r#type: IqType::Set,
+ lang: None,
+ query: Some(iq::Query::Roster(stanza::roster::Query {
+ ver: None,
+ items: vec![stanza::roster::Item {
+ approved: None,
+ ask: false,
+ jid,
+ name: contact_update.name,
+ subscription: None,
+ groups,
+ }],
+ })),
+ errors: Vec::new(),
+ });
+ let response = logic
+ .pending()
+ .request(&connection, set_stanza, iq_id.clone())
+ .await?;
+ match response {
+ Stanza::Iq(Iq {
+ from: _,
+ id,
+ to: _,
+ r#type,
+ lang: _,
+ query: _,
+ errors: _,
+ }) if id == iq_id && r#type == IqType::Result => Ok(()),
+ ref s @ Stanza::Iq(Iq {
+ from: _,
+ ref id,
+ to: _,
+ r#type,
+ lang: _,
+ query: _,
+ ref errors,
+ }) if *id == iq_id && r#type == IqType::Error => {
+ if let Some(error) = errors.first() {
+ Err(RosterError::StanzaError(error.clone()))
+ } else {
+ Err(RosterError::UnexpectedStanza(s.clone()))
}
}
+ s => Err(RosterError::UnexpectedStanza(s)),
+ }
+}
+
+pub async fn handle_set_status(
+ logic: &ClientLogic,
+ connection: Connected,
+ online: Online,
+) -> Result<(), StatusError> {
+ logic
+ .db()
+ .upsert_cached_status(online.clone())
+ .await
+ .map_err(|e| DatabaseError(e.into()))?;
+ connection
+ .write_handle()
+ .write(Stanza::Presence(online.into_stanza(None)))
+ .await?;
+ Ok(())
+}
+
+pub async fn handle_send_message(
+ logic: &ClientLogic,
+ connection: Connected,
+ jid: JID,
+ body: Body,
+) -> Result<(), WriteError> {
+ let id = Uuid::new_v4();
+ let timestamp = Utc::now();
+ let message = Stanza::Message(stanza::client::message::Message {
+ from: Some(connection.jid().clone()),
+ id: Some(id.to_string()),
+ to: Some(jid.clone()),
+ // TODO: specify message type
+ r#type: stanza::client::message::MessageType::Chat,
+ // TODO: lang ?
+ lang: None,
+ subject: None,
+ body: Some(stanza::client::message::Body {
+ lang: None,
+ body: Some(body.body.clone()),
+ }),
+ thread: None,
+ // include delay to have a consistent timestamp between server and client
+ delay: Some(Delay {
+ from: None,
+ stamp: timestamp,
+ }),
+ });
+ connection.write_handle().write(message).await?;
+ let mut message = Message {
+ id,
+ from: connection.jid().clone(),
+ body,
+ timestamp,
+ };
+ info!("sent message: {:?}", message);
+ if let Err(e) = logic
+ .db()
+ .create_message_with_self_resource_and_chat(message.clone(), jid.clone())
+ .await
+ {
+ // TODO: should these really be handle_error or just the error macro?
+ logic
+ .handle_error(MessageSendError::MessageHistory(e.into()).into())
+ .await;
+ }
+ // TODO: don't do this, have separate from from details
+ message.from = message.from.as_bare();
+ let _ = logic
+ .update_sender()
+ .send(UpdateMessage::Message { to: jid, message })
+ .await;
+ Ok(())
+ // TODO: refactor this to send a sending updatemessage, then update or something like that
+}
+
+pub async fn handle_send_presence(
+ connection: Connected,
+ jid: Option<JID>,
+ presence: PresenceType,
+) -> Result<(), WriteError> {
+ let mut presence: stanza::client::presence::Presence = presence.into();
+ presence.to = jid;
+ connection
+ .write_handle()
+ .write(Stanza::Presence(presence))
+ .await?;
+ Ok(())
+}
+
+// TODO: could probably macro-ise?
+pub async fn handle_online_result(
+ logic: &ClientLogic,
+ command: Command,
+ connection: Connected,
+) -> Result<(), Error> {
+ match command {
+ Command::GetRoster(result_sender) => {
+ let roster = handle_get_roster(logic, connection).await;
+ let _ = result_sender.send(roster);
+ }
Command::GetChats(sender) => {
- let chats = logic.db().read_chats().await.map_err(|e| e.into());
- sender.send(chats);
+ let chats = handle_get_chats(logic).await;
+ let _ = sender.send(chats);
}
Command::GetChatsOrdered(sender) => {
- let chats = logic.db().read_chats_ordered().await.map_err(|e| e.into());
- sender.send(chats);
+ let chats = handle_get_chats_ordered(logic).await;
+ let _ = sender.send(chats);
}
Command::GetChatsOrderedWithLatestMessages(sender) => {
- let chats = logic
- .db()
- .read_chats_ordered_with_latest_messages()
- .await
- .map_err(|e| e.into());
- sender.send(chats);
+ let chats = handle_get_chats_ordered_with_latest_messages(logic).await;
+ let _ = sender.send(chats);
}
Command::GetChat(jid, sender) => {
- let chats = logic.db().read_chat(jid).await.map_err(|e| e.into());
- sender.send(chats);
+ let chat = handle_get_chat(logic, jid).await;
+ let _ = sender.send(chat);
}
Command::GetMessages(jid, sender) => {
- let messages = logic
- .db()
- .read_message_history(jid)
- .await
- .map_err(|e| e.into());
- sender.send(messages);
+ let messages = handle_get_messages(logic, jid).await;
+ let _ = sender.send(messages);
}
Command::DeleteChat(jid, sender) => {
- let result = logic.db().delete_chat(jid).await.map_err(|e| e.into());
- sender.send(result);
+ let result = handle_delete_chat(logic, jid).await;
+ let _ = sender.send(result);
}
Command::DeleteMessage(uuid, sender) => {
- let result = logic.db().delete_message(uuid).await.map_err(|e| e.into());
- sender.send(result);
+ let result = handle_delete_messaage(logic, uuid).await;
+ let _ = sender.send(result);
}
Command::GetUser(jid, sender) => {
- let user = logic.db().read_user(jid).await.map_err(|e| e.into());
- sender.send(user);
+ let user = handle_get_user(logic, jid).await;
+ let _ = sender.send(user);
}
// TODO: offline queue to modify roster
Command::AddContact(jid, sender) => {
- let iq_id = Uuid::new_v4().to_string();
- let set_stanza = Stanza::Iq(Iq {
- from: Some(connection.jid().clone()),
- id: iq_id.clone(),
- to: None,
- r#type: IqType::Set,
- lang: None,
- query: Some(iq::Query::Roster(stanza::roster::Query {
- ver: None,
- items: vec![stanza::roster::Item {
- approved: None,
- ask: false,
- jid,
- name: None,
- subscription: None,
- groups: Vec::new(),
- }],
- })),
- errors: Vec::new(),
- });
- let (send, recv) = oneshot::channel();
- {
- logic.pending().lock().await.insert(iq_id.clone(), send);
- }
- // TODO: write_handle send helper function
- let result = connection.write_handle().write(set_stanza).await;
- if let Err(e) = result {
- sender.send(Err(RosterError::Write(e)));
- return;
- }
- let iq_result = recv.await;
- match iq_result {
- Ok(i) => match i {
- Ok(iq_result) => match iq_result {
- Stanza::Iq(Iq {
- from: _,
- id,
- to: _,
- r#type,
- lang: _,
- query: _,
- errors: _,
- }) if id == iq_id && r#type == IqType::Result => {
- sender.send(Ok(()));
- return;
- }
- ref s @ Stanza::Iq(Iq {
- from: _,
- ref id,
- to: _,
- r#type,
- lang: _,
- query: _,
- ref errors,
- }) if *id == iq_id && r#type == IqType::Error => {
- if let Some(error) = errors.first() {
- sender.send(Err(RosterError::StanzaError(error.clone())));
- } else {
- sender.send(Err(RosterError::UnexpectedStanza(s.clone())));
- }
- return;
- }
- s => {
- sender.send(Err(RosterError::UnexpectedStanza(s)));
- return;
- }
- },
- Err(e) => {
- sender.send(Err(e.into()));
- return;
- }
- },
- Err(e) => {
- sender.send(Err(RosterError::Write(WriteError::Actor(e.into()))));
- return;
- }
- }
+ let result = handle_add_contact(logic, connection, jid).await;
+ let _ = sender.send(result);
}
Command::BuddyRequest(jid, sender) => {
- let presence = Stanza::Presence(stanza::client::presence::Presence {
- from: None,
- id: None,
- to: Some(jid.clone()),
- r#type: Some(stanza::client::presence::PresenceType::Subscribe),
- lang: None,
- show: None,
- status: None,
- priority: None,
- errors: Vec::new(),
- delay: None,
- });
- let result = connection.write_handle().write(presence).await;
- match result {
- Err(_) => {
- let _ = sender.send(result);
- }
- Ok(()) => {
- let presence = Stanza::Presence(stanza::client::presence::Presence {
- from: None,
- id: None,
- to: Some(jid),
- r#type: Some(stanza::client::presence::PresenceType::Subscribed),
- lang: None,
- show: None,
- status: None,
- priority: None,
- errors: Vec::new(),
- delay: None,
- });
- let result = connection.write_handle().write(presence).await;
- let _ = sender.send(result);
- }
- }
+ let result = handle_buddy_request(connection, jid).await;
+ let _ = sender.send(result);
}
Command::SubscriptionRequest(jid, sender) => {
- // TODO: i should probably have builders
- let presence = Stanza::Presence(stanza::client::presence::Presence {
- from: None,
- id: None,
- to: Some(jid),
- r#type: Some(stanza::client::presence::PresenceType::Subscribe),
- lang: None,
- show: None,
- status: None,
- priority: None,
- errors: Vec::new(),
- delay: None,
- });
- let result = connection.write_handle().write(presence).await;
+ let result = handle_subscription_request(connection, jid).await;
let _ = sender.send(result);
}
Command::AcceptBuddyRequest(jid, sender) => {
- let presence = Stanza::Presence(stanza::client::presence::Presence {
- from: None,
- id: None,
- to: Some(jid.clone()),
- r#type: Some(stanza::client::presence::PresenceType::Subscribed),
- lang: None,
- show: None,
- status: None,
- priority: None,
- errors: Vec::new(),
- delay: None,
- });
- let result = connection.write_handle().write(presence).await;
- match result {
- Err(_) => {
- let _ = sender.send(result);
- }
- Ok(()) => {
- let presence = Stanza::Presence(stanza::client::presence::Presence {
- from: None,
- id: None,
- to: Some(jid),
- r#type: Some(stanza::client::presence::PresenceType::Subscribe),
- lang: None,
- show: None,
- status: None,
- priority: None,
- errors: Vec::new(),
- delay: None,
- });
- let result = connection.write_handle().write(presence).await;
- let _ = sender.send(result);
- }
- }
+ let result = handle_accept_buddy_request(connection, jid).await;
+ let _ = sender.send(result);
}
Command::AcceptSubscriptionRequest(jid, sender) => {
- let presence = Stanza::Presence(stanza::client::presence::Presence {
- from: None,
- id: None,
- to: Some(jid),
- r#type: Some(stanza::client::presence::PresenceType::Subscribe),
- lang: None,
- show: None,
- status: None,
- priority: None,
- errors: Vec::new(),
- delay: None,
- });
- let result = connection.write_handle().write(presence).await;
+ let result = handle_accept_subscription_request(connection, jid).await;
let _ = sender.send(result);
}
Command::UnsubscribeFromContact(jid, sender) => {
- let presence = Stanza::Presence(stanza::client::presence::Presence {
- from: None,
- id: None,
- to: Some(jid),
- r#type: Some(stanza::client::presence::PresenceType::Unsubscribe),
- lang: None,
- show: None,
- status: None,
- priority: None,
- errors: Vec::new(),
- delay: None,
- });
- let result = connection.write_handle().write(presence).await;
+ let result = handle_unsubscribe_from_contact(connection, jid).await;
let _ = sender.send(result);
}
Command::UnsubscribeContact(jid, sender) => {
- let presence = Stanza::Presence(stanza::client::presence::Presence {
- from: None,
- id: None,
- to: Some(jid),
- r#type: Some(stanza::client::presence::PresenceType::Unsubscribed),
- lang: None,
- show: None,
- status: None,
- priority: None,
- errors: Vec::new(),
- delay: None,
- });
- let result = connection.write_handle().write(presence).await;
+ let result = handle_unsubscribe_contact(connection, jid).await;
let _ = sender.send(result);
}
Command::UnfriendContact(jid, sender) => {
- let presence = Stanza::Presence(stanza::client::presence::Presence {
- from: None,
- id: None,
- to: Some(jid.clone()),
- r#type: Some(stanza::client::presence::PresenceType::Unsubscribe),
- lang: None,
- show: None,
- status: None,
- priority: None,
- errors: Vec::new(),
- delay: None,
- });
- let result = connection.write_handle().write(presence).await;
- match result {
- Err(_) => {
- let _ = sender.send(result);
- }
- Ok(()) => {
- let presence = Stanza::Presence(stanza::client::presence::Presence {
- from: None,
- id: None,
- to: Some(jid),
- r#type: Some(stanza::client::presence::PresenceType::Unsubscribed),
- lang: None,
- show: None,
- status: None,
- priority: None,
- errors: Vec::new(),
- delay: None,
- });
- let result = connection.write_handle().write(presence).await;
- let _ = sender.send(result);
- }
- }
+ let result = handle_unfriend_contact(connection, jid).await;
+ let _ = sender.send(result);
}
Command::DeleteContact(jid, sender) => {
- let iq_id = Uuid::new_v4().to_string();
- let set_stanza = Stanza::Iq(Iq {
- from: Some(connection.jid().clone()),
- id: iq_id.clone(),
- to: None,
- r#type: IqType::Set,
- lang: None,
- query: Some(iq::Query::Roster(stanza::roster::Query {
- ver: None,
- items: vec![stanza::roster::Item {
- approved: None,
- ask: false,
- jid,
- name: None,
- subscription: Some(stanza::roster::Subscription::Remove),
- groups: Vec::new(),
- }],
- })),
- errors: Vec::new(),
- });
- let (send, recv) = oneshot::channel();
- {
- logic.pending().lock().await.insert(iq_id.clone(), send);
- }
- let result = connection.write_handle().write(set_stanza).await;
- if let Err(e) = result {
- sender.send(Err(RosterError::Write(e)));
- return;
- }
- let iq_result = recv.await;
- match iq_result {
- Ok(i) => match i {
- Ok(iq_result) => match iq_result {
- Stanza::Iq(Iq {
- from: _,
- id,
- to: _,
- r#type,
- lang: _,
- query: _,
- errors: _,
- }) if id == iq_id && r#type == IqType::Result => {
- sender.send(Ok(()));
- return;
- }
- ref s @ Stanza::Iq(Iq {
- from: _,
- ref id,
- to: _,
- r#type,
- lang: _,
- query: _,
- ref errors,
- }) if *id == iq_id && r#type == IqType::Error => {
- if let Some(error) = errors.first() {
- sender.send(Err(RosterError::StanzaError(error.clone())));
- } else {
- sender.send(Err(RosterError::UnexpectedStanza(s.clone())));
- }
- return;
- }
- s => {
- sender.send(Err(RosterError::UnexpectedStanza(s)));
- return;
- }
- },
- Err(e) => {
- sender.send(Err(e.into()));
- return;
- }
- },
- Err(e) => {
- sender.send(Err(RosterError::Write(WriteError::Actor(e.into()))));
- return;
- }
- }
+ let result = handle_delete_contact(logic, connection, jid).await;
+ let _ = sender.send(result);
}
Command::UpdateContact(jid, contact_update, sender) => {
- let iq_id = Uuid::new_v4().to_string();
- let groups = Vec::from_iter(
- contact_update
- .groups
- .into_iter()
- .map(|group| stanza::roster::Group(Some(group))),
- );
- let set_stanza = Stanza::Iq(Iq {
- from: Some(connection.jid().clone()),
- id: iq_id.clone(),
- to: None,
- r#type: IqType::Set,
- lang: None,
- query: Some(iq::Query::Roster(stanza::roster::Query {
- ver: None,
- items: vec![stanza::roster::Item {
- approved: None,
- ask: false,
- jid,
- name: contact_update.name,
- subscription: None,
- groups,
- }],
- })),
- errors: Vec::new(),
- });
- let (send, recv) = oneshot::channel();
- {
- logic.pending().lock().await.insert(iq_id.clone(), send);
- }
- let result = connection.write_handle().write(set_stanza).await;
- if let Err(e) = result {
- sender.send(Err(RosterError::Write(e)));
- return;
- }
- let iq_result = recv.await;
- match iq_result {
- Ok(i) => match i {
- Ok(iq_result) => match iq_result {
- Stanza::Iq(Iq {
- from: _,
- id,
- to: _,
- r#type,
- lang: _,
- query: _,
- errors: _,
- }) if id == iq_id && r#type == IqType::Result => {
- sender.send(Ok(()));
- return;
- }
- ref s @ Stanza::Iq(Iq {
- from: _,
- ref id,
- to: _,
- r#type,
- lang: _,
- query: _,
- ref errors,
- }) if *id == iq_id && r#type == IqType::Error => {
- if let Some(error) = errors.first() {
- sender.send(Err(RosterError::StanzaError(error.clone())));
- } else {
- sender.send(Err(RosterError::UnexpectedStanza(s.clone())));
- }
- return;
- }
- s => {
- sender.send(Err(RosterError::UnexpectedStanza(s)));
- return;
- }
- },
- Err(e) => {
- sender.send(Err(e.into()));
- return;
- }
- },
- Err(e) => {
- sender.send(Err(RosterError::Write(WriteError::Actor(e.into()))));
- return;
- }
- }
+ let result = handle_update_contact(logic, connection, jid, contact_update).await;
+ let _ = sender.send(result);
}
Command::SetStatus(online, sender) => {
- let result = logic.db().upsert_cached_status(online.clone()).await;
- if let Err(e) = result {
- logic
- .handle_error(StatusError::Cache(e.into()).into())
- .await;
- }
- let result = connection
- .write_handle()
- .write(Stanza::Presence(online.into_stanza(None)))
- .await
- .map_err(|e| StatusError::Write(e));
- // .map_err(|e| StatusError::Write(e));
+ let result = handle_set_status(logic, connection, online).await;
let _ = sender.send(result);
}
// TODO: offline message queue
Command::SendMessage(jid, body, sender) => {
- let id = Uuid::new_v4();
- let message = Stanza::Message(stanza::client::message::Message {
- from: Some(connection.jid().clone()),
- id: Some(id.to_string()),
- to: Some(jid.clone()),
- // TODO: specify message type
- r#type: stanza::client::message::MessageType::Chat,
- // TODO: lang ?
- lang: None,
- subject: None,
- body: Some(stanza::client::message::Body {
- lang: None,
- body: Some(body.body.clone()),
- }),
- thread: None,
- delay: None,
- });
- let _ = sender.send(Ok(()));
- // let _ = sender.send(Ok(message.clone()));
- let result = connection.write_handle().write(message).await;
- match result {
- Ok(_) => {
- let mut message = Message {
- id,
- from: connection.jid().clone(),
- body,
- timestamp: Utc::now(),
- };
- info!("send message {:?}", message);
- if let Err(e) = logic
- .db()
- .create_message_with_self_resource_and_chat(message.clone(), jid.clone())
- .await
- {
- logic
- .handle_error(MessageSendError::MessageHistory(e.into()).into())
- .await;
- }
- // TODO: don't do this, have separate from from details
- message.from = message.from.as_bare();
- let _ = logic
- .update_sender()
- .send(UpdateMessage::Message { to: jid, message })
- .await;
- }
- Err(_) => {
- // let _ = sender.send(result);
- }
- }
+ let result = handle_send_message(logic, connection, jid, body).await;
+ let _ = sender.send(result);
}
Command::SendPresence(jid, presence, sender) => {
- let mut presence: stanza::client::presence::Presence = presence.into();
- if let Some(jid) = jid {
- presence.to = Some(jid);
- };
- let result = connection
- .write_handle()
- .write(Stanza::Presence(presence))
- .await;
- // .map_err(|e| StatusError::Write(e));
+ let result = handle_send_presence(connection, jid, presence).await;
let _ = sender.send(result);
}
}
+ Ok(())
}
diff --git a/filamento/src/logic/process_stanza.rs b/filamento/src/logic/process_stanza.rs
index 94257aa..660da16 100644
--- a/filamento/src/logic/process_stanza.rs
+++ b/filamento/src/logic/process_stanza.rs
@@ -169,21 +169,14 @@ pub async fn recv_iq(
}
match iq.r#type {
stanza::client::iq::IqType::Error | stanza::client::iq::IqType::Result => {
- let send;
- {
- send = logic.pending().lock().await.remove(&iq.id);
- }
let from = iq
.from
.clone()
.unwrap_or_else(|| connection.server().clone());
- if let Some(send) = send {
- debug!("received iq result from {}", from);
- let _ = send.send(Ok(Stanza::Iq(iq)));
- Ok(None)
- } else {
- Err(IqError::NoMatchingId(iq.id))
- }
+ let id = iq.id.clone();
+ debug!("received iq result with id `{}` from {}", id, from);
+ logic.pending().respond(Stanza::Iq(iq), id).await?;
+ Ok(None)
}
stanza::client::iq::IqType::Get => {
let from = iq