aboutsummaryrefslogtreecommitdiffstats
path: root/filamento
diff options
context:
space:
mode:
authorLibravatar cel 🌸 <cel@bunny.garden>2025-03-26 15:29:11 +0000
committerLibravatar cel 🌸 <cel@bunny.garden>2025-03-26 15:29:11 +0000
commitbf677e1f9ce07e2fa8971c15b9a082cddbb40dec (patch)
treed8a6c8bec63c774322b207af43ad573d730ee1a2 /filamento
parent2211f324782cdc617b4b5ecd071178e372539fe4 (diff)
downloadluz-bf677e1f9ce07e2fa8971c15b9a082cddbb40dec.tar.gz
luz-bf677e1f9ce07e2fa8971c15b9a082cddbb40dec.tar.bz2
luz-bf677e1f9ce07e2fa8971c15b9a082cddbb40dec.zip
refactor(filament): split logic into different files
Diffstat (limited to 'filamento')
-rw-r--r--filamento/.gitignore1
-rw-r--r--filamento/src/lib.rs1124
-rw-r--r--filamento/src/logic/abort.rs10
-rw-r--r--filamento/src/logic/connect.rs91
-rw-r--r--filamento/src/logic/connection_error.rs12
-rw-r--r--filamento/src/logic/disconnect.rs18
-rw-r--r--filamento/src/logic/mod.rs90
-rw-r--r--filamento/src/logic/offline.rs110
-rw-r--r--filamento/src/logic/online.rs661
-rw-r--r--filamento/src/logic/process_stanza.rs264
10 files changed, 1260 insertions, 1121 deletions
diff --git a/filamento/.gitignore b/filamento/.gitignore
new file mode 100644
index 0000000..8bb4037
--- /dev/null
+++ b/filamento/.gitignore
@@ -0,0 +1 @@
+filamento.db
diff --git a/filamento/src/lib.rs b/filamento/src/lib.rs
index db59a67..030dc43 100644
--- a/filamento/src/lib.rs
+++ b/filamento/src/lib.rs
@@ -19,6 +19,7 @@ use lampada::{
Connected, CoreClient, CoreClientCommand, Logic, SupervisorSender, WriteMessage,
error::{ActorError, CommandError, ConnectionError, ReadError, WriteError},
};
+use logic::ClientLogic;
use presence::{Offline, Online, Presence, PresenceType, Show};
use roster::{Contact, ContactUpdate};
use stanza::client::{
@@ -36,6 +37,7 @@ use uuid::Uuid;
pub mod chat;
pub mod db;
pub mod error;
+mod logic;
pub mod presence;
pub mod roster;
pub mod user;
@@ -145,11 +147,7 @@ impl Client {
let (_sup_send, sup_recv) = oneshot::channel();
let sup_recv = sup_recv.fuse();
- let logic = ClientLogic {
- db,
- pending: Arc::new(Mutex::new(HashMap::new())),
- update_sender: update_send,
- };
+ let logic = ClientLogic::new(db, Arc::new(Mutex::new(HashMap::new())), update_send);
let actor: CoreClient<ClientLogic> =
CoreClient::new(jid, password, command_receiver, None, sup_recv, logic);
@@ -450,1122 +448,6 @@ impl Client {
}
}
-#[derive(Clone)]
-pub struct ClientLogic {
- db: Db,
- pending: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>,
- update_sender: mpsc::Sender<UpdateMessage>,
-}
-
-impl Logic for ClientLogic {
- type Cmd = Command;
-
- async fn handle_connect(self, connection: Connected) {
- let (send, recv) = oneshot::channel();
- debug!("getting roster");
- self.clone()
- .handle_online(Command::GetRoster(send), connection.clone())
- .await;
- debug!("sent roster req");
- let roster = recv.await;
- debug!("got roster");
- match roster {
- Ok(r) => match r {
- Ok(roster) => {
- let online = self.db.read_cached_status().await;
- let online = match online {
- Ok(online) => online,
- Err(e) => {
- let _ = self
- .update_sender
- .send(UpdateMessage::Error(Error::Connecting(
- ConnectionJobError::StatusCacheError(e.into()),
- )))
- .await;
- Online::default()
- }
- };
- let (send, recv) = oneshot::channel();
- self.clone()
- .handle_online(
- Command::SendPresence(None, PresenceType::Online(online.clone()), send),
- connection,
- )
- .await;
- let set_status = recv.await;
- match set_status {
- Ok(s) => match s {
- Ok(()) => {
- let _ = self
- .update_sender
- .send(UpdateMessage::Online(online, roster))
- .await;
- }
- Err(e) => {
- let _ = self
- .update_sender
- .send(UpdateMessage::Error(Error::Connecting(e.into())))
- .await;
- }
- },
- Err(e) => {
- let _ = self
- .update_sender
- .send(UpdateMessage::Error(Error::Connecting(
- ConnectionJobError::SendPresence(WriteError::Actor(e.into())),
- )))
- .await;
- }
- }
- }
- Err(e) => {
- let _ = self
- .update_sender
- .send(UpdateMessage::Error(Error::Connecting(e.into())))
- .await;
- }
- },
- Err(e) => {
- let _ = self
- .update_sender
- .send(UpdateMessage::Error(Error::Connecting(
- ConnectionJobError::RosterRetreival(RosterError::Write(WriteError::Actor(
- e.into(),
- ))),
- )))
- .await;
- }
- }
- }
-
- async fn handle_disconnect(self, connection: Connected) {
- // TODO: be able to set offline status message
- let offline_presence: stanza::client::presence::Presence =
- Offline::default().into_stanza(None);
- let stanza = Stanza::Presence(offline_presence);
- // TODO: timeout and error check
- connection.write_handle().write(stanza).await;
- let _ = self
- .update_sender
- .send(UpdateMessage::Offline(Offline::default()))
- .await;
- }
-
- async fn handle_stanza(
- self,
- stanza: Stanza,
- connection: Connected,
- supervisor: SupervisorSender,
- ) {
- match stanza {
- Stanza::Message(stanza_message) => {
- if let Some(mut from) = stanza_message.from {
- // TODO: don't ignore delay from. xep says SHOULD send error if incorrect.
- let timestamp = stanza_message
- .delay
- .map(|delay| delay.stamp)
- .unwrap_or_else(|| Utc::now());
- // TODO: group chat messages
- let mut message = Message {
- id: stanza_message
- .id
- // TODO: proper id storage
- .map(|id| Uuid::from_str(&id).unwrap_or_else(|_| Uuid::new_v4()))
- .unwrap_or_else(|| Uuid::new_v4()),
- from: from.clone(),
- timestamp,
- body: Body {
- // TODO: should this be an option?
- body: stanza_message
- .body
- .map(|body| body.body)
- .unwrap_or_default()
- .unwrap_or_default(),
- },
- };
- // TODO: can this be more efficient?
- let result = self
- .db
- .create_message_with_user_resource_and_chat(message.clone(), from.clone())
- .await;
- if let Err(e) = result {
- tracing::error!("messagecreate");
- let _ = self
- .update_sender
- .send(UpdateMessage::Error(Error::MessageRecv(
- MessageRecvError::MessageHistory(e.into()),
- )))
- .await;
- }
- message.from = message.from.as_bare();
- from = from.as_bare();
- let _ = self
- .update_sender
- .send(UpdateMessage::Message { to: from, message })
- .await;
- } else {
- let _ = self
- .update_sender
- .send(UpdateMessage::Error(Error::MessageRecv(
- MessageRecvError::MissingFrom,
- )))
- .await;
- }
- }
- Stanza::Presence(presence) => {
- if let Some(from) = presence.from {
- match presence.r#type {
- Some(r#type) => match r#type {
- // error processing a presence from somebody
- stanza::client::presence::PresenceType::Error => {
- // TODO: is there any other information that should go with the error? also MUST have an error, otherwise it's a different error. maybe it shoulnd't be an option.
- let _ = self
- .update_sender
- .send(UpdateMessage::Error(Error::Presence(
- // TODO: ughhhhhhhhhhhhh these stanza errors should probably just have an option, and custom display
- PresenceError::StanzaError(
- presence
- .errors
- .first()
- .cloned()
- .expect("error MUST have error"),
- ),
- )))
- .await;
- }
- // should not happen (error to server)
- stanza::client::presence::PresenceType::Probe => {
- // TODO: should probably write an error and restart stream
- let _ = self
- .update_sender
- .send(UpdateMessage::Error(Error::Presence(
- PresenceError::Unsupported,
- )))
- .await;
- }
- stanza::client::presence::PresenceType::Subscribe => {
- // may get a subscription request from somebody who is not a contact!!! therefore should be its own kind of event
- let _ = self
- .update_sender
- .send(UpdateMessage::SubscriptionRequest(from))
- .await;
- }
- stanza::client::presence::PresenceType::Unavailable => {
- let offline = Offline {
- status: presence.status.map(|status| status.status.0),
- };
- let timestamp = presence
- .delay
- .map(|delay| delay.stamp)
- .unwrap_or_else(|| Utc::now());
- let _ = self
- .update_sender
- .send(UpdateMessage::Presence {
- from,
- presence: Presence {
- timestamp,
- presence: PresenceType::Offline(offline),
- },
- })
- .await;
- }
- // for now, do nothing, as these are simply informational. will receive roster push from the server regarding the changes to do with them.
- stanza::client::presence::PresenceType::Subscribed => {}
- stanza::client::presence::PresenceType::Unsubscribe => {}
- stanza::client::presence::PresenceType::Unsubscribed => {}
- },
- None => {
- let online = Online {
- show: presence.show.map(|show| match show {
- stanza::client::presence::Show::Away => Show::Away,
- stanza::client::presence::Show::Chat => Show::Chat,
- stanza::client::presence::Show::Dnd => Show::DoNotDisturb,
- stanza::client::presence::Show::Xa => Show::ExtendedAway,
- }),
- status: presence.status.map(|status| status.status.0),
- priority: presence.priority.map(|priority| priority.0),
- };
- let timestamp = presence
- .delay
- .map(|delay| delay.stamp)
- .unwrap_or_else(|| Utc::now());
- let _ = self
- .update_sender
- .send(UpdateMessage::Presence {
- from,
- presence: Presence {
- timestamp,
- presence: PresenceType::Online(online),
- },
- })
- .await;
- }
- }
- } else {
- let _ = self
- .update_sender
- .send(UpdateMessage::Error(Error::Presence(
- PresenceError::MissingFrom,
- )))
- .await;
- }
- }
- Stanza::Iq(iq) => match iq.r#type {
- stanza::client::iq::IqType::Error | stanza::client::iq::IqType::Result => {
- let send;
- {
- send = self.pending.lock().await.remove(&iq.id);
- }
- if let Some(send) = send {
- send.send(Ok(Stanza::Iq(iq)));
- } else {
- let _ = self
- .update_sender
- .send(UpdateMessage::Error(Error::Iq(IqError::NoMatchingId(
- iq.id,
- ))))
- .await;
- }
- }
- // TODO: send unsupported to server
- // TODO: proper errors i am so tired please
- stanza::client::iq::IqType::Get => {}
- stanza::client::iq::IqType::Set => {
- if let Some(query) = iq.query {
- match query {
- stanza::client::iq::Query::Roster(mut query) => {
- // TODO: there should only be one
- if let Some(item) = query.items.pop() {
- match item.subscription {
- Some(stanza::roster::Subscription::Remove) => {
- self.db.delete_contact(item.jid.clone()).await;
- self.update_sender
- .send(UpdateMessage::RosterDelete(item.jid))
- .await;
- // TODO: send result
- }
- _ => {
- let contact: Contact = item.into();
- if let Err(e) =
- self.db.upsert_contact(contact.clone()).await
- {
- let _ = self
- .update_sender
- .send(UpdateMessage::Error(Error::Roster(
- RosterError::Cache(e.into()),
- )))
- .await;
- }
- let _ = self
- .update_sender
- .send(UpdateMessage::RosterUpdate(contact))
- .await;
- // TODO: send result
- // write_handle.write(Stanza::Iq(stanza::client::iq::Iq {
- // from: ,
- // id: todo!(),
- // to: todo!(),
- // r#type: todo!(),
- // lang: todo!(),
- // query: todo!(),
- // errors: todo!(),
- // }));
- }
- }
- }
- }
- // TODO: send unsupported to server
- _ => {}
- }
- } else {
- // TODO: send error (unsupported) to server
- }
- }
- },
- Stanza::Error(error) => {
- let _ = self
- .update_sender
- .send(UpdateMessage::Error(Error::Stream(error)))
- .await;
- // TODO: reconnect
- }
- Stanza::OtherContent(content) => {
- let _ = self
- .update_sender
- .send(UpdateMessage::Error(Error::UnrecognizedContent));
- // TODO: send error to write_thread
- }
- }
- }
-
- async fn handle_online(self, command: Command, connection: Connected) {
- match command {
- Command::GetRoster(result_sender) => {
- // TODO: jid resource should probably be stored within the connection
- debug!("before client_jid lock");
- debug!("after client_jid lock");
- let iq_id = Uuid::new_v4().to_string();
- let (send, iq_recv) = oneshot::channel();
- {
- self.pending.lock().await.insert(iq_id.clone(), send);
- }
- let stanza = Stanza::Iq(Iq {
- from: Some(connection.jid().clone()),
- id: iq_id.to_string(),
- to: None,
- r#type: IqType::Get,
- lang: None,
- query: Some(iq::Query::Roster(stanza::roster::Query {
- ver: None,
- items: Vec::new(),
- })),
- errors: Vec::new(),
- });
- let (send, recv) = oneshot::channel();
- let _ = connection
- .write_handle()
- .send(WriteMessage {
- stanza,
- respond_to: send,
- })
- .await;
- // TODO: timeout
- match recv.await {
- Ok(Ok(())) => info!("roster request sent"),
- Ok(Err(e)) => {
- // TODO: log errors if fail to send
- let _ = result_sender.send(Err(RosterError::Write(e.into())));
- return;
- }
- Err(e) => {
- let _ = result_sender
- .send(Err(RosterError::Write(WriteError::Actor(e.into()))));
- return;
- }
- };
- // TODO: timeout
- match iq_recv.await {
- Ok(Ok(stanza)) => match stanza {
- Stanza::Iq(Iq {
- from: _,
- id,
- to: _,
- r#type,
- lang: _,
- query: Some(iq::Query::Roster(stanza::roster::Query { ver: _, items })),
- errors: _,
- }) if id == iq_id && r#type == IqType::Result => {
- let contacts: Vec<Contact> =
- items.into_iter().map(|item| item.into()).collect();
- if let Err(e) = self.db.replace_cached_roster(contacts.clone()).await {
- self.update_sender
- .send(UpdateMessage::Error(Error::Roster(RosterError::Cache(
- e.into(),
- ))))
- .await;
- };
- result_sender.send(Ok(contacts));
- return;
- }
- ref s @ Stanza::Iq(Iq {
- from: _,
- ref id,
- to: _,
- r#type,
- lang: _,
- query: _,
- ref errors,
- }) if *id == iq_id && r#type == IqType::Error => {
- if let Some(error) = errors.first() {
- result_sender.send(Err(RosterError::StanzaError(error.clone())));
- } else {
- result_sender.send(Err(RosterError::UnexpectedStanza(s.clone())));
- }
- return;
- }
- s => {
- result_sender.send(Err(RosterError::UnexpectedStanza(s)));
- return;
- }
- },
- Ok(Err(e)) => {
- result_sender.send(Err(RosterError::Read(e)));
- return;
- }
- Err(e) => {
- result_sender.send(Err(RosterError::Write(WriteError::Actor(e.into()))));
- return;
- }
- }
- }
- Command::GetChats(sender) => {
- let chats = self.db.read_chats().await.map_err(|e| e.into());
- sender.send(chats);
- }
- Command::GetChatsOrdered(sender) => {
- let chats = self.db.read_chats_ordered().await.map_err(|e| e.into());
- sender.send(chats);
- }
- Command::GetChatsOrderedWithLatestMessages(sender) => {
- let chats = self
- .db
- .read_chats_ordered_with_latest_messages()
- .await
- .map_err(|e| e.into());
- sender.send(chats);
- }
- Command::GetChat(jid, sender) => {
- let chats = self.db.read_chat(jid).await.map_err(|e| e.into());
- sender.send(chats);
- }
- Command::GetMessages(jid, sender) => {
- let messages = self
- .db
- .read_message_history(jid)
- .await
- .map_err(|e| e.into());
- sender.send(messages);
- }
- Command::DeleteChat(jid, sender) => {
- let result = self.db.delete_chat(jid).await.map_err(|e| e.into());
- sender.send(result);
- }
- Command::DeleteMessage(uuid, sender) => {
- let result = self.db.delete_message(uuid).await.map_err(|e| e.into());
- sender.send(result);
- }
- Command::GetUser(jid, sender) => {
- let user = self.db.read_user(jid).await.map_err(|e| e.into());
- sender.send(user);
- }
- // TODO: offline queue to modify roster
- Command::AddContact(jid, sender) => {
- let iq_id = Uuid::new_v4().to_string();
- let set_stanza = Stanza::Iq(Iq {
- from: Some(connection.jid().clone()),
- id: iq_id.clone(),
- to: None,
- r#type: IqType::Set,
- lang: None,
- query: Some(iq::Query::Roster(stanza::roster::Query {
- ver: None,
- items: vec![stanza::roster::Item {
- approved: None,
- ask: false,
- jid,
- name: None,
- subscription: None,
- groups: Vec::new(),
- }],
- })),
- errors: Vec::new(),
- });
- let (send, recv) = oneshot::channel();
- {
- self.pending.lock().await.insert(iq_id.clone(), send);
- }
- // TODO: write_handle send helper function
- let result = connection.write_handle().write(set_stanza).await;
- if let Err(e) = result {
- sender.send(Err(RosterError::Write(e)));
- return;
- }
- let iq_result = recv.await;
- match iq_result {
- Ok(i) => match i {
- Ok(iq_result) => match iq_result {
- Stanza::Iq(Iq {
- from: _,
- id,
- to: _,
- r#type,
- lang: _,
- query: _,
- errors: _,
- }) if id == iq_id && r#type == IqType::Result => {
- sender.send(Ok(()));
- return;
- }
- ref s @ Stanza::Iq(Iq {
- from: _,
- ref id,
- to: _,
- r#type,
- lang: _,
- query: _,
- ref errors,
- }) if *id == iq_id && r#type == IqType::Error => {
- if let Some(error) = errors.first() {
- sender.send(Err(RosterError::StanzaError(error.clone())));
- } else {
- sender.send(Err(RosterError::UnexpectedStanza(s.clone())));
- }
- return;
- }
- s => {
- sender.send(Err(RosterError::UnexpectedStanza(s)));
- return;
- }
- },
- Err(e) => {
- sender.send(Err(e.into()));
- return;
- }
- },
- Err(e) => {
- sender.send(Err(RosterError::Write(WriteError::Actor(e.into()))));
- return;
- }
- }
- }
- Command::BuddyRequest(jid, sender) => {
- let presence = Stanza::Presence(stanza::client::presence::Presence {
- from: None,
- id: None,
- to: Some(jid.clone()),
- r#type: Some(stanza::client::presence::PresenceType::Subscribe),
- lang: None,
- show: None,
- status: None,
- priority: None,
- errors: Vec::new(),
- delay: None,
- });
- let result = connection.write_handle().write(presence).await;
- match result {
- Err(_) => {
- let _ = sender.send(result);
- }
- Ok(()) => {
- let presence = Stanza::Presence(stanza::client::presence::Presence {
- from: None,
- id: None,
- to: Some(jid),
- r#type: Some(stanza::client::presence::PresenceType::Subscribed),
- lang: None,
- show: None,
- status: None,
- priority: None,
- errors: Vec::new(),
- delay: None,
- });
- let result = connection.write_handle().write(presence).await;
- let _ = sender.send(result);
- }
- }
- }
- Command::SubscriptionRequest(jid, sender) => {
- // TODO: i should probably have builders
- let presence = Stanza::Presence(stanza::client::presence::Presence {
- from: None,
- id: None,
- to: Some(jid),
- r#type: Some(stanza::client::presence::PresenceType::Subscribe),
- lang: None,
- show: None,
- status: None,
- priority: None,
- errors: Vec::new(),
- delay: None,
- });
- let result = connection.write_handle().write(presence).await;
- let _ = sender.send(result);
- }
- Command::AcceptBuddyRequest(jid, sender) => {
- let presence = Stanza::Presence(stanza::client::presence::Presence {
- from: None,
- id: None,
- to: Some(jid.clone()),
- r#type: Some(stanza::client::presence::PresenceType::Subscribed),
- lang: None,
- show: None,
- status: None,
- priority: None,
- errors: Vec::new(),
- delay: None,
- });
- let result = connection.write_handle().write(presence).await;
- match result {
- Err(_) => {
- let _ = sender.send(result);
- }
- Ok(()) => {
- let presence = Stanza::Presence(stanza::client::presence::Presence {
- from: None,
- id: None,
- to: Some(jid),
- r#type: Some(stanza::client::presence::PresenceType::Subscribe),
- lang: None,
- show: None,
- status: None,
- priority: None,
- errors: Vec::new(),
- delay: None,
- });
- let result = connection.write_handle().write(presence).await;
- let _ = sender.send(result);
- }
- }
- }
- Command::AcceptSubscriptionRequest(jid, sender) => {
- let presence = Stanza::Presence(stanza::client::presence::Presence {
- from: None,
- id: None,
- to: Some(jid),
- r#type: Some(stanza::client::presence::PresenceType::Subscribe),
- lang: None,
- show: None,
- status: None,
- priority: None,
- errors: Vec::new(),
- delay: None,
- });
- let result = connection.write_handle().write(presence).await;
- let _ = sender.send(result);
- }
- Command::UnsubscribeFromContact(jid, sender) => {
- let presence = Stanza::Presence(stanza::client::presence::Presence {
- from: None,
- id: None,
- to: Some(jid),
- r#type: Some(stanza::client::presence::PresenceType::Unsubscribe),
- lang: None,
- show: None,
- status: None,
- priority: None,
- errors: Vec::new(),
- delay: None,
- });
- let result = connection.write_handle().write(presence).await;
- let _ = sender.send(result);
- }
- Command::UnsubscribeContact(jid, sender) => {
- let presence = Stanza::Presence(stanza::client::presence::Presence {
- from: None,
- id: None,
- to: Some(jid),
- r#type: Some(stanza::client::presence::PresenceType::Unsubscribed),
- lang: None,
- show: None,
- status: None,
- priority: None,
- errors: Vec::new(),
- delay: None,
- });
- let result = connection.write_handle().write(presence).await;
- let _ = sender.send(result);
- }
- Command::UnfriendContact(jid, sender) => {
- let presence = Stanza::Presence(stanza::client::presence::Presence {
- from: None,
- id: None,
- to: Some(jid.clone()),
- r#type: Some(stanza::client::presence::PresenceType::Unsubscribe),
- lang: None,
- show: None,
- status: None,
- priority: None,
- errors: Vec::new(),
- delay: None,
- });
- let result = connection.write_handle().write(presence).await;
- match result {
- Err(_) => {
- let _ = sender.send(result);
- }
- Ok(()) => {
- let presence = Stanza::Presence(stanza::client::presence::Presence {
- from: None,
- id: None,
- to: Some(jid),
- r#type: Some(stanza::client::presence::PresenceType::Unsubscribed),
- lang: None,
- show: None,
- status: None,
- priority: None,
- errors: Vec::new(),
- delay: None,
- });
- let result = connection.write_handle().write(presence).await;
- let _ = sender.send(result);
- }
- }
- }
- Command::DeleteContact(jid, sender) => {
- let iq_id = Uuid::new_v4().to_string();
- let set_stanza = Stanza::Iq(Iq {
- from: Some(connection.jid().clone()),
- id: iq_id.clone(),
- to: None,
- r#type: IqType::Set,
- lang: None,
- query: Some(iq::Query::Roster(stanza::roster::Query {
- ver: None,
- items: vec![stanza::roster::Item {
- approved: None,
- ask: false,
- jid,
- name: None,
- subscription: Some(stanza::roster::Subscription::Remove),
- groups: Vec::new(),
- }],
- })),
- errors: Vec::new(),
- });
- let (send, recv) = oneshot::channel();
- {
- self.pending.lock().await.insert(iq_id.clone(), send);
- }
- let result = connection.write_handle().write(set_stanza).await;
- if let Err(e) = result {
- sender.send(Err(RosterError::Write(e)));
- return;
- }
- let iq_result = recv.await;
- match iq_result {
- Ok(i) => match i {
- Ok(iq_result) => match iq_result {
- Stanza::Iq(Iq {
- from: _,
- id,
- to: _,
- r#type,
- lang: _,
- query: _,
- errors: _,
- }) if id == iq_id && r#type == IqType::Result => {
- sender.send(Ok(()));
- return;
- }
- ref s @ Stanza::Iq(Iq {
- from: _,
- ref id,
- to: _,
- r#type,
- lang: _,
- query: _,
- ref errors,
- }) if *id == iq_id && r#type == IqType::Error => {
- if let Some(error) = errors.first() {
- sender.send(Err(RosterError::StanzaError(error.clone())));
- } else {
- sender.send(Err(RosterError::UnexpectedStanza(s.clone())));
- }
- return;
- }
- s => {
- sender.send(Err(RosterError::UnexpectedStanza(s)));
- return;
- }
- },
- Err(e) => {
- sender.send(Err(e.into()));
- return;
- }
- },
- Err(e) => {
- sender.send(Err(RosterError::Write(WriteError::Actor(e.into()))));
- return;
- }
- }
- }
- Command::UpdateContact(jid, contact_update, sender) => {
- let iq_id = Uuid::new_v4().to_string();
- let groups = Vec::from_iter(
- contact_update
- .groups
- .into_iter()
- .map(|group| stanza::roster::Group(Some(group))),
- );
- let set_stanza = Stanza::Iq(Iq {
- from: Some(connection.jid().clone()),
- id: iq_id.clone(),
- to: None,
- r#type: IqType::Set,
- lang: None,
- query: Some(iq::Query::Roster(stanza::roster::Query {
- ver: None,
- items: vec![stanza::roster::Item {
- approved: None,
- ask: false,
- jid,
- name: contact_update.name,
- subscription: None,
- groups,
- }],
- })),
- errors: Vec::new(),
- });
- let (send, recv) = oneshot::channel();
- {
- self.pending.lock().await.insert(iq_id.clone(), send);
- }
- let result = connection.write_handle().write(set_stanza).await;
- if let Err(e) = result {
- sender.send(Err(RosterError::Write(e)));
- return;
- }
- let iq_result = recv.await;
- match iq_result {
- Ok(i) => match i {
- Ok(iq_result) => match iq_result {
- Stanza::Iq(Iq {
- from: _,
- id,
- to: _,
- r#type,
- lang: _,
- query: _,
- errors: _,
- }) if id == iq_id && r#type == IqType::Result => {
- sender.send(Ok(()));
- return;
- }
- ref s @ Stanza::Iq(Iq {
- from: _,
- ref id,
- to: _,
- r#type,
- lang: _,
- query: _,
- ref errors,
- }) if *id == iq_id && r#type == IqType::Error => {
- if let Some(error) = errors.first() {
- sender.send(Err(RosterError::StanzaError(error.clone())));
- } else {
- sender.send(Err(RosterError::UnexpectedStanza(s.clone())));
- }
- return;
- }
- s => {
- sender.send(Err(RosterError::UnexpectedStanza(s)));
- return;
- }
- },
- Err(e) => {
- sender.send(Err(e.into()));
- return;
- }
- },
- Err(e) => {
- sender.send(Err(RosterError::Write(WriteError::Actor(e.into()))));
- return;
- }
- }
- }
- Command::SetStatus(online, sender) => {
- let result = self.db.upsert_cached_status(online.clone()).await;
- if let Err(e) = result {
- let _ = self
- .update_sender
- .send(UpdateMessage::Error(Error::SetStatus(StatusError::Cache(
- e.into(),
- ))))
- .await;
- }
- let result = connection
- .write_handle()
- .write(Stanza::Presence(online.into_stanza(None)))
- .await
- .map_err(|e| StatusError::Write(e));
- // .map_err(|e| StatusError::Write(e));
- let _ = sender.send(result);
- }
- // TODO: offline message queue
- Command::SendMessage(jid, body, sender) => {
- let id = Uuid::new_v4();
- let message = Stanza::Message(stanza::client::message::Message {
- from: Some(connection.jid().clone()),
- id: Some(id.to_string()),
- to: Some(jid.clone()),
- // TODO: specify message type
- r#type: stanza::client::message::MessageType::Chat,
- // TODO: lang ?
- lang: None,
- subject: None,
- body: Some(stanza::client::message::Body {
- lang: None,
- body: Some(body.body.clone()),
- }),
- thread: None,
- delay: None,
- });
- let _ = sender.send(Ok(()));
- // let _ = sender.send(Ok(message.clone()));
- let result = connection.write_handle().write(message).await;
- match result {
- Ok(_) => {
- let mut message = Message {
- id,
- from: connection.jid().clone(),
- body,
- timestamp: Utc::now(),
- };
- info!("send message {:?}", message);
- if let Err(e) = self
- .db
- .create_message_with_self_resource_and_chat(
- message.clone(),
- jid.clone(),
- )
- .await
- .map_err(|e| e.into())
- {
- tracing::error!("{}", e);
- let _ =
- self.update_sender
- .send(UpdateMessage::Error(Error::MessageSend(
- error::MessageSendError::MessageHistory(e),
- )));
- }
- // TODO: don't do this, have separate from from details
- message.from = message.from.as_bare();
- let _ = self
- .update_sender
- .send(UpdateMessage::Message { to: jid, message })
- .await;
- }
- Err(_) => {
- // let _ = sender.send(result);
- }
- }
- }
- Command::SendPresence(jid, presence, sender) => {
- let mut presence: stanza::client::presence::Presence = presence.into();
- if let Some(jid) = jid {
- presence.to = Some(jid);
- };
- let result = connection
- .write_handle()
- .write(Stanza::Presence(presence))
- .await;
- // .map_err(|e| StatusError::Write(e));
- let _ = sender.send(result);
- }
- }
- }
-
- async fn handle_offline(self, command: Command) {
- match command {
- Command::GetRoster(sender) => {
- let roster = self.db.read_cached_roster().await;
- match roster {
- Ok(roster) => {
- let _ = sender.send(Ok(roster));
- }
- Err(e) => {
- let _ = sender.send(Err(RosterError::Cache(e.into())));
- }
- }
- }
- Command::GetChats(sender) => {
- let chats = self.db.read_chats().await.map_err(|e| e.into());
- sender.send(chats);
- }
- Command::GetChatsOrdered(sender) => {
- let chats = self.db.read_chats_ordered().await.map_err(|e| e.into());
- sender.send(chats);
- }
- Command::GetChatsOrderedWithLatestMessages(sender) => {
- let chats = self
- .db
- .read_chats_ordered_with_latest_messages()
- .await
- .map_err(|e| e.into());
- sender.send(chats);
- }
- Command::GetChat(jid, sender) => {
- let chats = self.db.read_chat(jid).await.map_err(|e| e.into());
- sender.send(chats);
- }
- Command::GetMessages(jid, sender) => {
- let messages = self
- .db
- .read_message_history(jid)
- .await
- .map_err(|e| e.into());
- sender.send(messages);
- }
- Command::DeleteChat(jid, sender) => {
- let result = self.db.delete_chat(jid).await.map_err(|e| e.into());
- sender.send(result);
- }
- Command::DeleteMessage(uuid, sender) => {
- let result = self.db.delete_message(uuid).await.map_err(|e| e.into());
- sender.send(result);
- }
- Command::GetUser(jid, sender) => {
- let user = self.db.read_user(jid).await.map_err(|e| e.into());
- sender.send(user);
- }
- // TODO: offline queue to modify roster
- Command::AddContact(_jid, sender) => {
- sender.send(Err(RosterError::Write(WriteError::Disconnected)));
- }
- Command::BuddyRequest(_jid, sender) => {
- sender.send(Err(WriteError::Disconnected));
- }
- Command::SubscriptionRequest(_jid, sender) => {
- sender.send(Err(WriteError::Disconnected));
- }
- Command::AcceptBuddyRequest(_jid, sender) => {
- sender.send(Err(WriteError::Disconnected));
- }
- Command::AcceptSubscriptionRequest(_jid, sender) => {
- sender.send(Err(WriteError::Disconnected));
- }
- Command::UnsubscribeFromContact(_jid, sender) => {
- sender.send(Err(WriteError::Disconnected));
- }
- Command::UnsubscribeContact(_jid, sender) => {
- sender.send(Err(WriteError::Disconnected));
- }
- Command::UnfriendContact(_jid, sender) => {
- sender.send(Err(WriteError::Disconnected));
- }
- Command::DeleteContact(_jid, sender) => {
- sender.send(Err(RosterError::Write(WriteError::Disconnected)));
- }
- Command::UpdateContact(_jid, _contact_update, sender) => {
- sender.send(Err(RosterError::Write(WriteError::Disconnected)));
- }
- Command::SetStatus(online, sender) => {
- let result = self
- .db
- .upsert_cached_status(online)
- .await
- .map_err(|e| StatusError::Cache(e.into()));
- sender.send(result);
- }
- // TODO: offline message queue
- Command::SendMessage(_jid, _body, sender) => {
- sender.send(Err(WriteError::Disconnected));
- }
- Command::SendPresence(_jid, _presence, sender) => {
- sender.send(Err(WriteError::Disconnected));
- }
- }
- }
- // pub async fn handle_stream_error(self, error) {}
- // stanza errors (recoverable)
- // pub async fn handle_error(self, error: Error) {}
- // when it aborts, must clear iq map no matter what
- async fn on_abort(self) {
- let mut iqs = self.pending.lock().await;
- for (_id, sender) in iqs.drain() {
- let _ = sender.send(Err(ReadError::LostConnection));
- }
- }
-
- async fn handle_connection_error(self, error: ConnectionError) {
- self.update_sender
- .send(UpdateMessage::Error(
- ConnectionError::AlreadyConnected.into(),
- ))
- .await;
- }
-}
-
impl From<Command> for CoreClientCommand<Command> {
fn from(value: Command) -> Self {
CoreClientCommand::Command(value)
diff --git a/filamento/src/logic/abort.rs b/filamento/src/logic/abort.rs
new file mode 100644
index 0000000..32c4823
--- /dev/null
+++ b/filamento/src/logic/abort.rs
@@ -0,0 +1,10 @@
+use lampada::error::ReadError;
+
+use super::ClientLogic;
+
+pub async fn on_abort(logic: ClientLogic) {
+ let mut iqs = logic.pending().lock().await;
+ for (_id, sender) in iqs.drain() {
+ let _ = sender.send(Err(ReadError::LostConnection));
+ }
+}
diff --git a/filamento/src/logic/connect.rs b/filamento/src/logic/connect.rs
new file mode 100644
index 0000000..4dc789e
--- /dev/null
+++ b/filamento/src/logic/connect.rs
@@ -0,0 +1,91 @@
+use lampada::{Connected, Logic, error::WriteError};
+use tokio::sync::oneshot;
+use tracing::debug;
+
+use crate::{
+ Command, UpdateMessage,
+ error::{ConnectionJobError, Error, RosterError},
+ presence::{Online, PresenceType},
+};
+
+use super::ClientLogic;
+
+pub async fn handle_connect(logic: ClientLogic, connection: Connected) {
+ let (send, recv) = oneshot::channel();
+ debug!("getting roster");
+ logic
+ .clone()
+ .handle_online(Command::GetRoster(send), connection.clone())
+ .await;
+ debug!("sent roster req");
+ let roster = recv.await;
+ debug!("got roster");
+ match roster {
+ Ok(r) => match r {
+ Ok(roster) => {
+ let online = logic.db().read_cached_status().await;
+ let online = match online {
+ Ok(online) => online,
+ Err(e) => {
+ let _ = logic
+ .update_sender()
+ .send(UpdateMessage::Error(Error::Connecting(
+ ConnectionJobError::StatusCacheError(e.into()),
+ )))
+ .await;
+ Online::default()
+ }
+ };
+ let (send, recv) = oneshot::channel();
+ logic
+ .clone()
+ .handle_online(
+ Command::SendPresence(None, PresenceType::Online(online.clone()), send),
+ connection,
+ )
+ .await;
+ let set_status = recv.await;
+ match set_status {
+ Ok(s) => match s {
+ Ok(()) => {
+ let _ = logic
+ .update_sender()
+ .send(UpdateMessage::Online(online, roster))
+ .await;
+ }
+ Err(e) => {
+ let _ = logic
+ .update_sender()
+ .send(UpdateMessage::Error(Error::Connecting(e.into())))
+ .await;
+ }
+ },
+ Err(e) => {
+ let _ = logic
+ .update_sender()
+ .send(UpdateMessage::Error(Error::Connecting(
+ ConnectionJobError::SendPresence(WriteError::Actor(e.into())),
+ )))
+ .await;
+ }
+ }
+ }
+ Err(e) => {
+ let _ = logic
+ .update_sender()
+ .send(UpdateMessage::Error(Error::Connecting(e.into())))
+ .await;
+ }
+ },
+ Err(e) => {
+ let _ = logic
+ .update_sender()
+ .send(UpdateMessage::Error(Error::Connecting(
+ ConnectionJobError::RosterRetreival(RosterError::Write(WriteError::Actor(
+ e.into(),
+ ))),
+ )))
+ .await;
+ }
+ }
+}
diff --git a/filamento/src/logic/connection_error.rs b/filamento/src/logic/connection_error.rs
new file mode 100644
index 0000000..ac9e931
--- /dev/null
+++ b/filamento/src/logic/connection_error.rs
@@ -0,0 +1,12 @@
+use lampada::error::ConnectionError;
+
+use crate::UpdateMessage;
+
+use super::ClientLogic;
+
+pub async fn handle_connection_error(logic: ClientLogic, error: ConnectionError) {
+ logic
+ .update_sender()
+ .send(UpdateMessage::Error(error.into()))
+ .await;
+}
diff --git a/filamento/src/logic/disconnect.rs b/filamento/src/logic/disconnect.rs
new file mode 100644
index 0000000..241c3e6
--- /dev/null
+++ b/filamento/src/logic/disconnect.rs
@@ -0,0 +1,18 @@
+use lampada::Connected;
+use stanza::client::Stanza;
+
+use crate::{UpdateMessage, presence::Offline};
+
+use super::ClientLogic;
+
+pub async fn handle_disconnect(logic: ClientLogic, connection: Connected) {
+ // TODO: be able to set offline status message
+ let offline_presence: stanza::client::presence::Presence = Offline::default().into_stanza(None);
+ let stanza = Stanza::Presence(offline_presence);
+ // TODO: timeout and error check
+ connection.write_handle().write(stanza).await;
+ let _ = logic
+ .update_sender()
+ .send(UpdateMessage::Offline(Offline::default()))
+ .await;
+}
diff --git a/filamento/src/logic/mod.rs b/filamento/src/logic/mod.rs
new file mode 100644
index 0000000..638f682
--- /dev/null
+++ b/filamento/src/logic/mod.rs
@@ -0,0 +1,90 @@
+use std::{collections::HashMap, sync::Arc};
+
+use lampada::{Logic, error::ReadError};
+use stanza::client::Stanza;
+use tokio::sync::{Mutex, mpsc, oneshot};
+
+use crate::{Command, UpdateMessage, db::Db};
+
+mod abort;
+mod connect;
+mod connection_error;
+mod disconnect;
+mod offline;
+mod online;
+mod process_stanza;
+
+#[derive(Clone)]
+pub struct ClientLogic {
+ db: Db,
+ pending: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>,
+ update_sender: mpsc::Sender<UpdateMessage>,
+}
+
+impl ClientLogic {
+ pub fn new(
+ db: Db,
+ pending: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>,
+ update_sender: mpsc::Sender<UpdateMessage>,
+ ) -> Self {
+ Self {
+ db,
+ pending,
+ update_sender,
+ }
+ }
+
+ pub fn db(&self) -> &Db {
+ &self.db
+ }
+
+ pub fn pending(&self) -> &Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>> {
+ &self.pending.as_ref()
+ }
+
+ pub fn update_sender(&self) -> &mpsc::Sender<UpdateMessage> {
+ &self.update_sender
+ }
+}
+
+impl Logic for ClientLogic {
+ type Cmd = Command;
+
+ // pub async fn handle_stream_error(self, error) {}
+ // stanza errors (recoverable)
+ // pub async fn handle_error(self, error: Error) {}
+ // when it aborts, must clear iq map no matter what
+
+ async fn handle_connect(self, connection: lampada::Connected) {
+ connect::handle_connect(self, connection).await;
+ }
+
+ async fn handle_disconnect(self, connection: lampada::Connected) {
+ disconnect::handle_disconnect(self, connection).await;
+ }
+
+ async fn handle_stanza(
+ self,
+ stanza: ::stanza::client::Stanza,
+ connection: lampada::Connected,
+ supervisor: lampada::SupervisorSender,
+ ) {
+ process_stanza::handle_stanza(self, stanza, connection, supervisor).await;
+ }
+
+ async fn handle_online(self, command: Self::Cmd, connection: lampada::Connected) {
+ online::handle_online(self, command, connection).await;
+ }
+
+ async fn handle_offline(self, command: Self::Cmd) {
+ offline::handle_offline(self, command).await;
+ }
+
+ async fn on_abort(self) {
+ abort::on_abort(self).await;
+ }
+
+ async fn handle_connection_error(self, error: lampada::error::ConnectionError) {
+ connection_error::handle_connection_error(self, error).await;
+ }
+}
diff --git a/filamento/src/logic/offline.rs b/filamento/src/logic/offline.rs
new file mode 100644
index 0000000..17a60f3
--- /dev/null
+++ b/filamento/src/logic/offline.rs
@@ -0,0 +1,110 @@
+use lampada::error::WriteError;
+
+use crate::{
+ Command,
+ error::{RosterError, StatusError},
+};
+
+use super::ClientLogic;
+
+pub async fn handle_offline(logic: ClientLogic, command: Command) {
+ match command {
+ Command::GetRoster(sender) => {
+ let roster = logic.db().read_cached_roster().await;
+ match roster {
+ Ok(roster) => {
+ let _ = sender.send(Ok(roster));
+ }
+ Err(e) => {
+ let _ = sender.send(Err(RosterError::Cache(e.into())));
+ }
+ }
+ }
+ Command::GetChats(sender) => {
+ let chats = logic.db().read_chats().await.map_err(|e| e.into());
+ sender.send(chats);
+ }
+ Command::GetChatsOrdered(sender) => {
+ let chats = logic.db().read_chats_ordered().await.map_err(|e| e.into());
+ sender.send(chats);
+ }
+ Command::GetChatsOrderedWithLatestMessages(sender) => {
+ let chats = logic
+ .db()
+ .read_chats_ordered_with_latest_messages()
+ .await
+ .map_err(|e| e.into());
+ sender.send(chats);
+ }
+ Command::GetChat(jid, sender) => {
+ let chats = logic.db().read_chat(jid).await.map_err(|e| e.into());
+ sender.send(chats);
+ }
+ Command::GetMessages(jid, sender) => {
+ let messages = logic
+ .db()
+ .read_message_history(jid)
+ .await
+ .map_err(|e| e.into());
+ sender.send(messages);
+ }
+ Command::DeleteChat(jid, sender) => {
+ let result = logic.db().delete_chat(jid).await.map_err(|e| e.into());
+ sender.send(result);
+ }
+ Command::DeleteMessage(uuid, sender) => {
+ let result = logic.db().delete_message(uuid).await.map_err(|e| e.into());
+ sender.send(result);
+ }
+ Command::GetUser(jid, sender) => {
+ let user = logic.db().read_user(jid).await.map_err(|e| e.into());
+ sender.send(user);
+ }
+ // TODO: offline queue to modify roster
+ Command::AddContact(_jid, sender) => {
+ sender.send(Err(RosterError::Write(WriteError::Disconnected)));
+ }
+ Command::BuddyRequest(_jid, sender) => {
+ sender.send(Err(WriteError::Disconnected));
+ }
+ Command::SubscriptionRequest(_jid, sender) => {
+ sender.send(Err(WriteError::Disconnected));
+ }
+ Command::AcceptBuddyRequest(_jid, sender) => {
+ sender.send(Err(WriteError::Disconnected));
+ }
+ Command::AcceptSubscriptionRequest(_jid, sender) => {
+ sender.send(Err(WriteError::Disconnected));
+ }
+ Command::UnsubscribeFromContact(_jid, sender) => {
+ sender.send(Err(WriteError::Disconnected));
+ }
+ Command::UnsubscribeContact(_jid, sender) => {
+ sender.send(Err(WriteError::Disconnected));
+ }
+ Command::UnfriendContact(_jid, sender) => {
+ sender.send(Err(WriteError::Disconnected));
+ }
+ Command::DeleteContact(_jid, sender) => {
+ sender.send(Err(RosterError::Write(WriteError::Disconnected)));
+ }
+ Command::UpdateContact(_jid, _contact_update, sender) => {
+ sender.send(Err(RosterError::Write(WriteError::Disconnected)));
+ }
+ Command::SetStatus(online, sender) => {
+ let result = logic
+ .db()
+ .upsert_cached_status(online)
+ .await
+ .map_err(|e| StatusError::Cache(e.into()));
+ sender.send(result);
+ }
+ // TODO: offline message queue
+ Command::SendMessage(_jid, _body, sender) => {
+ sender.send(Err(WriteError::Disconnected));
+ }
+ Command::SendPresence(_jid, _presence, sender) => {
+ sender.send(Err(WriteError::Disconnected));
+ }
+ }
+}
diff --git a/filamento/src/logic/online.rs b/filamento/src/logic/online.rs
new file mode 100644
index 0000000..e8cbb33
--- /dev/null
+++ b/filamento/src/logic/online.rs
@@ -0,0 +1,661 @@
+use chrono::Utc;
+use lampada::{Connected, WriteMessage, error::WriteError};
+use stanza::client::{
+ Stanza,
+ iq::{self, Iq, IqType},
+};
+use tokio::sync::oneshot;
+use tracing::{debug, info};
+use uuid::Uuid;
+
+use crate::{
+ Command, UpdateMessage,
+ chat::Message,
+ error::{Error, MessageSendError, RosterError, StatusError},
+ roster::Contact,
+};
+
+use super::ClientLogic;
+
+pub async fn handle_online(logic: ClientLogic, command: Command, connection: Connected) {
+ match command {
+ Command::GetRoster(result_sender) => {
+ let iq_id = Uuid::new_v4().to_string();
+ let (send, iq_recv) = oneshot::channel();
+ {
+ logic.pending().lock().await.insert(iq_id.clone(), send);
+ }
+ let stanza = Stanza::Iq(Iq {
+ from: Some(connection.jid().clone()),
+ id: iq_id.to_string(),
+ to: None,
+ r#type: IqType::Get,
+ lang: None,
+ query: Some(iq::Query::Roster(stanza::roster::Query {
+ ver: None,
+ items: Vec::new(),
+ })),
+ errors: Vec::new(),
+ });
+ let (send, recv) = oneshot::channel();
+ let _ = connection
+ .write_handle()
+ .send(WriteMessage {
+ stanza,
+ respond_to: send,
+ })
+ .await;
+ // TODO: timeout
+ match recv.await {
+ Ok(Ok(())) => info!("roster request sent"),
+ Ok(Err(e)) => {
+ // TODO: log errors if fail to send
+ let _ = result_sender.send(Err(RosterError::Write(e.into())));
+ return;
+ }
+ Err(e) => {
+ let _ =
+ result_sender.send(Err(RosterError::Write(WriteError::Actor(e.into()))));
+ return;
+ }
+ };
+ // TODO: timeout
+ match iq_recv.await {
+ Ok(Ok(stanza)) => match stanza {
+ Stanza::Iq(Iq {
+ from: _,
+ id,
+ to: _,
+ r#type,
+ lang: _,
+ query: Some(iq::Query::Roster(stanza::roster::Query { ver: _, items })),
+ errors: _,
+ }) if id == iq_id && r#type == IqType::Result => {
+ let contacts: Vec<Contact> =
+ items.into_iter().map(|item| item.into()).collect();
+ if let Err(e) = logic.db().replace_cached_roster(contacts.clone()).await {
+ logic
+ .update_sender()
+ .send(UpdateMessage::Error(Error::Roster(RosterError::Cache(
+ e.into(),
+ ))))
+ .await;
+ };
+ result_sender.send(Ok(contacts));
+ return;
+ }
+ ref s @ Stanza::Iq(Iq {
+ from: _,
+ ref id,
+ to: _,
+ r#type,
+ lang: _,
+ query: _,
+ ref errors,
+ }) if *id == iq_id && r#type == IqType::Error => {
+ if let Some(error) = errors.first() {
+ result_sender.send(Err(RosterError::StanzaError(error.clone())));
+ } else {
+ result_sender.send(Err(RosterError::UnexpectedStanza(s.clone())));
+ }
+ return;
+ }
+ s => {
+ result_sender.send(Err(RosterError::UnexpectedStanza(s)));
+ return;
+ }
+ },
+ Ok(Err(e)) => {
+ result_sender.send(Err(RosterError::Read(e)));
+ return;
+ }
+ Err(e) => {
+ result_sender.send(Err(RosterError::Write(WriteError::Actor(e.into()))));
+ return;
+ }
+ }
+ }
+ Command::GetChats(sender) => {
+ let chats = logic.db().read_chats().await.map_err(|e| e.into());
+ sender.send(chats);
+ }
+ Command::GetChatsOrdered(sender) => {
+ let chats = logic.db().read_chats_ordered().await.map_err(|e| e.into());
+ sender.send(chats);
+ }
+ Command::GetChatsOrderedWithLatestMessages(sender) => {
+ let chats = logic
+ .db()
+ .read_chats_ordered_with_latest_messages()
+ .await
+ .map_err(|e| e.into());
+ sender.send(chats);
+ }
+ Command::GetChat(jid, sender) => {
+ let chats = logic.db().read_chat(jid).await.map_err(|e| e.into());
+ sender.send(chats);
+ }
+ Command::GetMessages(jid, sender) => {
+ let messages = logic
+ .db()
+ .read_message_history(jid)
+ .await
+ .map_err(|e| e.into());
+ sender.send(messages);
+ }
+ Command::DeleteChat(jid, sender) => {
+ let result = logic.db().delete_chat(jid).await.map_err(|e| e.into());
+ sender.send(result);
+ }
+ Command::DeleteMessage(uuid, sender) => {
+ let result = logic.db().delete_message(uuid).await.map_err(|e| e.into());
+ sender.send(result);
+ }
+ Command::GetUser(jid, sender) => {
+ let user = logic.db().read_user(jid).await.map_err(|e| e.into());
+ sender.send(user);
+ }
+ // TODO: offline queue to modify roster
+ Command::AddContact(jid, sender) => {
+ let iq_id = Uuid::new_v4().to_string();
+ let set_stanza = Stanza::Iq(Iq {
+ from: Some(connection.jid().clone()),
+ id: iq_id.clone(),
+ to: None,
+ r#type: IqType::Set,
+ lang: None,
+ query: Some(iq::Query::Roster(stanza::roster::Query {
+ ver: None,
+ items: vec![stanza::roster::Item {
+ approved: None,
+ ask: false,
+ jid,
+ name: None,
+ subscription: None,
+ groups: Vec::new(),
+ }],
+ })),
+ errors: Vec::new(),
+ });
+ let (send, recv) = oneshot::channel();
+ {
+ logic.pending().lock().await.insert(iq_id.clone(), send);
+ }
+ // TODO: write_handle send helper function
+ let result = connection.write_handle().write(set_stanza).await;
+ if let Err(e) = result {
+ sender.send(Err(RosterError::Write(e)));
+ return;
+ }
+ let iq_result = recv.await;
+ match iq_result {
+ Ok(i) => match i {
+ Ok(iq_result) => match iq_result {
+ Stanza::Iq(Iq {
+ from: _,
+ id,
+ to: _,
+ r#type,
+ lang: _,
+ query: _,
+ errors: _,
+ }) if id == iq_id && r#type == IqType::Result => {
+ sender.send(Ok(()));
+ return;
+ }
+ ref s @ Stanza::Iq(Iq {
+ from: _,
+ ref id,
+ to: _,
+ r#type,
+ lang: _,
+ query: _,
+ ref errors,
+ }) if *id == iq_id && r#type == IqType::Error => {
+ if let Some(error) = errors.first() {
+ sender.send(Err(RosterError::StanzaError(error.clone())));
+ } else {
+ sender.send(Err(RosterError::UnexpectedStanza(s.clone())));
+ }
+ return;
+ }
+ s => {
+ sender.send(Err(RosterError::UnexpectedStanza(s)));
+ return;
+ }
+ },
+ Err(e) => {
+ sender.send(Err(e.into()));
+ return;
+ }
+ },
+ Err(e) => {
+ sender.send(Err(RosterError::Write(WriteError::Actor(e.into()))));
+ return;
+ }
+ }
+ }
+ Command::BuddyRequest(jid, sender) => {
+ let presence = Stanza::Presence(stanza::client::presence::Presence {
+ from: None,
+ id: None,
+ to: Some(jid.clone()),
+ r#type: Some(stanza::client::presence::PresenceType::Subscribe),
+ lang: None,
+ show: None,
+ status: None,
+ priority: None,
+ errors: Vec::new(),
+ delay: None,
+ });
+ let result = connection.write_handle().write(presence).await;
+ match result {
+ Err(_) => {
+ let _ = sender.send(result);
+ }
+ Ok(()) => {
+ let presence = Stanza::Presence(stanza::client::presence::Presence {
+ from: None,
+ id: None,
+ to: Some(jid),
+ r#type: Some(stanza::client::presence::PresenceType::Subscribed),
+ lang: None,
+ show: None,
+ status: None,
+ priority: None,
+ errors: Vec::new(),
+ delay: None,
+ });
+ let result = connection.write_handle().write(presence).await;
+ let _ = sender.send(result);
+ }
+ }
+ }
+ Command::SubscriptionRequest(jid, sender) => {
+ // TODO: i should probably have builders
+ let presence = Stanza::Presence(stanza::client::presence::Presence {
+ from: None,
+ id: None,
+ to: Some(jid),
+ r#type: Some(stanza::client::presence::PresenceType::Subscribe),
+ lang: None,
+ show: None,
+ status: None,
+ priority: None,
+ errors: Vec::new(),
+ delay: None,
+ });
+ let result = connection.write_handle().write(presence).await;
+ let _ = sender.send(result);
+ }
+ Command::AcceptBuddyRequest(jid, sender) => {
+ let presence = Stanza::Presence(stanza::client::presence::Presence {
+ from: None,
+ id: None,
+ to: Some(jid.clone()),
+ r#type: Some(stanza::client::presence::PresenceType::Subscribed),
+ lang: None,
+ show: None,
+ status: None,
+ priority: None,
+ errors: Vec::new(),
+ delay: None,
+ });
+ let result = connection.write_handle().write(presence).await;
+ match result {
+ Err(_) => {
+ let _ = sender.send(result);
+ }
+ Ok(()) => {
+ let presence = Stanza::Presence(stanza::client::presence::Presence {
+ from: None,
+ id: None,
+ to: Some(jid),
+ r#type: Some(stanza::client::presence::PresenceType::Subscribe),
+ lang: None,
+ show: None,
+ status: None,
+ priority: None,
+ errors: Vec::new(),
+ delay: None,
+ });
+ let result = connection.write_handle().write(presence).await;
+ let _ = sender.send(result);
+ }
+ }
+ }
+ Command::AcceptSubscriptionRequest(jid, sender) => {
+ let presence = Stanza::Presence(stanza::client::presence::Presence {
+ from: None,
+ id: None,
+ to: Some(jid),
+ r#type: Some(stanza::client::presence::PresenceType::Subscribe),
+ lang: None,
+ show: None,
+ status: None,
+ priority: None,
+ errors: Vec::new(),
+ delay: None,
+ });
+ let result = connection.write_handle().write(presence).await;
+ let _ = sender.send(result);
+ }
+ Command::UnsubscribeFromContact(jid, sender) => {
+ let presence = Stanza::Presence(stanza::client::presence::Presence {
+ from: None,
+ id: None,
+ to: Some(jid),
+ r#type: Some(stanza::client::presence::PresenceType::Unsubscribe),
+ lang: None,
+ show: None,
+ status: None,
+ priority: None,
+ errors: Vec::new(),
+ delay: None,
+ });
+ let result = connection.write_handle().write(presence).await;
+ let _ = sender.send(result);
+ }
+ Command::UnsubscribeContact(jid, sender) => {
+ let presence = Stanza::Presence(stanza::client::presence::Presence {
+ from: None,
+ id: None,
+ to: Some(jid),
+ r#type: Some(stanza::client::presence::PresenceType::Unsubscribed),
+ lang: None,
+ show: None,
+ status: None,
+ priority: None,
+ errors: Vec::new(),
+ delay: None,
+ });
+ let result = connection.write_handle().write(presence).await;
+ let _ = sender.send(result);
+ }
+ Command::UnfriendContact(jid, sender) => {
+ let presence = Stanza::Presence(stanza::client::presence::Presence {
+ from: None,
+ id: None,
+ to: Some(jid.clone()),
+ r#type: Some(stanza::client::presence::PresenceType::Unsubscribe),
+ lang: None,
+ show: None,
+ status: None,
+ priority: None,
+ errors: Vec::new(),
+ delay: None,
+ });
+ let result = connection.write_handle().write(presence).await;
+ match result {
+ Err(_) => {
+ let _ = sender.send(result);
+ }
+ Ok(()) => {
+ let presence = Stanza::Presence(stanza::client::presence::Presence {
+ from: None,
+ id: None,
+ to: Some(jid),
+ r#type: Some(stanza::client::presence::PresenceType::Unsubscribed),
+ lang: None,
+ show: None,
+ status: None,
+ priority: None,
+ errors: Vec::new(),
+ delay: None,
+ });
+ let result = connection.write_handle().write(presence).await;
+ let _ = sender.send(result);
+ }
+ }
+ }
+ Command::DeleteContact(jid, sender) => {
+ let iq_id = Uuid::new_v4().to_string();
+ let set_stanza = Stanza::Iq(Iq {
+ from: Some(connection.jid().clone()),
+ id: iq_id.clone(),
+ to: None,
+ r#type: IqType::Set,
+ lang: None,
+ query: Some(iq::Query::Roster(stanza::roster::Query {
+ ver: None,
+ items: vec![stanza::roster::Item {
+ approved: None,
+ ask: false,
+ jid,
+ name: None,
+ subscription: Some(stanza::roster::Subscription::Remove),
+ groups: Vec::new(),
+ }],
+ })),
+ errors: Vec::new(),
+ });
+ let (send, recv) = oneshot::channel();
+ {
+ logic.pending().lock().await.insert(iq_id.clone(), send);
+ }
+ let result = connection.write_handle().write(set_stanza).await;
+ if let Err(e) = result {
+ sender.send(Err(RosterError::Write(e)));
+ return;
+ }
+ let iq_result = recv.await;
+ match iq_result {
+ Ok(i) => match i {
+ Ok(iq_result) => match iq_result {
+ Stanza::Iq(Iq {
+ from: _,
+ id,
+ to: _,
+ r#type,
+ lang: _,
+ query: _,
+ errors: _,
+ }) if id == iq_id && r#type == IqType::Result => {
+ sender.send(Ok(()));
+ return;
+ }
+ ref s @ Stanza::Iq(Iq {
+ from: _,
+ ref id,
+ to: _,
+ r#type,
+ lang: _,
+ query: _,
+ ref errors,
+ }) if *id == iq_id && r#type == IqType::Error => {
+ if let Some(error) = errors.first() {
+ sender.send(Err(RosterError::StanzaError(error.clone())));
+ } else {
+ sender.send(Err(RosterError::UnexpectedStanza(s.clone())));
+ }
+ return;
+ }
+ s => {
+ sender.send(Err(RosterError::UnexpectedStanza(s)));
+ return;
+ }
+ },
+ Err(e) => {
+ sender.send(Err(e.into()));
+ return;
+ }
+ },
+ Err(e) => {
+ sender.send(Err(RosterError::Write(WriteError::Actor(e.into()))));
+ return;
+ }
+ }
+ }
+ Command::UpdateContact(jid, contact_update, sender) => {
+ let iq_id = Uuid::new_v4().to_string();
+ let groups = Vec::from_iter(
+ contact_update
+ .groups
+ .into_iter()
+ .map(|group| stanza::roster::Group(Some(group))),
+ );
+ let set_stanza = Stanza::Iq(Iq {
+ from: Some(connection.jid().clone()),
+ id: iq_id.clone(),
+ to: None,
+ r#type: IqType::Set,
+ lang: None,
+ query: Some(iq::Query::Roster(stanza::roster::Query {
+ ver: None,
+ items: vec![stanza::roster::Item {
+ approved: None,
+ ask: false,
+ jid,
+ name: contact_update.name,
+ subscription: None,
+ groups,
+ }],
+ })),
+ errors: Vec::new(),
+ });
+ let (send, recv) = oneshot::channel();
+ {
+ logic.pending().lock().await.insert(iq_id.clone(), send);
+ }
+ let result = connection.write_handle().write(set_stanza).await;
+ if let Err(e) = result {
+ sender.send(Err(RosterError::Write(e)));
+ return;
+ }
+ let iq_result = recv.await;
+ match iq_result {
+ Ok(i) => match i {
+ Ok(iq_result) => match iq_result {
+ Stanza::Iq(Iq {
+ from: _,
+ id,
+ to: _,
+ r#type,
+ lang: _,
+ query: _,
+ errors: _,
+ }) if id == iq_id && r#type == IqType::Result => {
+ sender.send(Ok(()));
+ return;
+ }
+ ref s @ Stanza::Iq(Iq {
+ from: _,
+ ref id,
+ to: _,
+ r#type,
+ lang: _,
+ query: _,
+ ref errors,
+ }) if *id == iq_id && r#type == IqType::Error => {
+ if let Some(error) = errors.first() {
+ sender.send(Err(RosterError::StanzaError(error.clone())));
+ } else {
+ sender.send(Err(RosterError::UnexpectedStanza(s.clone())));
+ }
+ return;
+ }
+ s => {
+ sender.send(Err(RosterError::UnexpectedStanza(s)));
+ return;
+ }
+ },
+ Err(e) => {
+ sender.send(Err(e.into()));
+ return;
+ }
+ },
+ Err(e) => {
+ sender.send(Err(RosterError::Write(WriteError::Actor(e.into()))));
+ return;
+ }
+ }
+ }
+ Command::SetStatus(online, sender) => {
+ let result = logic.db().upsert_cached_status(online.clone()).await;
+ if let Err(e) = result {
+ let _ = logic
+ .update_sender()
+ .send(UpdateMessage::Error(Error::SetStatus(StatusError::Cache(
+ e.into(),
+ ))))
+ .await;
+ }
+ let result = connection
+ .write_handle()
+ .write(Stanza::Presence(online.into_stanza(None)))
+ .await
+ .map_err(|e| StatusError::Write(e));
+ // .map_err(|e| StatusError::Write(e));
+ let _ = sender.send(result);
+ }
+ // TODO: offline message queue
+ Command::SendMessage(jid, body, sender) => {
+ let id = Uuid::new_v4();
+ let message = Stanza::Message(stanza::client::message::Message {
+ from: Some(connection.jid().clone()),
+ id: Some(id.to_string()),
+ to: Some(jid.clone()),
+ // TODO: specify message type
+ r#type: stanza::client::message::MessageType::Chat,
+ // TODO: lang ?
+ lang: None,
+ subject: None,
+ body: Some(stanza::client::message::Body {
+ lang: None,
+ body: Some(body.body.clone()),
+ }),
+ thread: None,
+ delay: None,
+ });
+ let _ = sender.send(Ok(()));
+ // let _ = sender.send(Ok(message.clone()));
+ let result = connection.write_handle().write(message).await;
+ match result {
+ Ok(_) => {
+ let mut message = Message {
+ id,
+ from: connection.jid().clone(),
+ body,
+ timestamp: Utc::now(),
+ };
+ info!("send message {:?}", message);
+ if let Err(e) = logic
+ .db()
+ .create_message_with_self_resource_and_chat(message.clone(), jid.clone())
+ .await
+ .map_err(|e| e.into())
+ {
+ tracing::error!("{}", e);
+ let _ =
+ logic
+ .update_sender()
+ .send(UpdateMessage::Error(Error::MessageSend(
+ MessageSendError::MessageHistory(e),
+ )));
+ }
+ // TODO: don't do this, have separate from from details
+ message.from = message.from.as_bare();
+ let _ = logic
+ .update_sender()
+ .send(UpdateMessage::Message { to: jid, message })
+ .await;
+ }
+ Err(_) => {
+ // let _ = sender.send(result);
+ }
+ }
+ }
+ Command::SendPresence(jid, presence, sender) => {
+ let mut presence: stanza::client::presence::Presence = presence.into();
+ if let Some(jid) = jid {
+ presence.to = Some(jid);
+ };
+ let result = connection
+ .write_handle()
+ .write(Stanza::Presence(presence))
+ .await;
+ // .map_err(|e| StatusError::Write(e));
+ let _ = sender.send(result);
+ }
+ }
+}
diff --git a/filamento/src/logic/process_stanza.rs b/filamento/src/logic/process_stanza.rs
new file mode 100644
index 0000000..17738df
--- /dev/null
+++ b/filamento/src/logic/process_stanza.rs
@@ -0,0 +1,264 @@
+use std::str::FromStr;
+
+use chrono::Utc;
+use lampada::{Connected, SupervisorSender};
+use stanza::client::Stanza;
+use uuid::Uuid;
+
+use crate::{
+ UpdateMessage,
+ chat::{Body, Message},
+ error::{Error, IqError, MessageRecvError, PresenceError, RosterError},
+ presence::{Offline, Online, Presence, PresenceType, Show},
+ roster::Contact,
+};
+
+use super::ClientLogic;
+
+pub async fn handle_stanza(
+ logic: ClientLogic,
+ stanza: Stanza,
+ connection: Connected,
+ supervisor: SupervisorSender,
+) {
+ match stanza {
+ Stanza::Message(stanza_message) => {
+ if let Some(mut from) = stanza_message.from {
+ // TODO: don't ignore delay from. xep says SHOULD send error if incorrect.
+ let timestamp = stanza_message
+ .delay
+ .map(|delay| delay.stamp)
+ .unwrap_or_else(|| Utc::now());
+ // TODO: group chat messages
+ let mut message = Message {
+ id: stanza_message
+ .id
+ // TODO: proper id storage
+ .map(|id| Uuid::from_str(&id).unwrap_or_else(|_| Uuid::new_v4()))
+ .unwrap_or_else(|| Uuid::new_v4()),
+ from: from.clone(),
+ timestamp,
+ body: Body {
+ // TODO: should this be an option?
+ body: stanza_message
+ .body
+ .map(|body| body.body)
+ .unwrap_or_default()
+ .unwrap_or_default(),
+ },
+ };
+ // TODO: can this be more efficient?
+ let result = logic
+ .db()
+ .create_message_with_user_resource_and_chat(message.clone(), from.clone())
+ .await;
+ if let Err(e) = result {
+ tracing::error!("messagecreate");
+ let _ = logic
+ .update_sender()
+ .send(UpdateMessage::Error(Error::MessageRecv(
+ MessageRecvError::MessageHistory(e.into()),
+ )))
+ .await;
+ }
+ message.from = message.from.as_bare();
+ from = from.as_bare();
+ let _ = logic
+ .update_sender()
+ .send(UpdateMessage::Message { to: from, message })
+ .await;
+ } else {
+ let _ = logic
+ .update_sender()
+ .send(UpdateMessage::Error(Error::MessageRecv(
+ MessageRecvError::MissingFrom,
+ )))
+ .await;
+ }
+ }
+ Stanza::Presence(presence) => {
+ if let Some(from) = presence.from {
+ match presence.r#type {
+ Some(r#type) => match r#type {
+ // error processing a presence from somebody
+ stanza::client::presence::PresenceType::Error => {
+ // TODO: is there any other information that should go with the error? also MUST have an error, otherwise it's a different error. maybe it shoulnd't be an option.
+ let _ = logic
+ .update_sender()
+ .send(UpdateMessage::Error(Error::Presence(
+ // TODO: ughhhhhhhhhhhhh these stanza errors should probably just have an option, and custom display
+ PresenceError::StanzaError(
+ presence
+ .errors
+ .first()
+ .cloned()
+ .expect("error MUST have error"),
+ ),
+ )))
+ .await;
+ }
+ // should not happen (error to server)
+ stanza::client::presence::PresenceType::Probe => {
+ // TODO: should probably write an error and restart stream
+ let _ = logic
+ .update_sender()
+ .send(UpdateMessage::Error(Error::Presence(
+ PresenceError::Unsupported,
+ )))
+ .await;
+ }
+ stanza::client::presence::PresenceType::Subscribe => {
+ // may get a subscription request from somebody who is not a contact!!! therefore should be its own kind of event
+ let _ = logic
+ .update_sender()
+ .send(UpdateMessage::SubscriptionRequest(from))
+ .await;
+ }
+ stanza::client::presence::PresenceType::Unavailable => {
+ let offline = Offline {
+ status: presence.status.map(|status| status.status.0),
+ };
+ let timestamp = presence
+ .delay
+ .map(|delay| delay.stamp)
+ .unwrap_or_else(|| Utc::now());
+ let _ = logic
+ .update_sender()
+ .send(UpdateMessage::Presence {
+ from,
+ presence: Presence {
+ timestamp,
+ presence: PresenceType::Offline(offline),
+ },
+ })
+ .await;
+ }
+ // for now, do nothing, as these are simply informational. will receive roster push from the server regarding the changes to do with them.
+ stanza::client::presence::PresenceType::Subscribed => {}
+ stanza::client::presence::PresenceType::Unsubscribe => {}
+ stanza::client::presence::PresenceType::Unsubscribed => {}
+ },
+ None => {
+ let online = Online {
+ show: presence.show.map(|show| match show {
+ stanza::client::presence::Show::Away => Show::Away,
+ stanza::client::presence::Show::Chat => Show::Chat,
+ stanza::client::presence::Show::Dnd => Show::DoNotDisturb,
+ stanza::client::presence::Show::Xa => Show::ExtendedAway,
+ }),
+ status: presence.status.map(|status| status.status.0),
+ priority: presence.priority.map(|priority| priority.0),
+ };
+ let timestamp = presence
+ .delay
+ .map(|delay| delay.stamp)
+ .unwrap_or_else(|| Utc::now());
+ let _ = logic
+ .update_sender()
+ .send(UpdateMessage::Presence {
+ from,
+ presence: Presence {
+ timestamp,
+ presence: PresenceType::Online(online),
+ },
+ })
+ .await;
+ }
+ }
+ } else {
+ let _ = logic
+ .update_sender()
+ .send(UpdateMessage::Error(Error::Presence(
+ PresenceError::MissingFrom,
+ )))
+ .await;
+ }
+ }
+ Stanza::Iq(iq) => match iq.r#type {
+ stanza::client::iq::IqType::Error | stanza::client::iq::IqType::Result => {
+ let send;
+ {
+ send = logic.pending().lock().await.remove(&iq.id);
+ }
+ if let Some(send) = send {
+ send.send(Ok(Stanza::Iq(iq)));
+ } else {
+ let _ = logic
+ .update_sender()
+ .send(UpdateMessage::Error(Error::Iq(IqError::NoMatchingId(
+ iq.id,
+ ))))
+ .await;
+ }
+ }
+ // TODO: send unsupported to server
+ // TODO: proper errors i am so tired please
+ stanza::client::iq::IqType::Get => {}
+ stanza::client::iq::IqType::Set => {
+ if let Some(query) = iq.query {
+ match query {
+ stanza::client::iq::Query::Roster(mut query) => {
+ // TODO: there should only be one
+ if let Some(item) = query.items.pop() {
+ match item.subscription {
+ Some(stanza::roster::Subscription::Remove) => {
+ logic.db().delete_contact(item.jid.clone()).await;
+ logic
+ .update_sender()
+ .send(UpdateMessage::RosterDelete(item.jid))
+ .await;
+ // TODO: send result
+ }
+ _ => {
+ let contact: Contact = item.into();
+ if let Err(e) =
+ logic.db().upsert_contact(contact.clone()).await
+ {
+ let _ = logic
+ .update_sender()
+ .send(UpdateMessage::Error(Error::Roster(
+ RosterError::Cache(e.into()),
+ )))
+ .await;
+ }
+ let _ = logic
+ .update_sender()
+ .send(UpdateMessage::RosterUpdate(contact))
+ .await;
+ // TODO: send result
+ // write_handle.write(Stanza::Iq(stanza::client::iq::Iq {
+ // from: ,
+ // id: todo!(),
+ // to: todo!(),
+ // r#type: todo!(),
+ // lang: todo!(),
+ // query: todo!(),
+ // errors: todo!(),
+ // }));
+ }
+ }
+ }
+ }
+ // TODO: send unsupported to server
+ _ => {}
+ }
+ } else {
+ // TODO: send error (unsupported) to server
+ }
+ }
+ },
+ Stanza::Error(error) => {
+ let _ = logic
+ .update_sender()
+ .send(UpdateMessage::Error(Error::Stream(error)))
+ .await;
+ // TODO: reconnect
+ }
+ Stanza::OtherContent(content) => {
+ let _ = logic
+ .update_sender()
+ .send(UpdateMessage::Error(Error::UnrecognizedContent));
+ // TODO: send error to write_thread
+ }
+ }
+}