diff options
author | 2025-03-26 15:29:11 +0000 | |
---|---|---|
committer | 2025-03-26 15:29:11 +0000 | |
commit | bf677e1f9ce07e2fa8971c15b9a082cddbb40dec (patch) | |
tree | d8a6c8bec63c774322b207af43ad573d730ee1a2 /filamento | |
parent | 2211f324782cdc617b4b5ecd071178e372539fe4 (diff) | |
download | luz-bf677e1f9ce07e2fa8971c15b9a082cddbb40dec.tar.gz luz-bf677e1f9ce07e2fa8971c15b9a082cddbb40dec.tar.bz2 luz-bf677e1f9ce07e2fa8971c15b9a082cddbb40dec.zip |
refactor(filament): split logic into different files
Diffstat (limited to 'filamento')
-rw-r--r-- | filamento/.gitignore | 1 | ||||
-rw-r--r-- | filamento/src/lib.rs | 1124 | ||||
-rw-r--r-- | filamento/src/logic/abort.rs | 10 | ||||
-rw-r--r-- | filamento/src/logic/connect.rs | 91 | ||||
-rw-r--r-- | filamento/src/logic/connection_error.rs | 12 | ||||
-rw-r--r-- | filamento/src/logic/disconnect.rs | 18 | ||||
-rw-r--r-- | filamento/src/logic/mod.rs | 90 | ||||
-rw-r--r-- | filamento/src/logic/offline.rs | 110 | ||||
-rw-r--r-- | filamento/src/logic/online.rs | 661 | ||||
-rw-r--r-- | filamento/src/logic/process_stanza.rs | 264 |
10 files changed, 1260 insertions, 1121 deletions
diff --git a/filamento/.gitignore b/filamento/.gitignore new file mode 100644 index 0000000..8bb4037 --- /dev/null +++ b/filamento/.gitignore @@ -0,0 +1 @@ +filamento.db diff --git a/filamento/src/lib.rs b/filamento/src/lib.rs index db59a67..030dc43 100644 --- a/filamento/src/lib.rs +++ b/filamento/src/lib.rs @@ -19,6 +19,7 @@ use lampada::{ Connected, CoreClient, CoreClientCommand, Logic, SupervisorSender, WriteMessage, error::{ActorError, CommandError, ConnectionError, ReadError, WriteError}, }; +use logic::ClientLogic; use presence::{Offline, Online, Presence, PresenceType, Show}; use roster::{Contact, ContactUpdate}; use stanza::client::{ @@ -36,6 +37,7 @@ use uuid::Uuid; pub mod chat; pub mod db; pub mod error; +mod logic; pub mod presence; pub mod roster; pub mod user; @@ -145,11 +147,7 @@ impl Client { let (_sup_send, sup_recv) = oneshot::channel(); let sup_recv = sup_recv.fuse(); - let logic = ClientLogic { - db, - pending: Arc::new(Mutex::new(HashMap::new())), - update_sender: update_send, - }; + let logic = ClientLogic::new(db, Arc::new(Mutex::new(HashMap::new())), update_send); let actor: CoreClient<ClientLogic> = CoreClient::new(jid, password, command_receiver, None, sup_recv, logic); @@ -450,1122 +448,6 @@ impl Client { } } -#[derive(Clone)] -pub struct ClientLogic { - db: Db, - pending: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>, - update_sender: mpsc::Sender<UpdateMessage>, -} - -impl Logic for ClientLogic { - type Cmd = Command; - - async fn handle_connect(self, connection: Connected) { - let (send, recv) = oneshot::channel(); - debug!("getting roster"); - self.clone() - .handle_online(Command::GetRoster(send), connection.clone()) - .await; - debug!("sent roster req"); - let roster = recv.await; - debug!("got roster"); - match roster { - Ok(r) => match r { - Ok(roster) => { - let online = self.db.read_cached_status().await; - let online = match online { - Ok(online) => online, - Err(e) => { - let _ = self - .update_sender - .send(UpdateMessage::Error(Error::Connecting( - ConnectionJobError::StatusCacheError(e.into()), - ))) - .await; - Online::default() - } - }; - let (send, recv) = oneshot::channel(); - self.clone() - .handle_online( - Command::SendPresence(None, PresenceType::Online(online.clone()), send), - connection, - ) - .await; - let set_status = recv.await; - match set_status { - Ok(s) => match s { - Ok(()) => { - let _ = self - .update_sender - .send(UpdateMessage::Online(online, roster)) - .await; - } - Err(e) => { - let _ = self - .update_sender - .send(UpdateMessage::Error(Error::Connecting(e.into()))) - .await; - } - }, - Err(e) => { - let _ = self - .update_sender - .send(UpdateMessage::Error(Error::Connecting( - ConnectionJobError::SendPresence(WriteError::Actor(e.into())), - ))) - .await; - } - } - } - Err(e) => { - let _ = self - .update_sender - .send(UpdateMessage::Error(Error::Connecting(e.into()))) - .await; - } - }, - Err(e) => { - let _ = self - .update_sender - .send(UpdateMessage::Error(Error::Connecting( - ConnectionJobError::RosterRetreival(RosterError::Write(WriteError::Actor( - e.into(), - ))), - ))) - .await; - } - } - } - - async fn handle_disconnect(self, connection: Connected) { - // TODO: be able to set offline status message - let offline_presence: stanza::client::presence::Presence = - Offline::default().into_stanza(None); - let stanza = Stanza::Presence(offline_presence); - // TODO: timeout and error check - connection.write_handle().write(stanza).await; - let _ = self - .update_sender - .send(UpdateMessage::Offline(Offline::default())) - .await; - } - - async fn handle_stanza( - self, - stanza: Stanza, - connection: Connected, - supervisor: SupervisorSender, - ) { - match stanza { - Stanza::Message(stanza_message) => { - if let Some(mut from) = stanza_message.from { - // TODO: don't ignore delay from. xep says SHOULD send error if incorrect. - let timestamp = stanza_message - .delay - .map(|delay| delay.stamp) - .unwrap_or_else(|| Utc::now()); - // TODO: group chat messages - let mut message = Message { - id: stanza_message - .id - // TODO: proper id storage - .map(|id| Uuid::from_str(&id).unwrap_or_else(|_| Uuid::new_v4())) - .unwrap_or_else(|| Uuid::new_v4()), - from: from.clone(), - timestamp, - body: Body { - // TODO: should this be an option? - body: stanza_message - .body - .map(|body| body.body) - .unwrap_or_default() - .unwrap_or_default(), - }, - }; - // TODO: can this be more efficient? - let result = self - .db - .create_message_with_user_resource_and_chat(message.clone(), from.clone()) - .await; - if let Err(e) = result { - tracing::error!("messagecreate"); - let _ = self - .update_sender - .send(UpdateMessage::Error(Error::MessageRecv( - MessageRecvError::MessageHistory(e.into()), - ))) - .await; - } - message.from = message.from.as_bare(); - from = from.as_bare(); - let _ = self - .update_sender - .send(UpdateMessage::Message { to: from, message }) - .await; - } else { - let _ = self - .update_sender - .send(UpdateMessage::Error(Error::MessageRecv( - MessageRecvError::MissingFrom, - ))) - .await; - } - } - Stanza::Presence(presence) => { - if let Some(from) = presence.from { - match presence.r#type { - Some(r#type) => match r#type { - // error processing a presence from somebody - stanza::client::presence::PresenceType::Error => { - // TODO: is there any other information that should go with the error? also MUST have an error, otherwise it's a different error. maybe it shoulnd't be an option. - let _ = self - .update_sender - .send(UpdateMessage::Error(Error::Presence( - // TODO: ughhhhhhhhhhhhh these stanza errors should probably just have an option, and custom display - PresenceError::StanzaError( - presence - .errors - .first() - .cloned() - .expect("error MUST have error"), - ), - ))) - .await; - } - // should not happen (error to server) - stanza::client::presence::PresenceType::Probe => { - // TODO: should probably write an error and restart stream - let _ = self - .update_sender - .send(UpdateMessage::Error(Error::Presence( - PresenceError::Unsupported, - ))) - .await; - } - stanza::client::presence::PresenceType::Subscribe => { - // may get a subscription request from somebody who is not a contact!!! therefore should be its own kind of event - let _ = self - .update_sender - .send(UpdateMessage::SubscriptionRequest(from)) - .await; - } - stanza::client::presence::PresenceType::Unavailable => { - let offline = Offline { - status: presence.status.map(|status| status.status.0), - }; - let timestamp = presence - .delay - .map(|delay| delay.stamp) - .unwrap_or_else(|| Utc::now()); - let _ = self - .update_sender - .send(UpdateMessage::Presence { - from, - presence: Presence { - timestamp, - presence: PresenceType::Offline(offline), - }, - }) - .await; - } - // for now, do nothing, as these are simply informational. will receive roster push from the server regarding the changes to do with them. - stanza::client::presence::PresenceType::Subscribed => {} - stanza::client::presence::PresenceType::Unsubscribe => {} - stanza::client::presence::PresenceType::Unsubscribed => {} - }, - None => { - let online = Online { - show: presence.show.map(|show| match show { - stanza::client::presence::Show::Away => Show::Away, - stanza::client::presence::Show::Chat => Show::Chat, - stanza::client::presence::Show::Dnd => Show::DoNotDisturb, - stanza::client::presence::Show::Xa => Show::ExtendedAway, - }), - status: presence.status.map(|status| status.status.0), - priority: presence.priority.map(|priority| priority.0), - }; - let timestamp = presence - .delay - .map(|delay| delay.stamp) - .unwrap_or_else(|| Utc::now()); - let _ = self - .update_sender - .send(UpdateMessage::Presence { - from, - presence: Presence { - timestamp, - presence: PresenceType::Online(online), - }, - }) - .await; - } - } - } else { - let _ = self - .update_sender - .send(UpdateMessage::Error(Error::Presence( - PresenceError::MissingFrom, - ))) - .await; - } - } - Stanza::Iq(iq) => match iq.r#type { - stanza::client::iq::IqType::Error | stanza::client::iq::IqType::Result => { - let send; - { - send = self.pending.lock().await.remove(&iq.id); - } - if let Some(send) = send { - send.send(Ok(Stanza::Iq(iq))); - } else { - let _ = self - .update_sender - .send(UpdateMessage::Error(Error::Iq(IqError::NoMatchingId( - iq.id, - )))) - .await; - } - } - // TODO: send unsupported to server - // TODO: proper errors i am so tired please - stanza::client::iq::IqType::Get => {} - stanza::client::iq::IqType::Set => { - if let Some(query) = iq.query { - match query { - stanza::client::iq::Query::Roster(mut query) => { - // TODO: there should only be one - if let Some(item) = query.items.pop() { - match item.subscription { - Some(stanza::roster::Subscription::Remove) => { - self.db.delete_contact(item.jid.clone()).await; - self.update_sender - .send(UpdateMessage::RosterDelete(item.jid)) - .await; - // TODO: send result - } - _ => { - let contact: Contact = item.into(); - if let Err(e) = - self.db.upsert_contact(contact.clone()).await - { - let _ = self - .update_sender - .send(UpdateMessage::Error(Error::Roster( - RosterError::Cache(e.into()), - ))) - .await; - } - let _ = self - .update_sender - .send(UpdateMessage::RosterUpdate(contact)) - .await; - // TODO: send result - // write_handle.write(Stanza::Iq(stanza::client::iq::Iq { - // from: , - // id: todo!(), - // to: todo!(), - // r#type: todo!(), - // lang: todo!(), - // query: todo!(), - // errors: todo!(), - // })); - } - } - } - } - // TODO: send unsupported to server - _ => {} - } - } else { - // TODO: send error (unsupported) to server - } - } - }, - Stanza::Error(error) => { - let _ = self - .update_sender - .send(UpdateMessage::Error(Error::Stream(error))) - .await; - // TODO: reconnect - } - Stanza::OtherContent(content) => { - let _ = self - .update_sender - .send(UpdateMessage::Error(Error::UnrecognizedContent)); - // TODO: send error to write_thread - } - } - } - - async fn handle_online(self, command: Command, connection: Connected) { - match command { - Command::GetRoster(result_sender) => { - // TODO: jid resource should probably be stored within the connection - debug!("before client_jid lock"); - debug!("after client_jid lock"); - let iq_id = Uuid::new_v4().to_string(); - let (send, iq_recv) = oneshot::channel(); - { - self.pending.lock().await.insert(iq_id.clone(), send); - } - let stanza = Stanza::Iq(Iq { - from: Some(connection.jid().clone()), - id: iq_id.to_string(), - to: None, - r#type: IqType::Get, - lang: None, - query: Some(iq::Query::Roster(stanza::roster::Query { - ver: None, - items: Vec::new(), - })), - errors: Vec::new(), - }); - let (send, recv) = oneshot::channel(); - let _ = connection - .write_handle() - .send(WriteMessage { - stanza, - respond_to: send, - }) - .await; - // TODO: timeout - match recv.await { - Ok(Ok(())) => info!("roster request sent"), - Ok(Err(e)) => { - // TODO: log errors if fail to send - let _ = result_sender.send(Err(RosterError::Write(e.into()))); - return; - } - Err(e) => { - let _ = result_sender - .send(Err(RosterError::Write(WriteError::Actor(e.into())))); - return; - } - }; - // TODO: timeout - match iq_recv.await { - Ok(Ok(stanza)) => match stanza { - Stanza::Iq(Iq { - from: _, - id, - to: _, - r#type, - lang: _, - query: Some(iq::Query::Roster(stanza::roster::Query { ver: _, items })), - errors: _, - }) if id == iq_id && r#type == IqType::Result => { - let contacts: Vec<Contact> = - items.into_iter().map(|item| item.into()).collect(); - if let Err(e) = self.db.replace_cached_roster(contacts.clone()).await { - self.update_sender - .send(UpdateMessage::Error(Error::Roster(RosterError::Cache( - e.into(), - )))) - .await; - }; - result_sender.send(Ok(contacts)); - return; - } - ref s @ Stanza::Iq(Iq { - from: _, - ref id, - to: _, - r#type, - lang: _, - query: _, - ref errors, - }) if *id == iq_id && r#type == IqType::Error => { - if let Some(error) = errors.first() { - result_sender.send(Err(RosterError::StanzaError(error.clone()))); - } else { - result_sender.send(Err(RosterError::UnexpectedStanza(s.clone()))); - } - return; - } - s => { - result_sender.send(Err(RosterError::UnexpectedStanza(s))); - return; - } - }, - Ok(Err(e)) => { - result_sender.send(Err(RosterError::Read(e))); - return; - } - Err(e) => { - result_sender.send(Err(RosterError::Write(WriteError::Actor(e.into())))); - return; - } - } - } - Command::GetChats(sender) => { - let chats = self.db.read_chats().await.map_err(|e| e.into()); - sender.send(chats); - } - Command::GetChatsOrdered(sender) => { - let chats = self.db.read_chats_ordered().await.map_err(|e| e.into()); - sender.send(chats); - } - Command::GetChatsOrderedWithLatestMessages(sender) => { - let chats = self - .db - .read_chats_ordered_with_latest_messages() - .await - .map_err(|e| e.into()); - sender.send(chats); - } - Command::GetChat(jid, sender) => { - let chats = self.db.read_chat(jid).await.map_err(|e| e.into()); - sender.send(chats); - } - Command::GetMessages(jid, sender) => { - let messages = self - .db - .read_message_history(jid) - .await - .map_err(|e| e.into()); - sender.send(messages); - } - Command::DeleteChat(jid, sender) => { - let result = self.db.delete_chat(jid).await.map_err(|e| e.into()); - sender.send(result); - } - Command::DeleteMessage(uuid, sender) => { - let result = self.db.delete_message(uuid).await.map_err(|e| e.into()); - sender.send(result); - } - Command::GetUser(jid, sender) => { - let user = self.db.read_user(jid).await.map_err(|e| e.into()); - sender.send(user); - } - // TODO: offline queue to modify roster - Command::AddContact(jid, sender) => { - let iq_id = Uuid::new_v4().to_string(); - let set_stanza = Stanza::Iq(Iq { - from: Some(connection.jid().clone()), - id: iq_id.clone(), - to: None, - r#type: IqType::Set, - lang: None, - query: Some(iq::Query::Roster(stanza::roster::Query { - ver: None, - items: vec![stanza::roster::Item { - approved: None, - ask: false, - jid, - name: None, - subscription: None, - groups: Vec::new(), - }], - })), - errors: Vec::new(), - }); - let (send, recv) = oneshot::channel(); - { - self.pending.lock().await.insert(iq_id.clone(), send); - } - // TODO: write_handle send helper function - let result = connection.write_handle().write(set_stanza).await; - if let Err(e) = result { - sender.send(Err(RosterError::Write(e))); - return; - } - let iq_result = recv.await; - match iq_result { - Ok(i) => match i { - Ok(iq_result) => match iq_result { - Stanza::Iq(Iq { - from: _, - id, - to: _, - r#type, - lang: _, - query: _, - errors: _, - }) if id == iq_id && r#type == IqType::Result => { - sender.send(Ok(())); - return; - } - ref s @ Stanza::Iq(Iq { - from: _, - ref id, - to: _, - r#type, - lang: _, - query: _, - ref errors, - }) if *id == iq_id && r#type == IqType::Error => { - if let Some(error) = errors.first() { - sender.send(Err(RosterError::StanzaError(error.clone()))); - } else { - sender.send(Err(RosterError::UnexpectedStanza(s.clone()))); - } - return; - } - s => { - sender.send(Err(RosterError::UnexpectedStanza(s))); - return; - } - }, - Err(e) => { - sender.send(Err(e.into())); - return; - } - }, - Err(e) => { - sender.send(Err(RosterError::Write(WriteError::Actor(e.into())))); - return; - } - } - } - Command::BuddyRequest(jid, sender) => { - let presence = Stanza::Presence(stanza::client::presence::Presence { - from: None, - id: None, - to: Some(jid.clone()), - r#type: Some(stanza::client::presence::PresenceType::Subscribe), - lang: None, - show: None, - status: None, - priority: None, - errors: Vec::new(), - delay: None, - }); - let result = connection.write_handle().write(presence).await; - match result { - Err(_) => { - let _ = sender.send(result); - } - Ok(()) => { - let presence = Stanza::Presence(stanza::client::presence::Presence { - from: None, - id: None, - to: Some(jid), - r#type: Some(stanza::client::presence::PresenceType::Subscribed), - lang: None, - show: None, - status: None, - priority: None, - errors: Vec::new(), - delay: None, - }); - let result = connection.write_handle().write(presence).await; - let _ = sender.send(result); - } - } - } - Command::SubscriptionRequest(jid, sender) => { - // TODO: i should probably have builders - let presence = Stanza::Presence(stanza::client::presence::Presence { - from: None, - id: None, - to: Some(jid), - r#type: Some(stanza::client::presence::PresenceType::Subscribe), - lang: None, - show: None, - status: None, - priority: None, - errors: Vec::new(), - delay: None, - }); - let result = connection.write_handle().write(presence).await; - let _ = sender.send(result); - } - Command::AcceptBuddyRequest(jid, sender) => { - let presence = Stanza::Presence(stanza::client::presence::Presence { - from: None, - id: None, - to: Some(jid.clone()), - r#type: Some(stanza::client::presence::PresenceType::Subscribed), - lang: None, - show: None, - status: None, - priority: None, - errors: Vec::new(), - delay: None, - }); - let result = connection.write_handle().write(presence).await; - match result { - Err(_) => { - let _ = sender.send(result); - } - Ok(()) => { - let presence = Stanza::Presence(stanza::client::presence::Presence { - from: None, - id: None, - to: Some(jid), - r#type: Some(stanza::client::presence::PresenceType::Subscribe), - lang: None, - show: None, - status: None, - priority: None, - errors: Vec::new(), - delay: None, - }); - let result = connection.write_handle().write(presence).await; - let _ = sender.send(result); - } - } - } - Command::AcceptSubscriptionRequest(jid, sender) => { - let presence = Stanza::Presence(stanza::client::presence::Presence { - from: None, - id: None, - to: Some(jid), - r#type: Some(stanza::client::presence::PresenceType::Subscribe), - lang: None, - show: None, - status: None, - priority: None, - errors: Vec::new(), - delay: None, - }); - let result = connection.write_handle().write(presence).await; - let _ = sender.send(result); - } - Command::UnsubscribeFromContact(jid, sender) => { - let presence = Stanza::Presence(stanza::client::presence::Presence { - from: None, - id: None, - to: Some(jid), - r#type: Some(stanza::client::presence::PresenceType::Unsubscribe), - lang: None, - show: None, - status: None, - priority: None, - errors: Vec::new(), - delay: None, - }); - let result = connection.write_handle().write(presence).await; - let _ = sender.send(result); - } - Command::UnsubscribeContact(jid, sender) => { - let presence = Stanza::Presence(stanza::client::presence::Presence { - from: None, - id: None, - to: Some(jid), - r#type: Some(stanza::client::presence::PresenceType::Unsubscribed), - lang: None, - show: None, - status: None, - priority: None, - errors: Vec::new(), - delay: None, - }); - let result = connection.write_handle().write(presence).await; - let _ = sender.send(result); - } - Command::UnfriendContact(jid, sender) => { - let presence = Stanza::Presence(stanza::client::presence::Presence { - from: None, - id: None, - to: Some(jid.clone()), - r#type: Some(stanza::client::presence::PresenceType::Unsubscribe), - lang: None, - show: None, - status: None, - priority: None, - errors: Vec::new(), - delay: None, - }); - let result = connection.write_handle().write(presence).await; - match result { - Err(_) => { - let _ = sender.send(result); - } - Ok(()) => { - let presence = Stanza::Presence(stanza::client::presence::Presence { - from: None, - id: None, - to: Some(jid), - r#type: Some(stanza::client::presence::PresenceType::Unsubscribed), - lang: None, - show: None, - status: None, - priority: None, - errors: Vec::new(), - delay: None, - }); - let result = connection.write_handle().write(presence).await; - let _ = sender.send(result); - } - } - } - Command::DeleteContact(jid, sender) => { - let iq_id = Uuid::new_v4().to_string(); - let set_stanza = Stanza::Iq(Iq { - from: Some(connection.jid().clone()), - id: iq_id.clone(), - to: None, - r#type: IqType::Set, - lang: None, - query: Some(iq::Query::Roster(stanza::roster::Query { - ver: None, - items: vec![stanza::roster::Item { - approved: None, - ask: false, - jid, - name: None, - subscription: Some(stanza::roster::Subscription::Remove), - groups: Vec::new(), - }], - })), - errors: Vec::new(), - }); - let (send, recv) = oneshot::channel(); - { - self.pending.lock().await.insert(iq_id.clone(), send); - } - let result = connection.write_handle().write(set_stanza).await; - if let Err(e) = result { - sender.send(Err(RosterError::Write(e))); - return; - } - let iq_result = recv.await; - match iq_result { - Ok(i) => match i { - Ok(iq_result) => match iq_result { - Stanza::Iq(Iq { - from: _, - id, - to: _, - r#type, - lang: _, - query: _, - errors: _, - }) if id == iq_id && r#type == IqType::Result => { - sender.send(Ok(())); - return; - } - ref s @ Stanza::Iq(Iq { - from: _, - ref id, - to: _, - r#type, - lang: _, - query: _, - ref errors, - }) if *id == iq_id && r#type == IqType::Error => { - if let Some(error) = errors.first() { - sender.send(Err(RosterError::StanzaError(error.clone()))); - } else { - sender.send(Err(RosterError::UnexpectedStanza(s.clone()))); - } - return; - } - s => { - sender.send(Err(RosterError::UnexpectedStanza(s))); - return; - } - }, - Err(e) => { - sender.send(Err(e.into())); - return; - } - }, - Err(e) => { - sender.send(Err(RosterError::Write(WriteError::Actor(e.into())))); - return; - } - } - } - Command::UpdateContact(jid, contact_update, sender) => { - let iq_id = Uuid::new_v4().to_string(); - let groups = Vec::from_iter( - contact_update - .groups - .into_iter() - .map(|group| stanza::roster::Group(Some(group))), - ); - let set_stanza = Stanza::Iq(Iq { - from: Some(connection.jid().clone()), - id: iq_id.clone(), - to: None, - r#type: IqType::Set, - lang: None, - query: Some(iq::Query::Roster(stanza::roster::Query { - ver: None, - items: vec![stanza::roster::Item { - approved: None, - ask: false, - jid, - name: contact_update.name, - subscription: None, - groups, - }], - })), - errors: Vec::new(), - }); - let (send, recv) = oneshot::channel(); - { - self.pending.lock().await.insert(iq_id.clone(), send); - } - let result = connection.write_handle().write(set_stanza).await; - if let Err(e) = result { - sender.send(Err(RosterError::Write(e))); - return; - } - let iq_result = recv.await; - match iq_result { - Ok(i) => match i { - Ok(iq_result) => match iq_result { - Stanza::Iq(Iq { - from: _, - id, - to: _, - r#type, - lang: _, - query: _, - errors: _, - }) if id == iq_id && r#type == IqType::Result => { - sender.send(Ok(())); - return; - } - ref s @ Stanza::Iq(Iq { - from: _, - ref id, - to: _, - r#type, - lang: _, - query: _, - ref errors, - }) if *id == iq_id && r#type == IqType::Error => { - if let Some(error) = errors.first() { - sender.send(Err(RosterError::StanzaError(error.clone()))); - } else { - sender.send(Err(RosterError::UnexpectedStanza(s.clone()))); - } - return; - } - s => { - sender.send(Err(RosterError::UnexpectedStanza(s))); - return; - } - }, - Err(e) => { - sender.send(Err(e.into())); - return; - } - }, - Err(e) => { - sender.send(Err(RosterError::Write(WriteError::Actor(e.into())))); - return; - } - } - } - Command::SetStatus(online, sender) => { - let result = self.db.upsert_cached_status(online.clone()).await; - if let Err(e) = result { - let _ = self - .update_sender - .send(UpdateMessage::Error(Error::SetStatus(StatusError::Cache( - e.into(), - )))) - .await; - } - let result = connection - .write_handle() - .write(Stanza::Presence(online.into_stanza(None))) - .await - .map_err(|e| StatusError::Write(e)); - // .map_err(|e| StatusError::Write(e)); - let _ = sender.send(result); - } - // TODO: offline message queue - Command::SendMessage(jid, body, sender) => { - let id = Uuid::new_v4(); - let message = Stanza::Message(stanza::client::message::Message { - from: Some(connection.jid().clone()), - id: Some(id.to_string()), - to: Some(jid.clone()), - // TODO: specify message type - r#type: stanza::client::message::MessageType::Chat, - // TODO: lang ? - lang: None, - subject: None, - body: Some(stanza::client::message::Body { - lang: None, - body: Some(body.body.clone()), - }), - thread: None, - delay: None, - }); - let _ = sender.send(Ok(())); - // let _ = sender.send(Ok(message.clone())); - let result = connection.write_handle().write(message).await; - match result { - Ok(_) => { - let mut message = Message { - id, - from: connection.jid().clone(), - body, - timestamp: Utc::now(), - }; - info!("send message {:?}", message); - if let Err(e) = self - .db - .create_message_with_self_resource_and_chat( - message.clone(), - jid.clone(), - ) - .await - .map_err(|e| e.into()) - { - tracing::error!("{}", e); - let _ = - self.update_sender - .send(UpdateMessage::Error(Error::MessageSend( - error::MessageSendError::MessageHistory(e), - ))); - } - // TODO: don't do this, have separate from from details - message.from = message.from.as_bare(); - let _ = self - .update_sender - .send(UpdateMessage::Message { to: jid, message }) - .await; - } - Err(_) => { - // let _ = sender.send(result); - } - } - } - Command::SendPresence(jid, presence, sender) => { - let mut presence: stanza::client::presence::Presence = presence.into(); - if let Some(jid) = jid { - presence.to = Some(jid); - }; - let result = connection - .write_handle() - .write(Stanza::Presence(presence)) - .await; - // .map_err(|e| StatusError::Write(e)); - let _ = sender.send(result); - } - } - } - - async fn handle_offline(self, command: Command) { - match command { - Command::GetRoster(sender) => { - let roster = self.db.read_cached_roster().await; - match roster { - Ok(roster) => { - let _ = sender.send(Ok(roster)); - } - Err(e) => { - let _ = sender.send(Err(RosterError::Cache(e.into()))); - } - } - } - Command::GetChats(sender) => { - let chats = self.db.read_chats().await.map_err(|e| e.into()); - sender.send(chats); - } - Command::GetChatsOrdered(sender) => { - let chats = self.db.read_chats_ordered().await.map_err(|e| e.into()); - sender.send(chats); - } - Command::GetChatsOrderedWithLatestMessages(sender) => { - let chats = self - .db - .read_chats_ordered_with_latest_messages() - .await - .map_err(|e| e.into()); - sender.send(chats); - } - Command::GetChat(jid, sender) => { - let chats = self.db.read_chat(jid).await.map_err(|e| e.into()); - sender.send(chats); - } - Command::GetMessages(jid, sender) => { - let messages = self - .db - .read_message_history(jid) - .await - .map_err(|e| e.into()); - sender.send(messages); - } - Command::DeleteChat(jid, sender) => { - let result = self.db.delete_chat(jid).await.map_err(|e| e.into()); - sender.send(result); - } - Command::DeleteMessage(uuid, sender) => { - let result = self.db.delete_message(uuid).await.map_err(|e| e.into()); - sender.send(result); - } - Command::GetUser(jid, sender) => { - let user = self.db.read_user(jid).await.map_err(|e| e.into()); - sender.send(user); - } - // TODO: offline queue to modify roster - Command::AddContact(_jid, sender) => { - sender.send(Err(RosterError::Write(WriteError::Disconnected))); - } - Command::BuddyRequest(_jid, sender) => { - sender.send(Err(WriteError::Disconnected)); - } - Command::SubscriptionRequest(_jid, sender) => { - sender.send(Err(WriteError::Disconnected)); - } - Command::AcceptBuddyRequest(_jid, sender) => { - sender.send(Err(WriteError::Disconnected)); - } - Command::AcceptSubscriptionRequest(_jid, sender) => { - sender.send(Err(WriteError::Disconnected)); - } - Command::UnsubscribeFromContact(_jid, sender) => { - sender.send(Err(WriteError::Disconnected)); - } - Command::UnsubscribeContact(_jid, sender) => { - sender.send(Err(WriteError::Disconnected)); - } - Command::UnfriendContact(_jid, sender) => { - sender.send(Err(WriteError::Disconnected)); - } - Command::DeleteContact(_jid, sender) => { - sender.send(Err(RosterError::Write(WriteError::Disconnected))); - } - Command::UpdateContact(_jid, _contact_update, sender) => { - sender.send(Err(RosterError::Write(WriteError::Disconnected))); - } - Command::SetStatus(online, sender) => { - let result = self - .db - .upsert_cached_status(online) - .await - .map_err(|e| StatusError::Cache(e.into())); - sender.send(result); - } - // TODO: offline message queue - Command::SendMessage(_jid, _body, sender) => { - sender.send(Err(WriteError::Disconnected)); - } - Command::SendPresence(_jid, _presence, sender) => { - sender.send(Err(WriteError::Disconnected)); - } - } - } - // pub async fn handle_stream_error(self, error) {} - // stanza errors (recoverable) - // pub async fn handle_error(self, error: Error) {} - // when it aborts, must clear iq map no matter what - async fn on_abort(self) { - let mut iqs = self.pending.lock().await; - for (_id, sender) in iqs.drain() { - let _ = sender.send(Err(ReadError::LostConnection)); - } - } - - async fn handle_connection_error(self, error: ConnectionError) { - self.update_sender - .send(UpdateMessage::Error( - ConnectionError::AlreadyConnected.into(), - )) - .await; - } -} - impl From<Command> for CoreClientCommand<Command> { fn from(value: Command) -> Self { CoreClientCommand::Command(value) diff --git a/filamento/src/logic/abort.rs b/filamento/src/logic/abort.rs new file mode 100644 index 0000000..32c4823 --- /dev/null +++ b/filamento/src/logic/abort.rs @@ -0,0 +1,10 @@ +use lampada::error::ReadError; + +use super::ClientLogic; + +pub async fn on_abort(logic: ClientLogic) { + let mut iqs = logic.pending().lock().await; + for (_id, sender) in iqs.drain() { + let _ = sender.send(Err(ReadError::LostConnection)); + } +} diff --git a/filamento/src/logic/connect.rs b/filamento/src/logic/connect.rs new file mode 100644 index 0000000..4dc789e --- /dev/null +++ b/filamento/src/logic/connect.rs @@ -0,0 +1,91 @@ +use lampada::{Connected, Logic, error::WriteError}; +use tokio::sync::oneshot; +use tracing::debug; + +use crate::{ + Command, UpdateMessage, + error::{ConnectionJobError, Error, RosterError}, + presence::{Online, PresenceType}, +}; + +use super::ClientLogic; + +pub async fn handle_connect(logic: ClientLogic, connection: Connected) { + let (send, recv) = oneshot::channel(); + debug!("getting roster"); + logic + .clone() + .handle_online(Command::GetRoster(send), connection.clone()) + .await; + debug!("sent roster req"); + let roster = recv.await; + debug!("got roster"); + match roster { + Ok(r) => match r { + Ok(roster) => { + let online = logic.db().read_cached_status().await; + let online = match online { + Ok(online) => online, + Err(e) => { + let _ = logic + .update_sender() + .send(UpdateMessage::Error(Error::Connecting( + ConnectionJobError::StatusCacheError(e.into()), + ))) + .await; + Online::default() + } + }; + let (send, recv) = oneshot::channel(); + logic + .clone() + .handle_online( + Command::SendPresence(None, PresenceType::Online(online.clone()), send), + connection, + ) + .await; + let set_status = recv.await; + match set_status { + Ok(s) => match s { + Ok(()) => { + let _ = logic + .update_sender() + .send(UpdateMessage::Online(online, roster)) + .await; + } + Err(e) => { + let _ = logic + .update_sender() + .send(UpdateMessage::Error(Error::Connecting(e.into()))) + .await; + } + }, + Err(e) => { + let _ = logic + .update_sender() + .send(UpdateMessage::Error(Error::Connecting( + ConnectionJobError::SendPresence(WriteError::Actor(e.into())), + ))) + .await; + } + } + } + Err(e) => { + let _ = logic + .update_sender() + .send(UpdateMessage::Error(Error::Connecting(e.into()))) + .await; + } + }, + Err(e) => { + let _ = logic + .update_sender() + .send(UpdateMessage::Error(Error::Connecting( + ConnectionJobError::RosterRetreival(RosterError::Write(WriteError::Actor( + e.into(), + ))), + ))) + .await; + } + } +} diff --git a/filamento/src/logic/connection_error.rs b/filamento/src/logic/connection_error.rs new file mode 100644 index 0000000..ac9e931 --- /dev/null +++ b/filamento/src/logic/connection_error.rs @@ -0,0 +1,12 @@ +use lampada::error::ConnectionError; + +use crate::UpdateMessage; + +use super::ClientLogic; + +pub async fn handle_connection_error(logic: ClientLogic, error: ConnectionError) { + logic + .update_sender() + .send(UpdateMessage::Error(error.into())) + .await; +} diff --git a/filamento/src/logic/disconnect.rs b/filamento/src/logic/disconnect.rs new file mode 100644 index 0000000..241c3e6 --- /dev/null +++ b/filamento/src/logic/disconnect.rs @@ -0,0 +1,18 @@ +use lampada::Connected; +use stanza::client::Stanza; + +use crate::{UpdateMessage, presence::Offline}; + +use super::ClientLogic; + +pub async fn handle_disconnect(logic: ClientLogic, connection: Connected) { + // TODO: be able to set offline status message + let offline_presence: stanza::client::presence::Presence = Offline::default().into_stanza(None); + let stanza = Stanza::Presence(offline_presence); + // TODO: timeout and error check + connection.write_handle().write(stanza).await; + let _ = logic + .update_sender() + .send(UpdateMessage::Offline(Offline::default())) + .await; +} diff --git a/filamento/src/logic/mod.rs b/filamento/src/logic/mod.rs new file mode 100644 index 0000000..638f682 --- /dev/null +++ b/filamento/src/logic/mod.rs @@ -0,0 +1,90 @@ +use std::{collections::HashMap, sync::Arc}; + +use lampada::{Logic, error::ReadError}; +use stanza::client::Stanza; +use tokio::sync::{Mutex, mpsc, oneshot}; + +use crate::{Command, UpdateMessage, db::Db}; + +mod abort; +mod connect; +mod connection_error; +mod disconnect; +mod offline; +mod online; +mod process_stanza; + +#[derive(Clone)] +pub struct ClientLogic { + db: Db, + pending: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>, + update_sender: mpsc::Sender<UpdateMessage>, +} + +impl ClientLogic { + pub fn new( + db: Db, + pending: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>, + update_sender: mpsc::Sender<UpdateMessage>, + ) -> Self { + Self { + db, + pending, + update_sender, + } + } + + pub fn db(&self) -> &Db { + &self.db + } + + pub fn pending(&self) -> &Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>> { + &self.pending.as_ref() + } + + pub fn update_sender(&self) -> &mpsc::Sender<UpdateMessage> { + &self.update_sender + } +} + +impl Logic for ClientLogic { + type Cmd = Command; + + // pub async fn handle_stream_error(self, error) {} + // stanza errors (recoverable) + // pub async fn handle_error(self, error: Error) {} + // when it aborts, must clear iq map no matter what + + async fn handle_connect(self, connection: lampada::Connected) { + connect::handle_connect(self, connection).await; + } + + async fn handle_disconnect(self, connection: lampada::Connected) { + disconnect::handle_disconnect(self, connection).await; + } + + async fn handle_stanza( + self, + stanza: ::stanza::client::Stanza, + connection: lampada::Connected, + supervisor: lampada::SupervisorSender, + ) { + process_stanza::handle_stanza(self, stanza, connection, supervisor).await; + } + + async fn handle_online(self, command: Self::Cmd, connection: lampada::Connected) { + online::handle_online(self, command, connection).await; + } + + async fn handle_offline(self, command: Self::Cmd) { + offline::handle_offline(self, command).await; + } + + async fn on_abort(self) { + abort::on_abort(self).await; + } + + async fn handle_connection_error(self, error: lampada::error::ConnectionError) { + connection_error::handle_connection_error(self, error).await; + } +} diff --git a/filamento/src/logic/offline.rs b/filamento/src/logic/offline.rs new file mode 100644 index 0000000..17a60f3 --- /dev/null +++ b/filamento/src/logic/offline.rs @@ -0,0 +1,110 @@ +use lampada::error::WriteError; + +use crate::{ + Command, + error::{RosterError, StatusError}, +}; + +use super::ClientLogic; + +pub async fn handle_offline(logic: ClientLogic, command: Command) { + match command { + Command::GetRoster(sender) => { + let roster = logic.db().read_cached_roster().await; + match roster { + Ok(roster) => { + let _ = sender.send(Ok(roster)); + } + Err(e) => { + let _ = sender.send(Err(RosterError::Cache(e.into()))); + } + } + } + Command::GetChats(sender) => { + let chats = logic.db().read_chats().await.map_err(|e| e.into()); + sender.send(chats); + } + Command::GetChatsOrdered(sender) => { + let chats = logic.db().read_chats_ordered().await.map_err(|e| e.into()); + sender.send(chats); + } + Command::GetChatsOrderedWithLatestMessages(sender) => { + let chats = logic + .db() + .read_chats_ordered_with_latest_messages() + .await + .map_err(|e| e.into()); + sender.send(chats); + } + Command::GetChat(jid, sender) => { + let chats = logic.db().read_chat(jid).await.map_err(|e| e.into()); + sender.send(chats); + } + Command::GetMessages(jid, sender) => { + let messages = logic + .db() + .read_message_history(jid) + .await + .map_err(|e| e.into()); + sender.send(messages); + } + Command::DeleteChat(jid, sender) => { + let result = logic.db().delete_chat(jid).await.map_err(|e| e.into()); + sender.send(result); + } + Command::DeleteMessage(uuid, sender) => { + let result = logic.db().delete_message(uuid).await.map_err(|e| e.into()); + sender.send(result); + } + Command::GetUser(jid, sender) => { + let user = logic.db().read_user(jid).await.map_err(|e| e.into()); + sender.send(user); + } + // TODO: offline queue to modify roster + Command::AddContact(_jid, sender) => { + sender.send(Err(RosterError::Write(WriteError::Disconnected))); + } + Command::BuddyRequest(_jid, sender) => { + sender.send(Err(WriteError::Disconnected)); + } + Command::SubscriptionRequest(_jid, sender) => { + sender.send(Err(WriteError::Disconnected)); + } + Command::AcceptBuddyRequest(_jid, sender) => { + sender.send(Err(WriteError::Disconnected)); + } + Command::AcceptSubscriptionRequest(_jid, sender) => { + sender.send(Err(WriteError::Disconnected)); + } + Command::UnsubscribeFromContact(_jid, sender) => { + sender.send(Err(WriteError::Disconnected)); + } + Command::UnsubscribeContact(_jid, sender) => { + sender.send(Err(WriteError::Disconnected)); + } + Command::UnfriendContact(_jid, sender) => { + sender.send(Err(WriteError::Disconnected)); + } + Command::DeleteContact(_jid, sender) => { + sender.send(Err(RosterError::Write(WriteError::Disconnected))); + } + Command::UpdateContact(_jid, _contact_update, sender) => { + sender.send(Err(RosterError::Write(WriteError::Disconnected))); + } + Command::SetStatus(online, sender) => { + let result = logic + .db() + .upsert_cached_status(online) + .await + .map_err(|e| StatusError::Cache(e.into())); + sender.send(result); + } + // TODO: offline message queue + Command::SendMessage(_jid, _body, sender) => { + sender.send(Err(WriteError::Disconnected)); + } + Command::SendPresence(_jid, _presence, sender) => { + sender.send(Err(WriteError::Disconnected)); + } + } +} diff --git a/filamento/src/logic/online.rs b/filamento/src/logic/online.rs new file mode 100644 index 0000000..e8cbb33 --- /dev/null +++ b/filamento/src/logic/online.rs @@ -0,0 +1,661 @@ +use chrono::Utc; +use lampada::{Connected, WriteMessage, error::WriteError}; +use stanza::client::{ + Stanza, + iq::{self, Iq, IqType}, +}; +use tokio::sync::oneshot; +use tracing::{debug, info}; +use uuid::Uuid; + +use crate::{ + Command, UpdateMessage, + chat::Message, + error::{Error, MessageSendError, RosterError, StatusError}, + roster::Contact, +}; + +use super::ClientLogic; + +pub async fn handle_online(logic: ClientLogic, command: Command, connection: Connected) { + match command { + Command::GetRoster(result_sender) => { + let iq_id = Uuid::new_v4().to_string(); + let (send, iq_recv) = oneshot::channel(); + { + logic.pending().lock().await.insert(iq_id.clone(), send); + } + let stanza = Stanza::Iq(Iq { + from: Some(connection.jid().clone()), + id: iq_id.to_string(), + to: None, + r#type: IqType::Get, + lang: None, + query: Some(iq::Query::Roster(stanza::roster::Query { + ver: None, + items: Vec::new(), + })), + errors: Vec::new(), + }); + let (send, recv) = oneshot::channel(); + let _ = connection + .write_handle() + .send(WriteMessage { + stanza, + respond_to: send, + }) + .await; + // TODO: timeout + match recv.await { + Ok(Ok(())) => info!("roster request sent"), + Ok(Err(e)) => { + // TODO: log errors if fail to send + let _ = result_sender.send(Err(RosterError::Write(e.into()))); + return; + } + Err(e) => { + let _ = + result_sender.send(Err(RosterError::Write(WriteError::Actor(e.into())))); + return; + } + }; + // TODO: timeout + match iq_recv.await { + Ok(Ok(stanza)) => match stanza { + Stanza::Iq(Iq { + from: _, + id, + to: _, + r#type, + lang: _, + query: Some(iq::Query::Roster(stanza::roster::Query { ver: _, items })), + errors: _, + }) if id == iq_id && r#type == IqType::Result => { + let contacts: Vec<Contact> = + items.into_iter().map(|item| item.into()).collect(); + if let Err(e) = logic.db().replace_cached_roster(contacts.clone()).await { + logic + .update_sender() + .send(UpdateMessage::Error(Error::Roster(RosterError::Cache( + e.into(), + )))) + .await; + }; + result_sender.send(Ok(contacts)); + return; + } + ref s @ Stanza::Iq(Iq { + from: _, + ref id, + to: _, + r#type, + lang: _, + query: _, + ref errors, + }) if *id == iq_id && r#type == IqType::Error => { + if let Some(error) = errors.first() { + result_sender.send(Err(RosterError::StanzaError(error.clone()))); + } else { + result_sender.send(Err(RosterError::UnexpectedStanza(s.clone()))); + } + return; + } + s => { + result_sender.send(Err(RosterError::UnexpectedStanza(s))); + return; + } + }, + Ok(Err(e)) => { + result_sender.send(Err(RosterError::Read(e))); + return; + } + Err(e) => { + result_sender.send(Err(RosterError::Write(WriteError::Actor(e.into())))); + return; + } + } + } + Command::GetChats(sender) => { + let chats = logic.db().read_chats().await.map_err(|e| e.into()); + sender.send(chats); + } + Command::GetChatsOrdered(sender) => { + let chats = logic.db().read_chats_ordered().await.map_err(|e| e.into()); + sender.send(chats); + } + Command::GetChatsOrderedWithLatestMessages(sender) => { + let chats = logic + .db() + .read_chats_ordered_with_latest_messages() + .await + .map_err(|e| e.into()); + sender.send(chats); + } + Command::GetChat(jid, sender) => { + let chats = logic.db().read_chat(jid).await.map_err(|e| e.into()); + sender.send(chats); + } + Command::GetMessages(jid, sender) => { + let messages = logic + .db() + .read_message_history(jid) + .await + .map_err(|e| e.into()); + sender.send(messages); + } + Command::DeleteChat(jid, sender) => { + let result = logic.db().delete_chat(jid).await.map_err(|e| e.into()); + sender.send(result); + } + Command::DeleteMessage(uuid, sender) => { + let result = logic.db().delete_message(uuid).await.map_err(|e| e.into()); + sender.send(result); + } + Command::GetUser(jid, sender) => { + let user = logic.db().read_user(jid).await.map_err(|e| e.into()); + sender.send(user); + } + // TODO: offline queue to modify roster + Command::AddContact(jid, sender) => { + let iq_id = Uuid::new_v4().to_string(); + let set_stanza = Stanza::Iq(Iq { + from: Some(connection.jid().clone()), + id: iq_id.clone(), + to: None, + r#type: IqType::Set, + lang: None, + query: Some(iq::Query::Roster(stanza::roster::Query { + ver: None, + items: vec![stanza::roster::Item { + approved: None, + ask: false, + jid, + name: None, + subscription: None, + groups: Vec::new(), + }], + })), + errors: Vec::new(), + }); + let (send, recv) = oneshot::channel(); + { + logic.pending().lock().await.insert(iq_id.clone(), send); + } + // TODO: write_handle send helper function + let result = connection.write_handle().write(set_stanza).await; + if let Err(e) = result { + sender.send(Err(RosterError::Write(e))); + return; + } + let iq_result = recv.await; + match iq_result { + Ok(i) => match i { + Ok(iq_result) => match iq_result { + Stanza::Iq(Iq { + from: _, + id, + to: _, + r#type, + lang: _, + query: _, + errors: _, + }) if id == iq_id && r#type == IqType::Result => { + sender.send(Ok(())); + return; + } + ref s @ Stanza::Iq(Iq { + from: _, + ref id, + to: _, + r#type, + lang: _, + query: _, + ref errors, + }) if *id == iq_id && r#type == IqType::Error => { + if let Some(error) = errors.first() { + sender.send(Err(RosterError::StanzaError(error.clone()))); + } else { + sender.send(Err(RosterError::UnexpectedStanza(s.clone()))); + } + return; + } + s => { + sender.send(Err(RosterError::UnexpectedStanza(s))); + return; + } + }, + Err(e) => { + sender.send(Err(e.into())); + return; + } + }, + Err(e) => { + sender.send(Err(RosterError::Write(WriteError::Actor(e.into())))); + return; + } + } + } + Command::BuddyRequest(jid, sender) => { + let presence = Stanza::Presence(stanza::client::presence::Presence { + from: None, + id: None, + to: Some(jid.clone()), + r#type: Some(stanza::client::presence::PresenceType::Subscribe), + lang: None, + show: None, + status: None, + priority: None, + errors: Vec::new(), + delay: None, + }); + let result = connection.write_handle().write(presence).await; + match result { + Err(_) => { + let _ = sender.send(result); + } + Ok(()) => { + let presence = Stanza::Presence(stanza::client::presence::Presence { + from: None, + id: None, + to: Some(jid), + r#type: Some(stanza::client::presence::PresenceType::Subscribed), + lang: None, + show: None, + status: None, + priority: None, + errors: Vec::new(), + delay: None, + }); + let result = connection.write_handle().write(presence).await; + let _ = sender.send(result); + } + } + } + Command::SubscriptionRequest(jid, sender) => { + // TODO: i should probably have builders + let presence = Stanza::Presence(stanza::client::presence::Presence { + from: None, + id: None, + to: Some(jid), + r#type: Some(stanza::client::presence::PresenceType::Subscribe), + lang: None, + show: None, + status: None, + priority: None, + errors: Vec::new(), + delay: None, + }); + let result = connection.write_handle().write(presence).await; + let _ = sender.send(result); + } + Command::AcceptBuddyRequest(jid, sender) => { + let presence = Stanza::Presence(stanza::client::presence::Presence { + from: None, + id: None, + to: Some(jid.clone()), + r#type: Some(stanza::client::presence::PresenceType::Subscribed), + lang: None, + show: None, + status: None, + priority: None, + errors: Vec::new(), + delay: None, + }); + let result = connection.write_handle().write(presence).await; + match result { + Err(_) => { + let _ = sender.send(result); + } + Ok(()) => { + let presence = Stanza::Presence(stanza::client::presence::Presence { + from: None, + id: None, + to: Some(jid), + r#type: Some(stanza::client::presence::PresenceType::Subscribe), + lang: None, + show: None, + status: None, + priority: None, + errors: Vec::new(), + delay: None, + }); + let result = connection.write_handle().write(presence).await; + let _ = sender.send(result); + } + } + } + Command::AcceptSubscriptionRequest(jid, sender) => { + let presence = Stanza::Presence(stanza::client::presence::Presence { + from: None, + id: None, + to: Some(jid), + r#type: Some(stanza::client::presence::PresenceType::Subscribe), + lang: None, + show: None, + status: None, + priority: None, + errors: Vec::new(), + delay: None, + }); + let result = connection.write_handle().write(presence).await; + let _ = sender.send(result); + } + Command::UnsubscribeFromContact(jid, sender) => { + let presence = Stanza::Presence(stanza::client::presence::Presence { + from: None, + id: None, + to: Some(jid), + r#type: Some(stanza::client::presence::PresenceType::Unsubscribe), + lang: None, + show: None, + status: None, + priority: None, + errors: Vec::new(), + delay: None, + }); + let result = connection.write_handle().write(presence).await; + let _ = sender.send(result); + } + Command::UnsubscribeContact(jid, sender) => { + let presence = Stanza::Presence(stanza::client::presence::Presence { + from: None, + id: None, + to: Some(jid), + r#type: Some(stanza::client::presence::PresenceType::Unsubscribed), + lang: None, + show: None, + status: None, + priority: None, + errors: Vec::new(), + delay: None, + }); + let result = connection.write_handle().write(presence).await; + let _ = sender.send(result); + } + Command::UnfriendContact(jid, sender) => { + let presence = Stanza::Presence(stanza::client::presence::Presence { + from: None, + id: None, + to: Some(jid.clone()), + r#type: Some(stanza::client::presence::PresenceType::Unsubscribe), + lang: None, + show: None, + status: None, + priority: None, + errors: Vec::new(), + delay: None, + }); + let result = connection.write_handle().write(presence).await; + match result { + Err(_) => { + let _ = sender.send(result); + } + Ok(()) => { + let presence = Stanza::Presence(stanza::client::presence::Presence { + from: None, + id: None, + to: Some(jid), + r#type: Some(stanza::client::presence::PresenceType::Unsubscribed), + lang: None, + show: None, + status: None, + priority: None, + errors: Vec::new(), + delay: None, + }); + let result = connection.write_handle().write(presence).await; + let _ = sender.send(result); + } + } + } + Command::DeleteContact(jid, sender) => { + let iq_id = Uuid::new_v4().to_string(); + let set_stanza = Stanza::Iq(Iq { + from: Some(connection.jid().clone()), + id: iq_id.clone(), + to: None, + r#type: IqType::Set, + lang: None, + query: Some(iq::Query::Roster(stanza::roster::Query { + ver: None, + items: vec![stanza::roster::Item { + approved: None, + ask: false, + jid, + name: None, + subscription: Some(stanza::roster::Subscription::Remove), + groups: Vec::new(), + }], + })), + errors: Vec::new(), + }); + let (send, recv) = oneshot::channel(); + { + logic.pending().lock().await.insert(iq_id.clone(), send); + } + let result = connection.write_handle().write(set_stanza).await; + if let Err(e) = result { + sender.send(Err(RosterError::Write(e))); + return; + } + let iq_result = recv.await; + match iq_result { + Ok(i) => match i { + Ok(iq_result) => match iq_result { + Stanza::Iq(Iq { + from: _, + id, + to: _, + r#type, + lang: _, + query: _, + errors: _, + }) if id == iq_id && r#type == IqType::Result => { + sender.send(Ok(())); + return; + } + ref s @ Stanza::Iq(Iq { + from: _, + ref id, + to: _, + r#type, + lang: _, + query: _, + ref errors, + }) if *id == iq_id && r#type == IqType::Error => { + if let Some(error) = errors.first() { + sender.send(Err(RosterError::StanzaError(error.clone()))); + } else { + sender.send(Err(RosterError::UnexpectedStanza(s.clone()))); + } + return; + } + s => { + sender.send(Err(RosterError::UnexpectedStanza(s))); + return; + } + }, + Err(e) => { + sender.send(Err(e.into())); + return; + } + }, + Err(e) => { + sender.send(Err(RosterError::Write(WriteError::Actor(e.into())))); + return; + } + } + } + Command::UpdateContact(jid, contact_update, sender) => { + let iq_id = Uuid::new_v4().to_string(); + let groups = Vec::from_iter( + contact_update + .groups + .into_iter() + .map(|group| stanza::roster::Group(Some(group))), + ); + let set_stanza = Stanza::Iq(Iq { + from: Some(connection.jid().clone()), + id: iq_id.clone(), + to: None, + r#type: IqType::Set, + lang: None, + query: Some(iq::Query::Roster(stanza::roster::Query { + ver: None, + items: vec![stanza::roster::Item { + approved: None, + ask: false, + jid, + name: contact_update.name, + subscription: None, + groups, + }], + })), + errors: Vec::new(), + }); + let (send, recv) = oneshot::channel(); + { + logic.pending().lock().await.insert(iq_id.clone(), send); + } + let result = connection.write_handle().write(set_stanza).await; + if let Err(e) = result { + sender.send(Err(RosterError::Write(e))); + return; + } + let iq_result = recv.await; + match iq_result { + Ok(i) => match i { + Ok(iq_result) => match iq_result { + Stanza::Iq(Iq { + from: _, + id, + to: _, + r#type, + lang: _, + query: _, + errors: _, + }) if id == iq_id && r#type == IqType::Result => { + sender.send(Ok(())); + return; + } + ref s @ Stanza::Iq(Iq { + from: _, + ref id, + to: _, + r#type, + lang: _, + query: _, + ref errors, + }) if *id == iq_id && r#type == IqType::Error => { + if let Some(error) = errors.first() { + sender.send(Err(RosterError::StanzaError(error.clone()))); + } else { + sender.send(Err(RosterError::UnexpectedStanza(s.clone()))); + } + return; + } + s => { + sender.send(Err(RosterError::UnexpectedStanza(s))); + return; + } + }, + Err(e) => { + sender.send(Err(e.into())); + return; + } + }, + Err(e) => { + sender.send(Err(RosterError::Write(WriteError::Actor(e.into())))); + return; + } + } + } + Command::SetStatus(online, sender) => { + let result = logic.db().upsert_cached_status(online.clone()).await; + if let Err(e) = result { + let _ = logic + .update_sender() + .send(UpdateMessage::Error(Error::SetStatus(StatusError::Cache( + e.into(), + )))) + .await; + } + let result = connection + .write_handle() + .write(Stanza::Presence(online.into_stanza(None))) + .await + .map_err(|e| StatusError::Write(e)); + // .map_err(|e| StatusError::Write(e)); + let _ = sender.send(result); + } + // TODO: offline message queue + Command::SendMessage(jid, body, sender) => { + let id = Uuid::new_v4(); + let message = Stanza::Message(stanza::client::message::Message { + from: Some(connection.jid().clone()), + id: Some(id.to_string()), + to: Some(jid.clone()), + // TODO: specify message type + r#type: stanza::client::message::MessageType::Chat, + // TODO: lang ? + lang: None, + subject: None, + body: Some(stanza::client::message::Body { + lang: None, + body: Some(body.body.clone()), + }), + thread: None, + delay: None, + }); + let _ = sender.send(Ok(())); + // let _ = sender.send(Ok(message.clone())); + let result = connection.write_handle().write(message).await; + match result { + Ok(_) => { + let mut message = Message { + id, + from: connection.jid().clone(), + body, + timestamp: Utc::now(), + }; + info!("send message {:?}", message); + if let Err(e) = logic + .db() + .create_message_with_self_resource_and_chat(message.clone(), jid.clone()) + .await + .map_err(|e| e.into()) + { + tracing::error!("{}", e); + let _ = + logic + .update_sender() + .send(UpdateMessage::Error(Error::MessageSend( + MessageSendError::MessageHistory(e), + ))); + } + // TODO: don't do this, have separate from from details + message.from = message.from.as_bare(); + let _ = logic + .update_sender() + .send(UpdateMessage::Message { to: jid, message }) + .await; + } + Err(_) => { + // let _ = sender.send(result); + } + } + } + Command::SendPresence(jid, presence, sender) => { + let mut presence: stanza::client::presence::Presence = presence.into(); + if let Some(jid) = jid { + presence.to = Some(jid); + }; + let result = connection + .write_handle() + .write(Stanza::Presence(presence)) + .await; + // .map_err(|e| StatusError::Write(e)); + let _ = sender.send(result); + } + } +} diff --git a/filamento/src/logic/process_stanza.rs b/filamento/src/logic/process_stanza.rs new file mode 100644 index 0000000..17738df --- /dev/null +++ b/filamento/src/logic/process_stanza.rs @@ -0,0 +1,264 @@ +use std::str::FromStr; + +use chrono::Utc; +use lampada::{Connected, SupervisorSender}; +use stanza::client::Stanza; +use uuid::Uuid; + +use crate::{ + UpdateMessage, + chat::{Body, Message}, + error::{Error, IqError, MessageRecvError, PresenceError, RosterError}, + presence::{Offline, Online, Presence, PresenceType, Show}, + roster::Contact, +}; + +use super::ClientLogic; + +pub async fn handle_stanza( + logic: ClientLogic, + stanza: Stanza, + connection: Connected, + supervisor: SupervisorSender, +) { + match stanza { + Stanza::Message(stanza_message) => { + if let Some(mut from) = stanza_message.from { + // TODO: don't ignore delay from. xep says SHOULD send error if incorrect. + let timestamp = stanza_message + .delay + .map(|delay| delay.stamp) + .unwrap_or_else(|| Utc::now()); + // TODO: group chat messages + let mut message = Message { + id: stanza_message + .id + // TODO: proper id storage + .map(|id| Uuid::from_str(&id).unwrap_or_else(|_| Uuid::new_v4())) + .unwrap_or_else(|| Uuid::new_v4()), + from: from.clone(), + timestamp, + body: Body { + // TODO: should this be an option? + body: stanza_message + .body + .map(|body| body.body) + .unwrap_or_default() + .unwrap_or_default(), + }, + }; + // TODO: can this be more efficient? + let result = logic + .db() + .create_message_with_user_resource_and_chat(message.clone(), from.clone()) + .await; + if let Err(e) = result { + tracing::error!("messagecreate"); + let _ = logic + .update_sender() + .send(UpdateMessage::Error(Error::MessageRecv( + MessageRecvError::MessageHistory(e.into()), + ))) + .await; + } + message.from = message.from.as_bare(); + from = from.as_bare(); + let _ = logic + .update_sender() + .send(UpdateMessage::Message { to: from, message }) + .await; + } else { + let _ = logic + .update_sender() + .send(UpdateMessage::Error(Error::MessageRecv( + MessageRecvError::MissingFrom, + ))) + .await; + } + } + Stanza::Presence(presence) => { + if let Some(from) = presence.from { + match presence.r#type { + Some(r#type) => match r#type { + // error processing a presence from somebody + stanza::client::presence::PresenceType::Error => { + // TODO: is there any other information that should go with the error? also MUST have an error, otherwise it's a different error. maybe it shoulnd't be an option. + let _ = logic + .update_sender() + .send(UpdateMessage::Error(Error::Presence( + // TODO: ughhhhhhhhhhhhh these stanza errors should probably just have an option, and custom display + PresenceError::StanzaError( + presence + .errors + .first() + .cloned() + .expect("error MUST have error"), + ), + ))) + .await; + } + // should not happen (error to server) + stanza::client::presence::PresenceType::Probe => { + // TODO: should probably write an error and restart stream + let _ = logic + .update_sender() + .send(UpdateMessage::Error(Error::Presence( + PresenceError::Unsupported, + ))) + .await; + } + stanza::client::presence::PresenceType::Subscribe => { + // may get a subscription request from somebody who is not a contact!!! therefore should be its own kind of event + let _ = logic + .update_sender() + .send(UpdateMessage::SubscriptionRequest(from)) + .await; + } + stanza::client::presence::PresenceType::Unavailable => { + let offline = Offline { + status: presence.status.map(|status| status.status.0), + }; + let timestamp = presence + .delay + .map(|delay| delay.stamp) + .unwrap_or_else(|| Utc::now()); + let _ = logic + .update_sender() + .send(UpdateMessage::Presence { + from, + presence: Presence { + timestamp, + presence: PresenceType::Offline(offline), + }, + }) + .await; + } + // for now, do nothing, as these are simply informational. will receive roster push from the server regarding the changes to do with them. + stanza::client::presence::PresenceType::Subscribed => {} + stanza::client::presence::PresenceType::Unsubscribe => {} + stanza::client::presence::PresenceType::Unsubscribed => {} + }, + None => { + let online = Online { + show: presence.show.map(|show| match show { + stanza::client::presence::Show::Away => Show::Away, + stanza::client::presence::Show::Chat => Show::Chat, + stanza::client::presence::Show::Dnd => Show::DoNotDisturb, + stanza::client::presence::Show::Xa => Show::ExtendedAway, + }), + status: presence.status.map(|status| status.status.0), + priority: presence.priority.map(|priority| priority.0), + }; + let timestamp = presence + .delay + .map(|delay| delay.stamp) + .unwrap_or_else(|| Utc::now()); + let _ = logic + .update_sender() + .send(UpdateMessage::Presence { + from, + presence: Presence { + timestamp, + presence: PresenceType::Online(online), + }, + }) + .await; + } + } + } else { + let _ = logic + .update_sender() + .send(UpdateMessage::Error(Error::Presence( + PresenceError::MissingFrom, + ))) + .await; + } + } + Stanza::Iq(iq) => match iq.r#type { + stanza::client::iq::IqType::Error | stanza::client::iq::IqType::Result => { + let send; + { + send = logic.pending().lock().await.remove(&iq.id); + } + if let Some(send) = send { + send.send(Ok(Stanza::Iq(iq))); + } else { + let _ = logic + .update_sender() + .send(UpdateMessage::Error(Error::Iq(IqError::NoMatchingId( + iq.id, + )))) + .await; + } + } + // TODO: send unsupported to server + // TODO: proper errors i am so tired please + stanza::client::iq::IqType::Get => {} + stanza::client::iq::IqType::Set => { + if let Some(query) = iq.query { + match query { + stanza::client::iq::Query::Roster(mut query) => { + // TODO: there should only be one + if let Some(item) = query.items.pop() { + match item.subscription { + Some(stanza::roster::Subscription::Remove) => { + logic.db().delete_contact(item.jid.clone()).await; + logic + .update_sender() + .send(UpdateMessage::RosterDelete(item.jid)) + .await; + // TODO: send result + } + _ => { + let contact: Contact = item.into(); + if let Err(e) = + logic.db().upsert_contact(contact.clone()).await + { + let _ = logic + .update_sender() + .send(UpdateMessage::Error(Error::Roster( + RosterError::Cache(e.into()), + ))) + .await; + } + let _ = logic + .update_sender() + .send(UpdateMessage::RosterUpdate(contact)) + .await; + // TODO: send result + // write_handle.write(Stanza::Iq(stanza::client::iq::Iq { + // from: , + // id: todo!(), + // to: todo!(), + // r#type: todo!(), + // lang: todo!(), + // query: todo!(), + // errors: todo!(), + // })); + } + } + } + } + // TODO: send unsupported to server + _ => {} + } + } else { + // TODO: send error (unsupported) to server + } + } + }, + Stanza::Error(error) => { + let _ = logic + .update_sender() + .send(UpdateMessage::Error(Error::Stream(error))) + .await; + // TODO: reconnect + } + Stanza::OtherContent(content) => { + let _ = logic + .update_sender() + .send(UpdateMessage::Error(Error::UnrecognizedContent)); + // TODO: send error to write_thread + } + } +} |