diff options
Diffstat (limited to 'luz/src/connection/read.rs')
-rw-r--r-- | luz/src/connection/read.rs | 200 |
1 files changed, 197 insertions, 3 deletions
diff --git a/luz/src/connection/read.rs b/luz/src/connection/read.rs index 8f8c4a0..46f1dc9 100644 --- a/luz/src/connection/read.rs +++ b/luz/src/connection/read.rs @@ -1,6 +1,7 @@ use std::{ collections::HashMap, ops::{Deref, DerefMut}, + str::FromStr, sync::Arc, time::Duration, }; @@ -12,10 +13,14 @@ use tokio::{ task::{JoinHandle, JoinSet}, }; use tracing::info; +use uuid::Uuid; use crate::{ + chat::{Body, Message}, db::Db, - error::{Error, Reason}, + error::{Error, IqError, PresenceError, Reason, RecvMessageError}, + presence::{Offline, Online, Presence, Show}, + roster::Contact, UpdateMessage, }; @@ -116,7 +121,7 @@ impl Read { println!("read stanza"); match s { Ok(s) => { - self.tasks.spawn(handle_stanza(s, self.update_sender.clone(), self.db.clone(), self.supervisor_control.clone(), self.write_handle.clone())); + self.tasks.spawn(handle_stanza(s, self.update_sender.clone(), self.db.clone(), self.supervisor_control.clone(), self.write_handle.clone(), self.pending_iqs.clone())); }, Err(e) => { println!("error: {:?}", e); @@ -173,8 +178,197 @@ async fn handle_stanza( db: Db, supervisor_control: mpsc::Sender<SupervisorCommand>, write_handle: WriteHandle, + pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>, ) { - println!("{:?}", stanza) + match stanza { + Stanza::Message(stanza_message) => { + if let Some(from) = stanza_message.from { + // TODO: group chat messages + let message = Message { + id: stanza_message + .id + // TODO: proper id storage + .map(|id| Uuid::from_str(&id).unwrap_or_else(|_| Uuid::new_v4())) + .unwrap_or_else(|| Uuid::new_v4()), + from: from.clone(), + body: Body { + // TODO: should this be an option? + body: stanza_message + .body + .map(|body| body.body) + .unwrap_or_default() + .unwrap_or_default(), + }, + }; + // TODO: can this be more efficient? + let result = db + .create_message_with_user_and_chat(message.clone(), from.clone()) + .await; + if let Err(e) = result { + let _ = update_sender + .send(UpdateMessage::Error(Error::CacheUpdate(e.into()))) + .await; + } + let _ = update_sender + .send(UpdateMessage::Message { to: from, message }) + .await; + } else { + let _ = update_sender + .send(UpdateMessage::Error(Error::RecvMessage( + RecvMessageError::MissingFrom, + ))) + .await; + } + } + Stanza::Presence(presence) => { + if let Some(from) = presence.from { + match presence.r#type { + Some(r#type) => match r#type { + // error processing a presence from somebody + stanza::client::presence::PresenceType::Error => { + // TODO: is there any other information that should go with the error? also MUST have an error, otherwise it's a different error. maybe it shoulnd't be an option. + let _ = update_sender + .send(UpdateMessage::Error(Error::Presence(PresenceError::Error( + Reason::Stanza(presence.errors.first().cloned()), + )))) + .await; + } + // should not happen (error to server) + stanza::client::presence::PresenceType::Probe => { + // TODO: should probably write an error and restart stream + let _ = update_sender + .send(UpdateMessage::Error(Error::Presence( + PresenceError::Unsupported, + ))) + .await; + } + stanza::client::presence::PresenceType::Subscribe => { + // may get a subscription request from somebody who is not a contact!!! therefore should be its own kind of event + let _ = update_sender + .send(UpdateMessage::SubscriptionRequest(from)) + .await; + } + stanza::client::presence::PresenceType::Unavailable => { + let offline = Offline { + status: presence.status.map(|status| status.status.0), + }; + let _ = update_sender + .send(UpdateMessage::Presence { + from, + presence: Presence::Offline(offline), + }) + .await; + } + // for now, do nothing, as these are simply informational. will receive roster push from the server regarding the changes to do with them. + stanza::client::presence::PresenceType::Subscribed => {} + stanza::client::presence::PresenceType::Unsubscribe => {} + stanza::client::presence::PresenceType::Unsubscribed => {} + }, + None => { + let online = Online { + show: presence.show.map(|show| match show { + stanza::client::presence::Show::Away => Show::Away, + stanza::client::presence::Show::Chat => Show::Chat, + stanza::client::presence::Show::Dnd => Show::DoNotDisturb, + stanza::client::presence::Show::Xa => Show::ExtendedAway, + }), + status: presence.status.map(|status| status.status.0), + priority: presence.priority.map(|priority| priority.0), + }; + let _ = update_sender + .send(UpdateMessage::Presence { + from, + presence: Presence::Online(online), + }) + .await; + } + } + } else { + let _ = update_sender + .send(UpdateMessage::Error(Error::Presence( + PresenceError::MissingFrom, + ))) + .await; + } + } + Stanza::Iq(iq) => match iq.r#type { + stanza::client::iq::IqType::Error | stanza::client::iq::IqType::Result => { + let send; + { + send = pending_iqs.lock().await.remove(&iq.id); + } + if let Some(send) = send { + send.send(Ok(Stanza::Iq(iq))); + } else { + let _ = update_sender + .send(UpdateMessage::Error(Error::Iq(IqError::NoMatchingId( + iq.id, + )))) + .await; + } + } + // TODO: send unsupported to server + // TODO: proper errors i am so tired please + stanza::client::iq::IqType::Get => {} + stanza::client::iq::IqType::Set => { + if let Some(query) = iq.query { + match query { + stanza::client::iq::Query::Roster(mut query) => { + // TODO: there should only be one + if let Some(item) = query.items.pop() { + match item.subscription { + Some(stanza::roster::Subscription::Remove) => { + db.delete_contact(item.jid.clone()).await; + update_sender + .send(UpdateMessage::RosterDelete(item.jid)) + .await; + // TODO: send result + } + _ => { + let contact: Contact = item.into(); + if let Err(e) = db.upsert_contact(contact.clone()).await { + let _ = update_sender + .send(UpdateMessage::Error(Error::CacheUpdate( + e.into(), + ))) + .await; + } + let _ = update_sender + .send(UpdateMessage::RosterUpdate(contact)) + .await; + // TODO: send result + // write_handle.write(Stanza::Iq(stanza::client::iq::Iq { + // from: , + // id: todo!(), + // to: todo!(), + // r#type: todo!(), + // lang: todo!(), + // query: todo!(), + // errors: todo!(), + // })); + } + } + } + } + // TODO: send unsupported to server + _ => {} + } + } else { + // TODO: send error (unsupported) to server + } + } + }, + Stanza::Error(error) => { + let _ = update_sender + .send(UpdateMessage::Error(Error::Stream(error))) + .await; + // TODO: reconnect + } + Stanza::OtherContent(content) => { + let _ = update_sender.send(UpdateMessage::Error(Error::UnrecognizedContent(content))); + // TODO: send error to write_thread + } + } } pub enum ReadControl { |